1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
use super::{Id, Entry};
use std::fmt::Debug;
use std::net::IpAddr;
use std::net::SocketAddr;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
/// Wait for notifications of new discoveries, buffering up to 16 discoveries, created using
/// [`Chart::notify()`](crate::Chart::notify).
#[derive(Debug)]
pub struct Notify<const N: usize, T: Debug + Clone>(pub(super) broadcast::Receiver<(Id, Entry<[T; N]>)>);
impl<T: Debug + Clone> Notify<1, T> {
/// await the next discovered instance. Returns the id and custom messag for new node
/// when it is discovered.
/// # Note
/// Can only be called on a
/// Notify for a chart created with [`ChartBuilder::custom_msg()`](crate::ChartBuilder::custom_msg)
/// # Errors
/// If more the 16 discoveries have been made since this was called this returns
/// `RecvError::Lagged`
pub async fn recv_one(&mut self) -> Result<(Id, IpAddr, T), RecvError> {
let (id, ip, [msg]) = self.recv().await?;
Ok((id, ip, msg))
}
}
impl<const N: usize, T: Debug + Clone> Notify<N, T> {
/// await the next discovered instance. Returns the id and custom messages for new node
/// when it is discovered.
/// # Note
/// Can only be called on a
/// Notify for a chart created with [`ChartBuilder::custom_msg()`](crate::ChartBuilder::custom_msg)
/// # Errors
/// If more the 16 discoveries have been made since this was called this returns
/// `RecvError::Lagged`
pub async fn recv(&mut self) -> Result<(Id, IpAddr, [T; N]), RecvError> {
let (id, entry) = self.0.recv().await?;
Ok((id, entry.ip, entry.msg))
}
/// await the next discovered instance. Returns the id and nth custom messages for new node
/// when it is discovered.
/// # Note
/// Can only be called on a
/// Notify for a chart created with [`ChartBuilder::custom_msg()`](crate::ChartBuilder::custom_msg)
/// # Errors
/// If more the 16 discoveries have been made since this was called this returns
/// `RecvError::Lagged`
pub async fn recv_nth<const IDX:usize>(&mut self) ->Result<(Id, IpAddr, T), RecvError> {
let (id, ip, msg) = self.recv().await?;
let msg = msg.into_iter().nth(IDX).unwrap(); // cant move out of array
Ok((id, ip, msg))
}
}
impl Notify<1, u16> {
/// await the next discovered instance. Returns the id and service adresses for new node
/// when it is discovered.
/// # Note
/// Can only be called on a
/// Notify for a chart created with [`ChartBuilder::finish()`](crate::ChartBuilder::finish)
/// that had as single service port set.
/// # Errors
/// If more the 16 discoveries have been made since this was called this returns
/// `RecvError::Lagged`
pub async fn recv_addr(&mut self) -> Result<(Id, SocketAddr), RecvError> {
let (id, ip, [port]) = self.recv().await?;
Ok((id, SocketAddr::new(ip, port)))
}
}
impl<const N: usize> Notify<N, u16> {
/// await the next discovered instance. Buffers up to 16 discoveries. Returns the id
/// and service adresseses for new node when it is discovered.
/// # Note
/// Can only be called on a
/// Notify for a chart created with [`ChartBuilder::finish()`](crate::ChartBuilder::finish)
/// that had multiple service ports set.
/// # Errors
/// If more the 16 discoveries have been made since this was called this returns
/// `RecvError::Lagged`
pub async fn recv_addresses(&mut self) -> Result<(Id, [SocketAddr; N]), RecvError> {
let (id, ip, ports) = self.recv().await?;
Ok((id, ports.map(|p| SocketAddr::new(ip, p))))
}
/// await the next discovered instance. Buffers up to 16 discoveries. Returns the id
/// and nth service adresses for new node when it is discovered.
/// # Note
/// Can only be called on a
/// Notify for a chart created with [`ChartBuilder::finish()`](crate::ChartBuilder::finish)
/// that had multiple service ports set.
/// # Errors
/// If more the 16 discoveries have been made since this was called this returns
/// `RecvError::Lagged`
pub async fn recv_nth_addr<const IDX:usize>(&mut self) ->Result<(Id, SocketAddr), RecvError> {
let (id, ip, ports) = self.recv().await?;
Ok((id, SocketAddr::new(ip, ports[IDX])))
}
}