veilid_core/rpc_processor/fanout/
fanout_call.rs1use super::*;
2
3impl_veilid_log_facility!("fanout");
4
5#[derive(Debug)]
6struct FanoutContext<'a> {
7 fanout_queue: FanoutQueue<'a>,
8 result: FanoutResult,
9 done: bool,
10}
11
12#[derive(Debug, Copy, Clone, Default)]
13pub enum FanoutResultKind {
14 #[default]
15 Incomplete,
16 Timeout,
17 Consensus,
18 Exhausted,
19}
20impl FanoutResultKind {
21 pub fn is_incomplete(&self) -> bool {
22 matches!(self, Self::Incomplete)
23 }
24}
25
26#[derive(Clone, Debug, Default)]
27pub struct FanoutResult {
28 pub kind: FanoutResultKind,
30 pub consensus_nodes: Vec<NodeRef>,
33 pub value_nodes: Vec<NodeRef>,
35}
36
37impl fmt::Display for FanoutResult {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 let kc = match self.kind {
40 FanoutResultKind::Incomplete => "I",
41 FanoutResultKind::Timeout => "T",
42 FanoutResultKind::Consensus => "C",
43 FanoutResultKind::Exhausted => "E",
44 };
45 if f.alternate() {
46 write!(
47 f,
48 "{}:{}[{}]",
49 kc,
50 self.consensus_nodes.len(),
51 self.consensus_nodes
52 .iter()
53 .map(|x| x.to_string())
54 .collect::<Vec<_>>()
55 .join(","),
56 )
57 } else {
58 write!(f, "{}:{}", kc, self.consensus_nodes.len())
59 }
60 }
61}
62
63pub fn debug_fanout_results(results: &[FanoutResult]) -> String {
64 let mut col = 0;
65 let mut out = String::new();
66 let mut left = results.len();
67 for r in results {
68 if col == 0 {
69 out += " ";
70 }
71 let sr = format!("{}", r);
72 out += &sr;
73 out += ",";
74 col += 1;
75 left -= 1;
76 if col == 32 && left != 0 {
77 col = 0;
78 out += "\n"
79 }
80 }
81 out
82}
83
84#[derive(Debug)]
85pub struct FanoutCallOutput {
86 pub peer_info_list: Vec<Arc<PeerInfo>>,
87 pub disposition: FanoutCallDisposition,
88}
89
90#[derive(Debug)]
92pub enum FanoutCallDisposition {
93 Timeout,
95 Invalid,
97 Rejected,
99 Stale,
102 Accepted,
105 AcceptedNewerRestart,
108 AcceptedNewer,
112}
113
114pub type FanoutCallResult = Result<FanoutCallOutput, RPCError>;
115pub type FanoutNodeInfoFilter = Arc<dyn (Fn(&[TypedNodeId], &NodeInfo) -> bool) + Send + Sync>;
116pub type FanoutCheckDone = Arc<dyn (Fn(&FanoutResult) -> bool) + Send + Sync>;
117pub type FanoutCallRoutine =
118 Arc<dyn (Fn(NodeRef) -> PinBoxFutureStatic<FanoutCallResult>) + Send + Sync>;
119
120pub fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter {
121 Arc::new(|_, _| true)
122}
123
124pub fn capability_fanout_node_info_filter(caps: Vec<VeilidCapability>) -> FanoutNodeInfoFilter {
125 Arc::new(move |_, ni| ni.has_all_capabilities(&caps))
126}
127
128pub(crate) struct FanoutCall<'a> {
148 routing_table: &'a RoutingTable,
149 hash_coordinate: TypedHashDigest,
150 node_count: usize,
151 fanout_tasks: usize,
152 consensus_count: usize,
153 timeout_us: TimestampDuration,
154 node_info_filter: FanoutNodeInfoFilter,
155 call_routine: FanoutCallRoutine,
156 check_done: FanoutCheckDone,
157}
158
159impl VeilidComponentRegistryAccessor for FanoutCall<'_> {
160 fn registry(&self) -> VeilidComponentRegistry {
161 self.routing_table.registry()
162 }
163}
164
165impl<'a> FanoutCall<'a> {
166 #[allow(clippy::too_many_arguments)]
167 pub fn new(
168 routing_table: &'a RoutingTable,
169 hash_coordinate: TypedHashDigest,
170 node_count: usize,
171 fanout_tasks: usize,
172 consensus_count: usize,
173 timeout_us: TimestampDuration,
174 node_info_filter: FanoutNodeInfoFilter,
175 call_routine: FanoutCallRoutine,
176 check_done: FanoutCheckDone,
177 ) -> Self {
178 Self {
179 routing_table,
180 hash_coordinate,
181 node_count,
182 fanout_tasks,
183 consensus_count,
184 timeout_us,
185 node_info_filter,
186 call_routine,
187 check_done,
188 }
189 }
190
191 #[instrument(level = "trace", target = "fanout", skip_all)]
192 fn evaluate_done(&self, ctx: &mut FanoutContext) -> bool {
193 if ctx.done {
195 return true;
196 }
197
198 let fanout_result = ctx.fanout_queue.with_nodes(|nodes, sorted_nodes| {
200 let mut consensus: Option<bool> = None;
202 let mut consensus_nodes: Vec<NodeRef> = vec![];
203 let mut value_nodes: Vec<NodeRef> = vec![];
204 for sn in sorted_nodes {
205 let node = nodes.get(sn).unwrap();
206 match node.status {
207 FanoutNodeStatus::Queued | FanoutNodeStatus::InProgress => {
208 if consensus.is_none() {
211 consensus = Some(false);
212 }
213 }
214 FanoutNodeStatus::Timeout
215 | FanoutNodeStatus::Rejected
216 | FanoutNodeStatus::Disqualified => {
217 }
219 FanoutNodeStatus::Stale => {
220 value_nodes.push(node.node_ref.clone());
222 }
223 FanoutNodeStatus::Accepted => {
224 value_nodes.push(node.node_ref.clone());
226
227 consensus_nodes.push(node.node_ref.clone());
228 if consensus.is_none() && consensus_nodes.len() >= self.consensus_count {
229 consensus = Some(true);
230 }
231 }
232 }
233 }
234
235 match consensus {
237 Some(true) => FanoutResult {
238 kind: FanoutResultKind::Consensus,
239 consensus_nodes,
240 value_nodes,
241 },
242 Some(false) => FanoutResult {
243 kind: FanoutResultKind::Incomplete,
244 consensus_nodes,
245 value_nodes,
246 },
247 None => FanoutResult {
248 kind: FanoutResultKind::Exhausted,
249 consensus_nodes,
250 value_nodes,
251 },
252 }
253 });
254
255 let done = (self.check_done)(&fanout_result);
256 ctx.result = fanout_result;
257 ctx.done = done;
258 done
259 }
260
261 #[instrument(level = "trace", target = "fanout", skip_all)]
262 async fn fanout_processor<'b>(
263 &self,
264 context: &Mutex<FanoutContext<'b>>,
265 ) -> Result<bool, RPCError> {
266 let (work_sender, work_receiver) = flume::bounded(1);
268
269 loop {
271 {
273 let mut context_locked = context.lock();
274 context_locked
275 .fanout_queue
276 .request_work(work_sender.clone());
277 }
278
279 let Ok(next_node) = work_receiver.recv_async().await else {
281 break Ok(false);
283 };
284
285 match (self.call_routine)(next_node.clone()).await {
287 Ok(output) => {
288 let filtered_v: Vec<Arc<PeerInfo>> = output
290 .peer_info_list
291 .into_iter()
292 .filter(|pi| {
293 let node_ids = pi.node_ids().to_vec();
294 if !(self.node_info_filter)(
295 &node_ids,
296 pi.signed_node_info().node_info(),
297 ) {
298 return false;
299 }
300 true
301 })
302 .collect();
303
304 let new_nodes = self
307 .routing_table
308 .register_nodes_with_peer_info_list(filtered_v);
309
310 {
312 let mut context_locked = context.lock();
313 context_locked.fanout_queue.add(&new_nodes);
314
315 match output.disposition {
317 FanoutCallDisposition::Timeout => {
318 context_locked.fanout_queue.timeout(next_node);
319 }
320 FanoutCallDisposition::Rejected => {
321 context_locked.fanout_queue.rejected(next_node);
322 }
323 FanoutCallDisposition::Accepted => {
324 context_locked.fanout_queue.accepted(next_node);
325 }
326 FanoutCallDisposition::AcceptedNewerRestart => {
327 context_locked.fanout_queue.all_accepted_to_queued();
328 context_locked.fanout_queue.accepted(next_node);
329 }
330 FanoutCallDisposition::AcceptedNewer => {
331 context_locked.fanout_queue.all_accepted_to_stale();
332 context_locked.fanout_queue.accepted(next_node);
333 }
334 FanoutCallDisposition::Invalid => {
335 }
337 FanoutCallDisposition::Stale => {
338 context_locked.fanout_queue.stale(next_node);
339 }
340 }
341
342 if self.evaluate_done(&mut context_locked) {
344 break Ok(true);
345 }
346
347 context_locked.fanout_queue.send_more_work();
350 }
351 }
352 Err(e) => {
353 break Err(e);
354 }
355 };
356 }
357 }
358
359 #[instrument(level = "trace", target = "fanout", skip_all)]
360 fn init_closest_nodes(&self, context: &mut FanoutContext) -> Result<(), RPCError> {
361 let closest_nodes = {
363 let node_info_filter = self.node_info_filter.clone();
364 let filter = Box::new(
365 move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
366 if opt_entry.is_none() {
368 return false;
369 }
370 let entry = opt_entry.unwrap();
371
372 entry.with(rti, |_rti, e| {
374 let Some(signed_node_info) =
375 e.signed_node_info(RoutingDomain::PublicInternet)
376 else {
377 return false;
378 };
379 if !signed_node_info.has_any_signature() {
381 return false;
382 }
383
384 let node_ids = e.node_ids().to_vec();
386 if !(node_info_filter)(&node_ids, signed_node_info.node_info()) {
387 return false;
388 }
389
390 true
391 })
392 },
393 ) as RoutingTableEntryFilter;
394 let filters = VecDeque::from([filter]);
395
396 let transform = |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
397 NodeRef::new(self.routing_table.registry(), v.unwrap().clone())
398 };
399
400 self.routing_table
401 .find_preferred_closest_nodes(
402 self.node_count,
403 self.hash_coordinate,
404 filters,
405 transform,
406 )
407 .map_err(RPCError::invalid_format)?
408 };
409 context.fanout_queue.add(&closest_nodes);
410
411 Ok(())
412 }
413
414 #[instrument(level = "trace", target = "fanout", skip_all)]
415 pub async fn run(&self, init_fanout_queue: Vec<NodeRef>) -> Result<FanoutResult, RPCError> {
416 let crypto = self.routing_table.crypto();
418 let Some(vcrypto) = crypto.get(self.hash_coordinate.kind) else {
419 return Err(RPCError::internal(
420 "should not try this on crypto we don't support",
421 ));
422 };
423 let node_sort = Box::new(
424 |a_key: &CryptoTyped<NodeId>, b_key: &CryptoTyped<NodeId>| -> core::cmp::Ordering {
425 let da =
426 vcrypto.distance(&HashDigest::from(a_key.value), &self.hash_coordinate.value);
427 let db =
428 vcrypto.distance(&HashDigest::from(b_key.value), &self.hash_coordinate.value);
429 da.cmp(&db)
430 },
431 );
432 let context = Arc::new(Mutex::new(FanoutContext {
433 fanout_queue: FanoutQueue::new(
434 self.routing_table.registry(),
435 self.hash_coordinate.kind,
436 node_sort,
437 self.consensus_count,
438 ),
439 result: FanoutResult {
440 kind: FanoutResultKind::Incomplete,
441 consensus_nodes: vec![],
442 value_nodes: vec![],
443 },
444 done: false,
445 }));
446
447 let timeout_ms = us_to_ms(self.timeout_us.as_u64()).map_err(RPCError::internal)?;
449
450 {
452 let context_locked = &mut *context.lock();
453 self.init_closest_nodes(context_locked)?;
454
455 context_locked.fanout_queue.add(&init_fanout_queue);
457
458 if self.evaluate_done(context_locked) {
460 return Ok(core::mem::take(&mut context_locked.result));
461 }
462 }
463
464 let mut unord = FuturesUnordered::new();
466 {
467 for _ in 0..self.fanout_tasks {
469 let h = self.fanout_processor(&context);
470 unord.push(h);
471 }
472 }
473 match timeout(
475 timeout_ms,
476 async {
477 loop {
478 if let Some(res) = unord.next().in_current_span().await {
479 match res {
480 Ok(is_done) => {
481 if is_done {
482 break Ok(());
483 }
484 }
485 Err(e) => {
486 break Err(e);
487 }
488 }
489 } else {
490 break Ok(());
491 }
492 }
493 }
494 .in_current_span(),
495 )
496 .await
497 {
498 Ok(Ok(())) => {
499 let context_locked = &mut *context.lock();
502 veilid_log!(self debug "Finished FanoutQueue: {}", context_locked.fanout_queue);
504 return Ok(core::mem::take(&mut context_locked.result));
505 }
506 Ok(Err(e)) => {
507 return Err(e);
509 }
510 Err(_) => {
511 let context_locked = &mut *context.lock();
513 context_locked.fanout_queue.all_unfinished_to_timeout();
514
515 veilid_log!(self debug "Timeout FanoutQueue: {}", context_locked.fanout_queue);
517
518 if self.evaluate_done(context_locked) {
520 return Ok(core::mem::take(&mut context_locked.result));
522 }
523
524 let mut result = core::mem::take(&mut context_locked.result);
526 result.kind = FanoutResultKind::Timeout;
527 return Ok(result);
528 }
529 }
530 }
531}