veilid_core/rpc_processor/fanout/fanout_call.rs

1use super::*;
2
// Declares "fanout" as this module's log facility. Presumably this is what the
// `veilid_log!` calls in this file log under — TODO confirm against the macro definition.
impl_veilid_log_facility!("fanout");
4
/// Mutable state of a single fanout run, shared behind a mutex by all of its workers.
#[derive(Debug)]
struct FanoutContext<'a> {
    /// Queue tracking the nodes added so far and their fanout status
    fanout_queue: FanoutQueue<'a>,
    /// Most recent result computed by `evaluate_done`
    result: FanoutResult,
    /// Set once `check_done` has returned `true`; later evaluations short-circuit
    done: bool,
}
11
/// How a fanout concluded or, while it is still running, where it currently stands.
///
/// `PartialEq`/`Eq` are derived so kinds can be compared directly
/// (e.g. `result.kind == FanoutResultKind::Consensus`).
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub enum FanoutResultKind {
    /// No conclusion yet: a node ahead of the consensus set is still queued or in progress.
    /// This is the default.
    #[default]
    Incomplete,
    /// The time budget ran out before `check_done` was satisfied
    Timeout,
    /// Enough of the closest nodes accepted the request to reach the consensus count
    Consensus,
    /// Every node finished without the consensus count being reached
    Exhausted,
}

impl FanoutResultKind {
    /// Returns `true` if the fanout has not reached a conclusion yet.
    pub fn is_incomplete(&self) -> bool {
        matches!(self, Self::Incomplete)
    }
}
25
/// Snapshot of a fanout's progress, recomputed on every evaluation and returned by
/// `FanoutCall::run`.
#[derive(Clone, Debug, Default)]
pub struct FanoutResult {
    /// How the fanout completed
    pub kind: FanoutResultKind,
    /// The set of nodes that counted toward consensus
    /// (for example, had the most recent value for this subkey)
    pub consensus_nodes: Vec<NodeRef>,
    /// Which nodes accepted the request, including stale ones that do not count
    /// toward consensus
    pub value_nodes: Vec<NodeRef>,
}
36
37impl fmt::Display for FanoutResult {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        let kc = match self.kind {
40            FanoutResultKind::Incomplete => "I",
41            FanoutResultKind::Timeout => "T",
42            FanoutResultKind::Consensus => "C",
43            FanoutResultKind::Exhausted => "E",
44        };
45        if f.alternate() {
46            write!(
47                f,
48                "{}:{}[{}]",
49                kc,
50                self.consensus_nodes.len(),
51                self.consensus_nodes
52                    .iter()
53                    .map(|x| x.to_string())
54                    .collect::<Vec<_>>()
55                    .join(","),
56            )
57        } else {
58            write!(f, "{}:{}", kc, self.consensus_nodes.len())
59        }
60    }
61}
62
63pub fn debug_fanout_results(results: &[FanoutResult]) -> String {
64    let mut col = 0;
65    let mut out = String::new();
66    let mut left = results.len();
67    for r in results {
68        if col == 0 {
69            out += "    ";
70        }
71        let sr = format!("{}", r);
72        out += &sr;
73        out += ",";
74        col += 1;
75        left -= 1;
76        if col == 32 && left != 0 {
77            col = 0;
78            out += "\n"
79        }
80    }
81    out
82}
83
/// What a call routine reports for one node: any peers it learned about plus how the
/// call turned out.
#[derive(Debug)]
pub struct FanoutCallOutput {
    /// Peers returned by the called node; those passing the fanout's node info filter
    /// are registered and added to the fanout queue
    pub peer_info_list: Vec<Arc<PeerInfo>>,
    /// Outcome of the call, used to update the called node's status in the fanout queue
    pub disposition: FanoutCallDisposition,
}
89
/// How a single call routine invocation turned out, as reported in the `disposition`
/// field of [`FanoutCallOutput`]
#[derive(Debug)]
pub enum FanoutCallDisposition {
    /// The call routine timed out
    Timeout,
    /// The call routine returned an invalid result
    Invalid,
    /// The called node rejected the rpc request but may have returned more nodes
    Rejected,
    /// The called node accepted the rpc request and may have returned more nodes,
    /// but we don't count the result toward our consensus
    Stale,
    /// The called node accepted the rpc request and may have returned more nodes,
    /// counting the result toward our consensus
    Accepted,
    /// The called node accepted the rpc request and may have returned more nodes,
    /// returning a newer value that indicates we should restart our consensus
    /// (previously accepted nodes are put back in the queue)
    AcceptedNewerRestart,
    /// The called node accepted the rpc request and may have returned more nodes,
    /// returning a newer value that indicates our current consensus is stale and should be ignored,
    /// and counting the result toward a new consensus
    /// (previously accepted nodes are marked stale)
    AcceptedNewer,
}
113
/// Result of a single call routine invocation; an `Err` aborts the whole fanout
pub type FanoutCallResult = Result<FanoutCallOutput, RPCError>;
/// Predicate over a node's ids and node info that decides whether the node may join the fanout
pub type FanoutNodeInfoFilter = Arc<dyn (Fn(&[TypedNodeId], &NodeInfo) -> bool) + Send + Sync>;
/// Termination check run against the latest fanout result; returning `true` stops the fanout
pub type FanoutCheckDone = Arc<dyn (Fn(&FanoutResult) -> bool) + Send + Sync>;
/// Performs the fanout's operation against a single node
pub type FanoutCallRoutine =
    Arc<dyn (Fn(NodeRef) -> PinBoxFutureStatic<FanoutCallResult>) + Send + Sync>;
