1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
use super::*;
// Declares "rpc" as the log facility for this module.
// NOTE(review): presumably this is the facility the `veilid_log!` calls in this file log under — confirm against the macro definition.
impl_veilid_log_facility!("rpc");
/// Payload of a successful `rpc_call_status` answer.
///
/// Both fields stay `None` unless the status question was sent to a direct
/// destination with an unsafe safety selection and the answer carried sender info.
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Default)]
pub struct StatusResult {
/// Sender info reported back by the remote node in its status answer.
pub opt_sender_info: Option<SenderInfo>,
/// Value returned by `report_sender_info` on the target node; only populated when the
/// question was actually sent directly (not over a relay).
/// NOTE(review): presumably the sender info previously recorded for the same protocol/address type — confirm.
pub opt_previous_sender_info: Option<SenderInfo>,
}
impl RPCProcessor {
/// Send a StatusQ RPC question to `dest` and wait for the StatusA answer.
///
/// Can be sent via relays or routes, but will have less information via routes.
///
/// What the sender puts in the question:
/// * unsafe routing -> our node status
/// * safety route -> nothing
///
/// What the receiver puts in the answer:
/// * direct -> node status + sender info
/// * safety routed -> node status
/// * private routed -> nothing
///
/// # Returns
///
/// Inside the `NetworkResult`:
/// * `no_connection_other` when the destination has unsafe routing info but no routing
///   domain; a send failure is recorded on the target node if one is known
/// * `Timeout` when no reply arrives
/// * `invalid_message` when the reply is not a StatusA answer
/// * otherwise an `Answer<StatusResult>`; its sender info fields are only filled in for
///   direct destinations with an unsafe safety selection
///
/// # Errors
///
/// Fails with a "try again" `RPCError` ("not started up") when the startup lock cannot be
/// entered, and propagates errors from building, sending and awaiting the question.
#[cfg_attr(feature = "instrument", instrument(level = "trace", target = "rpc", skip(self), ret, err(level=Level::DEBUG)))]
pub async fn rpc_call_status(
    &self,
    dest: Destination,
) -> RPCNetworkResult<Answer<StatusResult>> {
    // Keep the startup guard alive until this function returns
    let _guard = self
        .startup_context
        .startup_lock
        .enter()
        .map_err(RPCError::map_try_again("not started up"))?;

    // Determine routing domain and node status to send
    let routing_table = self.routing_table();
    let (opt_target_nr, routing_domain, node_status) = if let Some(UnsafeRoutingInfo {
        opt_node,
        opt_routing_domain,
    }) =
        dest.get_unsafe_routing_info(&routing_table)
    {
        let Some(routing_domain) = opt_routing_domain else {
            // Because this exits before calling 'question()',
            // a failure to find a routing domain constitutes a send failure
            // Record the send failure on the node
            let send_ts = Timestamp::now_non_decreasing();
            // This branch always returns, so the node can be moved instead of cloned
            if let Some(node) = opt_node {
                self.record_send_failure(RPCKind::Question, send_ts, node, None, None);
            }
            return Ok(NetworkResult::no_connection_other(
                "no routing domain for target",
            ));
        };

        let node_status = Some(self.network_manager().generate_node_status(routing_domain));
        (opt_node, routing_domain, node_status)
    } else {
        // Safety route means we don't exchange node status and things are all PublicInternet RoutingDomain
        (None, RoutingDomain::PublicInternet, None)
    };

    // Create status rpc question
    let status_q = RPCOperationStatusQ::new(node_status);
    let question = RPCQuestion::new(
        network_result_try!(self.get_destination_respond_to(&dest).await?),
        RPCQuestionDetail::StatusQ(Box::new(status_q)),
    );

    let debug_string = format!("Status => {}", dest);

    // Send the info request
    // `dest` is cloned because it is matched on again once the answer has arrived
    let waitable_reply =
        network_result_try!(self.question(dest.clone(), question, None, None).await?);

    // Note what kind of ping this was and to what peer scope
    // (copied out now because `waitable_reply` is consumed by the wait below)
    let send_data_method = waitable_reply.context.send_data_result.clone();

    // Wait for reply
    let (msg, answer_context) = match self.wait_for_reply(waitable_reply, debug_string).await? {
        TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
        TimeoutOr::Value(v) => v,
    };

    // Get the right answer type
    let (_, _, kind) = msg.operation.destructure();
    let status_a = match kind {
        RPCOperationKind::Answer(a) => match a.destructure() {
            RPCAnswerDetail::StatusA(a) => a,
            _ => return Ok(NetworkResult::invalid_message("not a status answer")),
        },
        _ => return Ok(NetworkResult::invalid_message("not an answer")),
    };
    let (a_node_status, sender_info) = status_a.destructure();

    // If we know the target node and it answered with a node status,
    // store that status in the routing table under the routing domain we asked in.
    // The status is owned and not used again, so it is moved rather than cloned.
    if let Some(target_nr) = opt_target_nr {
        if let Some(a_node_status) = a_node_status {
            // Update latest node status in routing table
            target_nr.update_node_status(routing_domain, a_node_status);
        }
    }

    // Report sender_info IP addresses to network manager
    // Don't need to validate these addresses for the current routing domain
    // the address itself is irrelevant, and the remote node can lie anyway
    let mut opt_sender_info = None;
    let mut opt_previous_sender_info = None;
    match dest {
        Destination::Direct {
            node: target,
            safety_selection,
        } => {
            if matches!(safety_selection, SafetySelection::Unsafe(_)) {
                if let Some(sender_info) = sender_info {
                    if send_data_method.is_direct() {
                        // Directly requested status that actually gets sent directly and not over a relay will tell us what our IP address appears as
                        // If this changes, we'd want to know about that to reset the networking stack
                        opt_previous_sender_info = target.report_sender_info(
                            routing_domain,
                            send_data_method.unique_flow().flow.protocol_type(),
                            send_data_method.unique_flow().flow.address_type(),
                            sender_info,
                        );
                    };
                    opt_sender_info = Some(sender_info);

                    // Report ping status results to network manager
                    // Posting is best-effort: a failure is only logged at debug level
                    if let Err(e) = self.event_bus().post(SocketAddressChangeEvent {
                        routing_domain,
                        socket_address: sender_info.socket_address,
                        old_socket_address: opt_previous_sender_info.map(|s| s.socket_address),
                        flow: send_data_method.unique_flow().flow,
                        reporting_peer: target.unfiltered(),
                    }) {
                        veilid_log!(self debug "Failed to post event: {}", e);
                    }
                }
            }
        }
        Destination::Relay {
            relay_di: _,
            node: _,
        }
        | Destination::PrivateRoute {
            private_route: _,
            safety_selection: _,
        } => {
            // sender info is irrelevant over relays and routes
        }
    };

    Ok(NetworkResult::value(Answer::new(
        answer_context,
        StatusResult {
            opt_sender_info,
            opt_previous_sender_info,
        },
    )))
}
////////////////////////////////////////////////////////////////////////////////////////////////
/// Handle an incoming StatusQ question and send back the StatusA answer.
///
/// What goes into the answer depends on how the question arrived:
/// * direct -> our node status for the routing domain it arrived in, plus the remote
///   address of the flow as sender info; if the question carried a node status and the
///   sender node is known, that status is stored in the routing table first
/// * safety routed -> our `PublicInternet` node status, no sender info
/// * private routed -> neither node status nor sender info
///
/// Returns whatever sending the answer returns.
///
/// # Panics
///
/// Panics if `msg` is not a question, or is a question other than StatusQ.
#[cfg_attr(feature = "instrument", instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(super) async fn process_status_q(&self, msg: Message) -> RPCNetworkResult<()> {
    // Get the question
    // The kind is cloned because `msg` itself is handed to `answer()` at the end
    let kind = msg.operation.kind().clone();
    let status_q = match kind {
        RPCOperationKind::Question(q) => match q.destructure() {
            (_, RPCQuestionDetail::StatusQ(q)) => q,
            _ => panic!("not a status question"),
        },
        _ => panic!("not a question"),
    };
    let q_node_status = status_q.destructure();

