instance_chart/chart/
notify.rs

1use super::{Entry, Id};
2
3use std::fmt::Debug;
4use std::net::IpAddr;
5use std::net::SocketAddr;
6use tokio::sync::broadcast;
7use tokio::sync::broadcast::error::RecvError;
8
9/// Wait for notifications of new discoveries, buffering up to 256 discoveries, created using
10/// [`Chart::notify()`](crate::Chart::notify).
11///
12/// # Examples
13/// ```
14/// # use std::error::Error;
15/// # use instance_chart::{discovery, ChartBuilder};
16/// #
17/// # #[tokio::main]
18/// # async fn main() {
19/// #    let chart = ChartBuilder::new()
20/// #       .with_id(1)
21/// #       .with_service_port(8042)
22/// #       .finish().unwrap();
23/// #  let full_size = 1;
24///
25///    let mut node_discoverd = chart.notify();
26///    let maintain = discovery::maintain(chart.clone());
27///    let _ = tokio::spawn(maintain); // maintain task will run forever
28///    
29///    while chart.size() < full_size as usize {
30///        let new = node_discoverd.recv().await.unwrap();
31///        println!("discoverd new node: {:?}", new);
32///    }
33/// }
34/// ```
35///
36#[derive(Debug)]
37pub struct Notify<const N: usize, T: Debug + Clone>(
38    pub(super) broadcast::Receiver<(Id, Entry<[T; N]>)>,
39);
40
41impl<T: Debug + Clone> Notify<1, T> {
42    /// await the next discovered instance. Returns the id and custom messag for new node
43    /// when it is discovered.
44    /// # Note
45    /// Can only be called on a
46    /// Notify for a chart created with [`ChartBuilder::custom_msg()`](crate::ChartBuilder::custom_msg)
47    /// # Errors
48    /// If more the 256 discoveries have been made since this was called this returns
49    /// `RecvError::Lagged`
50    pub async fn recv_one(&mut self) -> Result<(Id, IpAddr, T), RecvError> {
51        let (id, ip, [msg]) = self.recv().await?;
52        Ok((id, ip, msg))
53    }
54}
55
56impl<const N: usize, T: Debug + Clone> Notify<N, T> {
57    /// await the next discovered instance. Returns the id and custom messages for new node
58    /// when it is discovered.
59    /// # Note
60    /// Can only be called on a
61    /// Notify for a chart created with [`ChartBuilder::custom_msg()`](crate::ChartBuilder::custom_msg)
62    /// # Errors
63    /// If more the 256 discoveries have been made since this was called this returns
64    /// `RecvError::Lagged`
65    pub async fn recv(&mut self) -> Result<(Id, IpAddr, [T; N]), RecvError> {
66        let (id, entry) = self.0.recv().await?;
67        Ok((id, entry.ip, entry.msg))
68    }
69
70    /// await the next discovered instance. Returns the id and nth custom messages for new node
71    /// when it is discovered.
72    /// # Note
73    /// Can only be called on a
74    /// Notify for a chart created with [`ChartBuilder::custom_msg()`](crate::ChartBuilder::custom_msg)
75    /// # Errors
76    /// If more the 256 discoveries have been made since this was called this returns
77    /// `RecvError::Lagged`
78    #[allow(clippy::missing_panics_doc)] // the array msg is the same size >= IDX
79    pub async fn recv_nth<const IDX: usize>(&mut self) -> Result<(Id, IpAddr, T), RecvError> {
80        let (id, ip, msg) = self.recv().await?;
81        let msg = msg.into_iter().nth(IDX).unwrap(); // cant move out of array
82        Ok((id, ip, msg))
83    }
84}
85
86impl Notify<1, u16> {
87    /// await the next discovered instance. Returns the id and service adresses for new node
88    /// when it is discovered.
89    /// # Note
90    /// Can only be called on a
91    /// Notify for a chart created with [`ChartBuilder::finish()`](crate::ChartBuilder::finish)
92    /// that had as single service port set.
93    /// # Errors
94    /// If more the 256 discoveries have been made since this was called this returns
95    /// `RecvError::Lagged`
96    pub async fn recv_addr(&mut self) -> Result<(Id, SocketAddr), RecvError> {
97        let (id, ip, [port]) = self.recv().await?;
98        Ok((id, SocketAddr::new(ip, port)))
99    }
100}
101
102impl<const N: usize> Notify<N, u16> {
103    /// await the next discovered instance. Buffers up to 256 discoveries. Returns the id
104    /// and service adresseses for new node when it is discovered.
105    /// # Note
106    /// Can only be called on a
107    /// Notify for a chart created with [`ChartBuilder::finish()`](crate::ChartBuilder::finish)
108    /// that had multiple service ports set.
109    /// # Errors
110    /// If more the 256 discoveries have been made since this was called this returns
111    /// `RecvError::Lagged`
112    pub async fn recv_addresses(&mut self) -> Result<(Id, [SocketAddr; N]), RecvError> {
113        let (id, ip, ports) = self.recv().await?;
114        Ok((id, ports.map(|p| SocketAddr::new(ip, p))))
115    }
116
117    /// await the next discovered instance. Buffers up to 256 discoveries. Returns the id
118    /// and nth service adresses for new node when it is discovered.
119    /// # Note
120    /// Can only be called on a
121    /// Notify for a chart created with [`ChartBuilder::finish()`](crate::ChartBuilder::finish)
122    /// that had multiple service ports set.
123    /// # Errors
124    /// If more the 256 discoveries have been made since this was called this returns
125    /// `RecvError::Lagged`
126    pub async fn recv_nth_addr<const IDX: usize>(&mut self) -> Result<(Id, SocketAddr), RecvError> {
127        let (id, ip, ports) = self.recv().await?;
128        Ok((id, SocketAddr::new(ip, ports[IDX])))
129    }
130}