// veilid_core/rpc_processor/rpc_signal.rs
use super::*;

3impl RPCProcessor {
4 #[instrument(level = "trace", target = "rpc", skip(self), ret, err)]
7 pub async fn rpc_call_signal(
8 &self,
9 dest: Destination,
10 signal_info: SignalInfo,
11 ) -> RPCNetworkResult<()> {
12 let _guard = self
13 .startup_context
14 .startup_lock
15 .enter()
16 .map_err(RPCError::map_try_again("not started up"))?;
17
18 if matches!(
20 dest,
21 Destination::PrivateRoute {
22 private_route: _,
23 safety_selection: _
24 }
25 ) {
26 return Err(RPCError::internal(
27 "Never send signal requests over private routes",
28 ));
29 }
30
31 let signal = RPCOperationSignal::new(signal_info);
32 let statement = RPCStatement::new(RPCStatementDetail::Signal(Box::new(signal)));
33
34 self.statement(dest, statement).await
36 }
37
38 #[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
41 pub(super) async fn process_signal(&self, msg: Message) -> RPCNetworkResult<()> {
42 let routing_table = self.routing_table();
44
45 let has_capability_signal = routing_table
46 .get_published_peer_info(msg.header.routing_domain())
47 .map(|ppi| {
48 ppi.signed_node_info()
49 .node_info()
50 .has_capability(CAP_SIGNAL)
51 })
52 .unwrap_or(false);
53 if !has_capability_signal {
54 return Ok(NetworkResult::service_unavailable(
55 "signal is not available",
56 ));
57 }
58
59 let flow = match &msg.header.detail {
62 RPCMessageHeaderDetail::Direct(d) => d.flow,
63 RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
64 return Ok(NetworkResult::invalid_message("signal must be direct"));
65 }
66 };
67
68 let (_, _, kind) = msg.operation.destructure();
70 let signal = match kind {
71 RPCOperationKind::Statement(s) => match s.destructure() {
72 RPCStatementDetail::Signal(s) => s,
73 _ => panic!("not a signal"),
74 },
75 _ => panic!("not a statement"),
76 };
77
78 let network_manager = self.network_manager();
80 let signal_info = signal.destructure();
81 network_manager
82 .handle_signal(flow, signal_info)
83 .await
84 .map_err(RPCError::network)
85 }
86}