    let (node_status, sender_info) = match &msg.header.detail {
        RPCMessageHeaderDetail::Direct(detail) => {
            let flow = detail.flow;
            let routing_domain = detail.routing_domain;

            // If the question carried a node status, store it under the routing domain
            // we received the request in
            if let Some(q_node_status) = q_node_status {
                // update node status for the requesting node to our routing table
                if let Some(sender_nr) = msg.opt_sender_nr.clone() {
                    // Update latest node status in routing table for the statusq sender
                    // The status is owned and not used again, so it is moved rather than cloned
                    sender_nr.update_node_status(routing_domain, q_node_status);
                }
            }

            // Get the peer address in the returned sender info
            let sender_info = SenderInfo {
                socket_address: *flow.remote_address(),
            };

            // Make status answer
            let node_status = self.network_manager().generate_node_status(routing_domain);
            (Some(node_status), Some(sender_info))
        }
        RPCMessageHeaderDetail::SafetyRouted(_) => {
            // Make status answer
            let node_status = self
                .network_manager()
                .generate_node_status(RoutingDomain::PublicInternet);
            (Some(node_status), None)
        }
        RPCMessageHeaderDetail::PrivateRouted(_) => (None, None),
    };

    // Make status answer
    let status_a = RPCOperationStatusA::new(node_status, sender_info);

    // Send status answer
    self.answer(
        msg,
        RPCAnswer::new(RPCAnswerDetail::StatusA(Box::new(status_a))),
        None,
    )
    .await
}
}