instance_chart/
discovery.rs

1use std::fmt::Debug;
2use serde::de::DeserializeOwned;
3use serde::Serialize;
4use tracing::info;
5
6use crate::{Chart, util};
7use crate::chart::{handle_incoming, broadcast_periodically};
8
9trait AcceptErr<T, E> {
10    fn accept_err_with(self, f: impl FnOnce(&E) -> bool) -> Result<Option<T>, E>;
11}
12
13impl<T, E> AcceptErr<T, E> for Result<T, E> {
14    fn accept_err_with(self, f: impl FnOnce(&E) -> bool) -> Result<Option<T>, E> {
15        match self {
16            Ok(v) => Ok(Some(v)),
17            Err(e) if f(&e) => Ok(None),
18            Err(e) => Err(e),
19        }
20    }
21}
22
23/// This listens only, mapping the cluster without announcing itself.
24/// Usefull for clients on the same subnet trying to find nodes to contact.
25/// You can drop the future but then the chart will no longer be updated.
26///
27/// # Note 
28/// Take care not to call `maintain` anywhere
29#[tracing::instrument]
30pub async fn sniff<'de, const N: usize, T>(chart: Chart<N, T>) 
31where
32    T: 'static + Debug + Clone + Serialize + DeserializeOwned + Sync + Send
33{
34    use tokio::task::JoinError;
35    let f = util::spawn(handle_incoming(chart.clone()));
36    f.await.accept_err_with(JoinError::is_cancelled).unwrap();
37}
38
39/// This drives the chart discovery. You can drop the future but then the chart
40/// will no longer be updated.
41#[tracing::instrument]
42pub async fn maintain<'de, const N: usize, T>(chart: Chart<N, T>) 
43where
44    T: 'static + Debug + Clone + Serialize + DeserializeOwned + Sync + Send
45{
46    use tokio::task::JoinError;
47    let f1 = util::spawn(handle_incoming(chart.clone()));
48    let f2 = util::spawn(broadcast_periodically(chart));
49    f1.await.accept_err_with(JoinError::is_cancelled).unwrap();
50    f2.await.accept_err_with(JoinError::is_cancelled).unwrap();
51}
52
53/// Block until `full_size` nodes have been found.
54#[tracing::instrument(skip(chart))]
55pub async fn found_everyone<const N:usize, T>(chart: &Chart<N, T>, full_size: u16) 
56where
57    T: 'static + Debug + Clone + Serialize + DeserializeOwned
58{
59    let mut node_discoverd = chart.notify();
60    while chart.size() < full_size as usize {
61        node_discoverd.recv().await.unwrap();
62    }
63    info!(
64        "found every member of the cluster, ({} nodes)",
65        chart.size()
66    );
67}
68
69/// Block until a majority of nodes have been found. Usefull when implementing vote based
70/// consensus such as Raft.
71#[tracing::instrument(skip(chart))]
72pub async fn found_majority<const N:usize, T>(chart: &Chart<N,T>, full_size: u16) 
73where
74    T: 'static + Debug + Clone + Serialize + DeserializeOwned
75{
76    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
77    let cluster_majority = (f32::from(full_size) * 0.5).ceil() as usize;
78
79    let mut node_discoverd = chart.notify();
80    while chart.size() < cluster_majority {
81        node_discoverd.recv().await.unwrap();
82    }
83    info!("found majority of cluster, ({} nodes)", chart.size());
84}