instance_chart/
discovery.rs1use std::fmt::Debug;
2use serde::de::DeserializeOwned;
3use serde::Serialize;
4use tracing::info;
5
6use crate::{Chart, util};
7use crate::chart::{handle_incoming, broadcast_periodically};
8
/// Extension for `Result` that lets the caller declare some errors harmless.
///
/// `T` is the success type and `E` the error type of the underlying `Result`.
trait AcceptErr<T, E> {
    /// Turns an acceptable error into `Ok(None)`.
    ///
    /// * A success value `v` is returned as `Ok(Some(v))`.
    /// * An error for which `is_acceptable` returns `true` becomes `Ok(None)`.
    /// * Every other error is returned unchanged as `Err`.
    fn accept_err_with(self, is_acceptable: impl FnOnce(&E) -> bool) -> Result<Option<T>, E>;
}
12
13impl<T, E> AcceptErr<T, E> for Result<T, E> {
14 fn accept_err_with(self, f: impl FnOnce(&E) -> bool) -> Result<Option<T>, E> {
15 match self {
16 Ok(v) => Ok(Some(v)),
17 Err(e) if f(&e) => Ok(None),
18 Err(e) => Err(e),
19 }
20 }
21}
22
23#[tracing::instrument]
30pub async fn sniff<'de, const N: usize, T>(chart: Chart<N, T>)
31where
32 T: 'static + Debug + Clone + Serialize + DeserializeOwned + Sync + Send
33{
34 use tokio::task::JoinError;
35 let f = util::spawn(handle_incoming(chart.clone()));
36 f.await.accept_err_with(JoinError::is_cancelled).unwrap();
37}
38
39#[tracing::instrument]
42pub async fn maintain<'de, const N: usize, T>(chart: Chart<N, T>)
43where
44 T: 'static + Debug + Clone + Serialize + DeserializeOwned + Sync + Send
45{
46 use tokio::task::JoinError;
47 let f1 = util::spawn(handle_incoming(chart.clone()));
48 let f2 = util::spawn(broadcast_periodically(chart));
49 f1.await.accept_err_with(JoinError::is_cancelled).unwrap();
50 f2.await.accept_err_with(JoinError::is_cancelled).unwrap();
51}
52
53#[tracing::instrument(skip(chart))]
55pub async fn found_everyone<const N:usize, T>(chart: &Chart<N, T>, full_size: u16)
56where
57 T: 'static + Debug + Clone + Serialize + DeserializeOwned
58{
59 let mut node_discoverd = chart.notify();
60 while chart.size() < full_size as usize {
61 node_discoverd.recv().await.unwrap();
62 }
63 info!(
64 "found every member of the cluster, ({} nodes)",
65 chart.size()
66 );
67}
68
69#[tracing::instrument(skip(chart))]
72pub async fn found_majority<const N:usize, T>(chart: &Chart<N,T>, full_size: u16)
73where
74 T: 'static + Debug + Clone + Serialize + DeserializeOwned
75{
76 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
77 let cluster_majority = (f32::from(full_size) * 0.5).ceil() as usize;
78
79 let mut node_discoverd = chart.notify();
80 while chart.size() < cluster_majority {
81 node_discoverd.recv().await.unwrap();
82 }
83 info!("found majority of cluster, ({} nodes)", chart.size());
84}