use super::*;
// NOTE(review): presumably tags log output from this module with the "rpc" facility;
// the macro is defined outside this file — confirm against its definition.
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
    /// Sends `signal_info` to `dest` as a signal statement.
    ///
    /// # Errors
    ///
    /// * A try-again error ("not started up") when the startup lock cannot be entered.
    /// * An internal error when `dest` is a `Destination::PrivateRoute`; signals are never
    ///   sent over private routes.
    ///
    /// Otherwise the outcome of `self.statement(..)` is returned unchanged.
    #[cfg_attr(feature = "instrument", instrument(level = "trace", target = "rpc", skip(self), ret, err(level=Level::DEBUG)))]
    pub async fn rpc_call_signal(
        &self,
        dest: Destination,
        signal_info: SignalInfo,
    ) -> RPCNetworkResult<()> {
        // The guard stays bound until the end of the call.
        let _startup_guard = self
            .startup_context
            .startup_lock
            .enter()
            .map_err(RPCError::map_try_again("not started up"))?;

        // Refuse private-route destinations before building anything.
        if matches!(dest, Destination::PrivateRoute { .. }) {
            return Err(RPCError::internal(
                "Never send signal requests over private routes",
            ));
        }

        // Wrap the signal info in a statement and hand it to the generic statement sender.
        let detail = RPCStatementDetail::Signal(Box::new(RPCOperationSignal::new(signal_info)));
        self.statement(dest, RPCStatement::new(detail), None, None)
            .await
    }

    /// Handles an incoming signal statement.
    ///
    /// Returns `service_unavailable` when the peer info published for the message's routing
    /// domain is absent or lacks `VEILID_CAPABILITY_SIGNAL`, and `invalid_message` when the
    /// message header is not `Direct`. Otherwise the signal info is forwarded, together with
    /// the flow taken from the direct header, to the network manager's `handle_signal`.
    ///
    /// # Panics
    ///
    /// Panics if the operation carried by `msg` is not a statement, or is a statement other
    /// than a signal.
    #[cfg_attr(feature = "instrument", instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err))]
    pub(super) async fn process_signal(&self, msg: Message) -> RPCNetworkResult<()> {
        // Check whether we currently publish the signal capability for this routing domain.
        let routing_table = self.routing_table();
        let signal_supported = routing_table
            .get_published_peer_info(msg.header.routing_domain())
            .map(|peer_info| {
                peer_info
                    .node_info()
                    .has_capability(VEILID_CAPABILITY_SIGNAL)
            })
            .unwrap_or(false);
        if !signal_supported {
            return Ok(NetworkResult::service_unavailable(
                "signal is not available",
            ));
        }

        // Only directly received messages carry a usable flow; routed ones are rejected.
        let flow = match &msg.header.detail {
            RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
                return Ok(NetworkResult::invalid_message("signal must be direct"));
            }
            RPCMessageHeaderDetail::Direct(direct) => direct.flow,
        };

        // Peel the operation down to the signal payload, one layer at a time.
        let (_, _, kind) = msg.operation.destructure();
        let statement = match kind {
            RPCOperationKind::Statement(statement) => statement,
            _ => panic!("not a statement"),
        };
        let signal = match statement.destructure() {
            RPCStatementDetail::Signal(signal) => signal,
            _ => panic!("not a signal"),
        };

        // Forward to the network manager, mapping its error into an RPC network error.
        let network_manager = self.network_manager();
        let signal_info = signal.destructure();
        network_manager
            .handle_signal(flow, signal_info)
            .await
            .map_err(RPCError::network)
    }
}