instance_chart/chart/notify.rs
1use super::{Entry, Id};
2
3use std::fmt::Debug;
4use std::net::IpAddr;
5use std::net::SocketAddr;
6use tokio::sync::broadcast;
7use tokio::sync::broadcast::error::RecvError;
8
9/// Wait for notifications of new discoveries, buffering up to 256 discoveries, created using
10/// [`Chart::notify()`](crate::Chart::notify).
11///
12/// # Examples
13/// ```
14/// # use std::error::Error;
15/// # use instance_chart::{discovery, ChartBuilder};
16/// #
17/// # #[tokio::main]
18/// # async fn main() {
19/// # let chart = ChartBuilder::new()
20/// # .with_id(1)
21/// # .with_service_port(8042)
22/// # .finish().unwrap();
23/// # let full_size = 1;
24///
25/// let mut node_discoverd = chart.notify();
26/// let maintain = discovery::maintain(chart.clone());
27/// let _ = tokio::spawn(maintain); // maintain task will run forever
28///
29/// while chart.size() < full_size as usize {
30/// let new = node_discoverd.recv().await.unwrap();
31/// println!("discoverd new node: {:?}", new);
32/// }
33/// }
34/// ```
35///
36#[derive(Debug)]
37pub struct Notify<const N: usize, T: Debug + Clone>(
38 pub(super) broadcast::Receiver<(Id, Entry<[T; N]>)>,
39);
40
41impl<T: Debug + Clone> Notify<1, T> {
42 /// await the next discovered instance. Returns the id and custom messag for new node
43 /// when it is discovered.
44 /// # Note
45 /// Can only be called on a
46 /// Notify for a chart created with [`ChartBuilder::custom_msg()`](crate::ChartBuilder::custom_msg)
47 /// # Errors
48 /// If more the 256 discoveries have been made since this was called this returns
49 /// `RecvError::Lagged`
50 pub async fn recv_one(&mut self) -> Result<(Id, IpAddr, T), RecvError> {
51 let (id, ip, [msg]) = self.recv().await?;
52 Ok((id, ip, msg))
53 }
54}
55
56impl<const N: usize, T: Debug + Clone> Notify<N, T> {
57 /// await the next discovered instance. Returns the id and custom messages for new node
58 /// when it is discovered.
59 /// # Note
60 /// Can only be called on a
61 /// Notify for a chart created with [`ChartBuilder::custom_msg()`](crate::ChartBuilder::custom_msg)
62 /// # Errors
63 /// If more the 256 discoveries have been made since this was called this returns
64 /// `RecvError::Lagged`
65 pub async fn recv(&mut self) -> Result<(Id, IpAddr, [T; N]), RecvError> {
66 let (id, entry) = self.0.recv().await?;
67 Ok((id, entry.ip, entry.msg))
68 }
69
70 /// await the next discovered instance. Returns the id and nth custom messages for new node
71 /// when it is discovered.
72 /// # Note
73 /// Can only be called on a
74 /// Notify for a chart created with [`ChartBuilder::custom_msg()`](crate::ChartBuilder::custom_msg)
75 /// # Errors
76 /// If more the 256 discoveries have been made since this was called this returns
77 /// `RecvError::Lagged`
78 #[allow(clippy::missing_panics_doc)] // the array msg is the same size >= IDX
79 pub async fn recv_nth<const IDX: usize>(&mut self) -> Result<(Id, IpAddr, T), RecvError> {
80 let (id, ip, msg) = self.recv().await?;
81 let msg = msg.into_iter().nth(IDX).unwrap(); // cant move out of array
82 Ok((id, ip, msg))
83 }
84}
85
86impl Notify<1, u16> {
87 /// await the next discovered instance. Returns the id and service adresses for new node
88 /// when it is discovered.
89 /// # Note
90 /// Can only be called on a
91 /// Notify for a chart created with [`ChartBuilder::finish()`](crate::ChartBuilder::finish)
92 /// that had as single service port set.
93 /// # Errors
94 /// If more the 256 discoveries have been made since this was called this returns
95 /// `RecvError::Lagged`
96 pub async fn recv_addr(&mut self) -> Result<(Id, SocketAddr), RecvError> {
97 let (id, ip, [port]) = self.recv().await?;
98 Ok((id, SocketAddr::new(ip, port)))
99 }
100}
101
102impl<const N: usize> Notify<N, u16> {
103 /// await the next discovered instance. Buffers up to 256 discoveries. Returns the id
104 /// and service adresseses for new node when it is discovered.
105 /// # Note
106 /// Can only be called on a
107 /// Notify for a chart created with [`ChartBuilder::finish()`](crate::ChartBuilder::finish)
108 /// that had multiple service ports set.
109 /// # Errors
110 /// If more the 256 discoveries have been made since this was called this returns
111 /// `RecvError::Lagged`
112 pub async fn recv_addresses(&mut self) -> Result<(Id, [SocketAddr; N]), RecvError> {
113 let (id, ip, ports) = self.recv().await?;
114 Ok((id, ports.map(|p| SocketAddr::new(ip, p))))
115 }
116
117 /// await the next discovered instance. Buffers up to 256 discoveries. Returns the id
118 /// and nth service adresses for new node when it is discovered.
119 /// # Note
120 /// Can only be called on a
121 /// Notify for a chart created with [`ChartBuilder::finish()`](crate::ChartBuilder::finish)
122 /// that had multiple service ports set.
123 /// # Errors
124 /// If more the 256 discoveries have been made since this was called this returns
125 /// `RecvError::Lagged`
126 pub async fn recv_nth_addr<const IDX: usize>(&mut self) -> Result<(Id, SocketAddr), RecvError> {
127 let (id, ip, ports) = self.recv().await?;
128 Ok((id, SocketAddr::new(ip, ports[IDX])))
129 }
130}