use super::*;
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
#[cfg_attr(feature = "instrument", instrument(level = "trace", target = "rpc", skip(self, value), err(level=Level::DEBUG)))]
pub async fn rpc_call_value_changed(
&self,
dest: Destination,
record_key: OpaqueRecordKey,
subkeys: ValueSubkeyRangeSet,
count: u32,
watch_id: u64,
value: Option<SignedValueData>,
) -> RPCNetworkResult<()> {
let _guard = self
.startup_context
.startup_lock
.enter()
.map_err(RPCError::map_try_again("not started up"))?;
if matches!(dest.get_safety_selection(), SafetySelection::Safe(_)) {
return Err(RPCError::internal(
"Never send value changes over safety routes",
));
}
let value_changed =
RPCOperationValueChanged::new(record_key, subkeys, count, watch_id, value)?;
let statement =
RPCStatement::new(RPCStatementDetail::ValueChanged(Box::new(value_changed)));
self.statement(dest, statement, None, None).await
}
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "rpc", skip_all)
)]
pub(super) async fn process_value_changed(&self, msg: Message) -> RPCNetworkResult<()> {
let (_, _, kind) = msg.operation.destructure();
let (key, subkeys, count, watch_id, value) = match kind {
RPCOperationKind::Statement(s) => match s.destructure() {
RPCStatementDetail::ValueChanged(s) => s.destructure(),
_ => panic!("not a value changed statement"),
},
_ => panic!("not a statement"),
};
let inbound_node_id = match &msg.header.detail {
RPCMessageHeaderDetail::Direct(d) => d.envelope.get_sender_id(),
RPCMessageHeaderDetail::SafetyRouted(_) => {
return Ok(NetworkResult::invalid_message(
"not processing value change over only a safety route",
));
}
RPCMessageHeaderDetail::PrivateRouted(p) => self
.routing_table()
.generate_node_id(&p.remote_safety_route)
.map_err(RPCError::internal)?,
};
if debug_target_enabled!("dht") {
let debug_string_value = value
.as_ref()
.map(|v| format!(" {}", v))
.unwrap_or_default();
let debug_string_stmt = format!(
"IN <== ValueChanged(id={} {} #{:?}+{}{}) from {} <= {}",
watch_id,
key,
subkeys,
count,
debug_string_value,
inbound_node_id,
msg.header.direct_sender_node_id(),
);
veilid_log!(self debug target: "dht", "{}", debug_string_stmt);
}
let storage_manager = self.storage_manager();
storage_manager
.inbound_value_changed(
key,
subkeys,
count,
value.map(Arc::new),
inbound_node_id,
watch_id,
)
.await
.map_err(RPCError::internal)
}
}