119
120pub fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter {
121    Arc::new(|_, _| true)
122}
123
124pub fn capability_fanout_node_info_filter(caps: Vec<VeilidCapability>) -> FanoutNodeInfoFilter {
125    Arc::new(move |_, ni| ni.has_all_capabilities(&caps))
126}
127
/// Contains the logic for generically searching the Veilid routing table for a set of nodes and
/// applying an RPC operation to them. The search either converges on a satisfactory result, or
/// times out and returns an unsatisfactory but possibly still usable result.
///
/// The algorithm starts by seeding a work queue with the `node_count` nodes in our routing table
/// closest to `hash_coordinate`, plus any nodes passed to `run`.
/// It has pluggable callbacks:
///  * `node_info_filter` - decides which nodes may be queued, both from the routing table and
///    from the peers returned by calls
///  * `check_done` - checks the current [`FanoutResult`] for a termination condition
///  * `call_routine` - performs the operation on one node and may return more peers to add to
///    the queue
///
/// The algorithm is parameterized by:
///  * `node_count` - the number of closest routing-table nodes used to seed the queue
///  * `fanout_tasks` - the number of concurrent calls being processed at the same time
///  * `consensus_count` - the number of nodes that must be in the `Accepted` state, counted in
///    sorted order with no earlier node still queued or in progress, for the result to be a
///    `Consensus`
///  * `timeout_us` - the overall time budget for a run
///
/// The algorithm returns early once `check_done` returns `true`, or if a call routine returns an
/// error. If the algorithm times out, unfinished nodes are marked as timed out and `check_done`
/// is consulted one final time. If it is still not satisfied, a `Timeout` result is returned.
/// Operations will still have been performed, so a timeout is not necessarily an algorithmic
/// 'failure'; it just means no definitive stopping condition was found in the given time.
pub(crate) struct FanoutCall<'a> {
    /// Routing table that is searched for seed nodes and that newly learned peers are registered with
    routing_table: &'a RoutingTable,
    /// Coordinate the fanout converges toward; node distances are measured from it
    hash_coordinate: TypedHashDigest,
    /// Number of closest routing-table nodes used to seed the queue
    node_count: usize,
    /// Number of workers processing nodes concurrently
    fanout_tasks: usize,
    /// Number of `Accepted` nodes required for a consensus
    consensus_count: usize,
    /// Overall time budget for a run
    timeout_us: TimestampDuration,
    /// Decides which nodes may join the fanout
    node_info_filter: FanoutNodeInfoFilter,
    /// Operation performed against each node
    call_routine: FanoutCallRoutine,
    /// Termination check evaluated against the running result
    check_done: FanoutCheckDone,
}
158
159impl VeilidComponentRegistryAccessor for FanoutCall<'_> {
160    fn registry(&self) -> VeilidComponentRegistry {
161        self.routing_table.registry()
162    }
163}
164
165impl<'a> FanoutCall<'a> {
166    #[allow(clippy::too_many_arguments)]
167    pub fn new(
168        routing_table: &'a RoutingTable,
169        hash_coordinate: TypedHashDigest,
170        node_count: usize,
171        fanout_tasks: usize,
172        consensus_count: usize,
173        timeout_us: TimestampDuration,
174        node_info_filter: FanoutNodeInfoFilter,
175        call_routine: FanoutCallRoutine,
176        check_done: FanoutCheckDone,
177    ) -> Self {
178        Self {
179            routing_table,
180            hash_coordinate,
181            node_count,
182            fanout_tasks,
183            consensus_count,
184            timeout_us,
185            node_info_filter,
186            call_routine,
187            check_done,
188        }
189    }
190
191    #[instrument(level = "trace", target = "fanout", skip_all)]
192    fn evaluate_done(&self, ctx: &mut FanoutContext) -> bool {
193        // If we already finished, just return
194        if ctx.done {
195            return true;
196        }
197
198        // Calculate fanout result so far
199        let fanout_result = ctx.fanout_queue.with_nodes(|nodes, sorted_nodes| {
200            // Count up nodes we have seen in order and see if our closest nodes have a consensus
201            let mut consensus: Option<bool> = None;
202            let mut consensus_nodes: Vec<NodeRef> = vec![];
203            let mut value_nodes: Vec<NodeRef> = vec![];
204            for sn in sorted_nodes {
205                let node = nodes.get(sn).unwrap();
206                match node.status {
207                    FanoutNodeStatus::Queued | FanoutNodeStatus::InProgress => {
208                        // Still have a closer node to do before reaching consensus,
209                        // or are doing it still, then wait until those are done
210                        if consensus.is_none() {
211                            consensus = Some(false);
212                        }
213                    }
214                    FanoutNodeStatus::Timeout
215                    | FanoutNodeStatus::Rejected
216                    | FanoutNodeStatus::Disqualified => {
217                        // Node does not count toward consensus or value node list
218                    }
219                    FanoutNodeStatus::Stale => {
220                        // Node does not count toward consensus but does count toward value node list
221                        value_nodes.push(node.node_ref.clone());
222                    }
223                    FanoutNodeStatus::Accepted => {
224                        // Node counts toward consensus and value node list
225                        value_nodes.push(node.node_ref.clone());
226
227                        consensus_nodes.push(node.node_ref.clone());
228                        if consensus.is_none() && consensus_nodes.len() >= self.consensus_count {
229                            consensus = Some(true);
230                        }
231                    }
232                }
233            }
234
235            // If we have reached sufficient consensus, return done
236            match consensus {
237                Some(true) => FanoutResult {
238                    kind: FanoutResultKind::Consensus,
239                    consensus_nodes,
240                    value_nodes,
241                },
242                Some(false) => FanoutResult {
243                    kind: FanoutResultKind::Incomplete,
244                    consensus_nodes,
245                    value_nodes,
246                },
247                None => FanoutResult {
248                    kind: FanoutResultKind::Exhausted,
249                    consensus_nodes,
250                    value_nodes,
251                },
252            }
253        });
254
255        let done = (self.check_done)(&fanout_result);
256        ctx.result = fanout_result;
257        ctx.done = done;
258        done
259    }
260
261    #[instrument(level = "trace", target = "fanout", skip_all)]
262    async fn fanout_processor<'b>(
263        &self,
264        context: &Mutex<FanoutContext<'b>>,
265    ) -> Result<bool, RPCError> {
266        // Make a work request channel
267        let (work_sender, work_receiver) = flume::bounded(1);
268
269        // Loop until we have a result or are done
270        loop {
271            // Put in a work request
272            {
273                let mut context_locked = context.lock();
274                context_locked
275                    .fanout_queue
276                    .request_work(work_sender.clone());
277            }
278
279            // Wait around for some work to do
280            let Ok(next_node) = work_receiver.recv_async().await else {
281                // If we don't have a node to process, stop fanning out
282                break Ok(false);
283            };
284
285            // Do the call for this node
286            match (self.call_routine)(next_node.clone()).await {
287                Ok(output) => {
288                    // Filter returned nodes
289                    let filtered_v: Vec<Arc<PeerInfo>> = output
290                        .peer_info_list
291                        .into_iter()
292                        .filter(|pi| {
293                            let node_ids = pi.node_ids().to_vec();
294                            if !(self.node_info_filter)(
295                                &node_ids,
296                                pi.signed_node_info().node_info(),
297                            ) {
298                                return false;
299                            }
300                            true
301                        })
302                        .collect();
303
304                    // Call succeeded
305                    // Register the returned nodes and add them to the fanout queue in sorted order
306                    let new_nodes = self
307                        .routing_table
308                        .register_nodes_with_peer_info_list(filtered_v);
309
310                    // Update queue
311                    {
312                        let mut context_locked = context.lock();
313                        context_locked.fanout_queue.add(&new_nodes);
314
315                        // Process disposition of the output of the fanout call routine
316                        match output.disposition {
317                            FanoutCallDisposition::Timeout => {
318                                context_locked.fanout_queue.timeout(next_node);
319                            }
320                            FanoutCallDisposition::Rejected => {
321                                context_locked.fanout_queue.rejected(next_node);
322                            }
323                            FanoutCallDisposition::Accepted => {
324                                context_locked.fanout_queue.accepted(next_node);
325                            }
326                            FanoutCallDisposition::AcceptedNewerRestart => {
327                                context_locked.fanout_queue.all_accepted_to_queued();
328                                context_locked.fanout_queue.accepted(next_node);
329                            }
330                            FanoutCallDisposition::AcceptedNewer => {
331                                context_locked.fanout_queue.all_accepted_to_stale();
332                                context_locked.fanout_queue.accepted(next_node);
333                            }
334                            FanoutCallDisposition::Invalid => {
335                                // Do nothing with invalid fanout calls
336                            }
337                            FanoutCallDisposition::Stale => {
338                                context_locked.fanout_queue.stale(next_node);
339                            }
340                        }
341
342                        // See if we're done before going back for more processing
343                        if self.evaluate_done(&mut context_locked) {
344                            break Ok(true);
345                        }
346
347                        // We modified the queue so we may have more work to do now,
348                        // tell the queue it should send more work to the workers
349                        context_locked.fanout_queue.send_more_work();
350                    }
351                }
352                Err(e) => {
353                    break Err(e);
354                }
355            };
356        }
357    }
358
359    #[instrument(level = "trace", target = "fanout", skip_all)]
360    fn init_closest_nodes(&self, context: &mut FanoutContext) -> Result<(), RPCError> {
361        // Get the 'node_count' closest nodes to the key out of our routing table
362        let closest_nodes = {
363            let node_info_filter = self.node_info_filter.clone();
364            let filter = Box::new(
365                move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
366                    // Exclude our own node
367                    if opt_entry.is_none() {
368                        return false;
369                    }
370                    let entry = opt_entry.unwrap();
371
372                    // Filter entries
373                    entry.with(rti, |_rti, e| {
374                        let Some(signed_node_info) =
375                            e.signed_node_info(RoutingDomain::PublicInternet)
376                        else {
377                            return false;
378                        };
379                        // Ensure only things that are valid/signed in the PublicInternet domain are returned
380                        if !signed_node_info.has_any_signature() {
381                            return false;
382                        }
383
384                        // Check our node info filter
385                        let node_ids = e.node_ids().to_vec();
386                        if !(node_info_filter)(&node_ids, signed_node_info.node_info()) {
387                            return false;
388                        }
389
390                        true
391                    })
392                },
393            ) as RoutingTableEntryFilter;
394            let filters = VecDeque::from([filter]);
395
396            let transform = |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
397                NodeRef::new(self.routing_table.registry(), v.unwrap().clone())
398            };
399
400            self.routing_table
401                .find_preferred_closest_nodes(
402                    self.node_count,
403                    self.hash_coordinate,
404                    filters,
405                    transform,
406                )
407                .map_err(RPCError::invalid_format)?
408        };
409        context.fanout_queue.add(&closest_nodes);
410
411        Ok(())
412    }
413
414    #[instrument(level = "trace", target = "fanout", skip_all)]
415    pub async fn run(&self, init_fanout_queue: Vec<NodeRef>) -> Result<FanoutResult, RPCError> {
416        // Create context for this run
417        let crypto = self.routing_table.crypto();
418        let Some(vcrypto) = crypto.get(self.hash_coordinate.kind) else {
419            return Err(RPCError::internal(
420                "should not try this on crypto we don't support",
421            ));
422        };
423        let node_sort = Box::new(
424            |a_key: &CryptoTyped<NodeId>, b_key: &CryptoTyped<NodeId>| -> core::cmp::Ordering {
425                let da =
426                    vcrypto.distance(&HashDigest::from(a_key.value), &self.hash_coordinate.value);
427                let db =
428                    vcrypto.distance(&HashDigest::from(b_key.value), &self.hash_coordinate.value);
429                da.cmp(&db)
430            },
431        );
432        let context = Arc::new(Mutex::new(FanoutContext {
433            fanout_queue: FanoutQueue::new(
434                self.routing_table.registry(),
435                self.hash_coordinate.kind,
436                node_sort,
437                self.consensus_count,
438            ),
439            result: FanoutResult {
440                kind: FanoutResultKind::Incomplete,
441                consensus_nodes: vec![],
442                value_nodes: vec![],
443            },
444            done: false,
445        }));
446
447        // Get timeout in milliseconds
448        let timeout_ms = us_to_ms(self.timeout_us.as_u64()).map_err(RPCError::internal)?;
449
450        // Initialize closest nodes list
451        {
452            let context_locked = &mut *context.lock();
453            self.init_closest_nodes(context_locked)?;
454
455            // Ensure we include the most recent nodes
456            context_locked.fanout_queue.add(&init_fanout_queue);
457
458            // Do a quick check to see if we're already done
459            if self.evaluate_done(context_locked) {
460                return Ok(core::mem::take(&mut context_locked.result));
461            }
462        }
463
464        // If not, do the fanout
465        let mut unord = FuturesUnordered::new();
466        {
467            // Spin up 'fanout' tasks to process the fanout
468            for _ in 0..self.fanout_tasks {
469                let h = self.fanout_processor(&context);
470                unord.push(h);
471            }
472        }
473        // Wait for them to complete
474        match timeout(
475            timeout_ms,
476            async {
477                loop {
478                    if let Some(res) = unord.next().in_current_span().await {
479                        match res {
480                            Ok(is_done) => {
481                                if is_done {
482                                    break Ok(());
483                                }
484                            }
485                            Err(e) => {
486                                break Err(e);
487                            }
488                        }
489                    } else {
490                        break Ok(());
491                    }
492                }
493            }
494            .in_current_span(),
495        )
496        .await
497        {
498            Ok(Ok(())) => {
499                // Finished, either by exhaustion or consensus,
500                // time to return whatever value we came up with
501                let context_locked = &mut *context.lock();
502                // Print final queue
503                veilid_log!(self debug "Finished FanoutQueue: {}", context_locked.fanout_queue);
504                return Ok(core::mem::take(&mut context_locked.result));
505            }
506            Ok(Err(e)) => {
507                // Fanout died with an error
508                return Err(e);
509            }
510            Err(_) => {
511                // Timeout, do one last evaluate with remaining nodes in timeout state
512                let context_locked = &mut *context.lock();
513                context_locked.fanout_queue.all_unfinished_to_timeout();
514
515                // Print final queue
516                veilid_log!(self debug "Timeout FanoutQueue: {}", context_locked.fanout_queue);
517
518                // Final evaluate
519                if self.evaluate_done(context_locked) {
520                    // Last-chance value returned at timeout
521                    return Ok(core::mem::take(&mut context_locked.result));
522                }
523
524                // We definitely weren't done, so this is just a plain timeout
525                let mut result = core::mem::take(&mut context_locked.result);
526                result.kind = FanoutResultKind::Timeout;
527                return Ok(result);
528            }
529        }
530    }
531}