instance-chart 0.2.1

Chart (discover) instances of your application on the same network and or machine
Documentation
use std::collections::HashMap;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use crate::Error;

use super::{interval, Chart, Id};
use serde::Serialize;
use tokio::net::UdpSocket;
use tokio::sync::broadcast;

#[derive(Debug, Default)]
pub struct Yes;
#[derive(Debug, Default)]
pub struct No;

pub trait ToAssign: core::fmt::Debug {}
pub trait Assigned: ToAssign {}
pub trait NotAssigned: ToAssign {}

impl ToAssign for Yes {}
impl ToAssign for No {}

impl Assigned for Yes {}
impl NotAssigned for No {}

const DEFAULT_HEADER: u64 = 6_687_164_552_036_412_667;
const DEFAULT_PORT: u16 = 8080;

pub type Port = u16;

/// Construct a Chart using a builder-like pattern. You must always set an `id`. You also
/// need to set `service port` or `service ports` and build with `finish()` or set a custom `msg`
/// and build using `custom_msg()`. See the examples of [ChartBuilder::custom_msg()]
/// or [ChartBuilder::finish()].
#[allow(clippy::pedantic)]
pub struct ChartBuilder<const N: usize, IdSet, PortSet, PortsSet>
where
    IdSet: ToAssign,
    PortSet: ToAssign,
    PortsSet: ToAssign,
{
    header: u64,
    service_id: Option<Id>,
    discovery_port: u16,
    service_port: Option<u16>,
    service_ports: [u16; N],
    rampdown: interval::Params,
    local: bool,
    id_set: PhantomData<IdSet>,
    port_set: PhantomData<PortSet>,
    ports_set: PhantomData<PortsSet>,
}

impl<const N: usize> ChartBuilder<N, No, No, No> {
    /// Create a new chart builder
    #[allow(clippy::new_without_default)] // builder struct not valid without other methods
    #[must_use]
    pub fn new() -> ChartBuilder<N, No, No, No> {
        ChartBuilder {
            header: DEFAULT_HEADER,
            service_id: None,
            discovery_port: DEFAULT_PORT,
            service_ports: [0u16; N],
            service_port: None,
            rampdown: interval::Params::default(),
            local: false,
            id_set: PhantomData {},
            port_set: PhantomData {},
            ports_set: PhantomData {},
        }
    }
}

impl<const N: usize, IdSet, PortSet, PortsSet> ChartBuilder<N, IdSet, PortSet, PortsSet>
where
    IdSet: ToAssign,
    PortSet: ToAssign,
    PortsSet: ToAssign,
{
    /// Set the `id` for this node, the `id` is the key for this node in the chart
    /// # Note
    /// Always needed, you can not build without an `id` set. The `id` must be __unique__
    #[must_use]
    pub fn with_id(self, id: Id) -> ChartBuilder<N, Yes, PortSet, PortsSet> {
        ChartBuilder {
            header: self.header,
            discovery_port: self.discovery_port,
            service_id: Some(id),
            service_port: self.service_port,
            service_ports: self.service_ports,
            rampdown: self.rampdown,
            local: self.local,
            id_set: PhantomData {},
            port_set: PhantomData {},
            ports_set: PhantomData {},
        }
    }
    /// Set a `port` for use by your application. This will appear to the other
    /// nodes in the Chart.
    /// # Note
    /// You need to use this or [`with_service_ports`](Self::with_service_ports) if 
    /// building with [`finish()`](Self::finish). Cant be used when bulding with
    /// [`custom_msg`](Self::custom_msg).
    #[must_use]
    pub fn with_service_port(self, port: u16) -> ChartBuilder<N, IdSet, Yes, No> {
        ChartBuilder {
            header: self.header,
            discovery_port: self.discovery_port,
            service_id: self.service_id,
            service_port: Some(port),
            service_ports: self.service_ports,
            rampdown: self.rampdown,
            local: self.local,
            id_set: PhantomData {},
            port_set: PhantomData {},
            ports_set: PhantomData {},
        }
    }
    /// Set mutiple `ports` for use by your application. This will appear to the other
    /// nodes in the Chart.
    /// # Note
    /// You need to use this or [`with_service_port`](Self::with_service_port) if 
    /// building with [`finish()`](Self::finish). Cant be used when bulding with
    /// [`custom_msg`](Self::custom_msg).
    #[must_use]
    pub fn with_service_ports(self, ports: [u16; N]) -> ChartBuilder<N, IdSet, No, Yes> {
        ChartBuilder {
            header: self.header,
            discovery_port: self.discovery_port,
            service_id: self.service_id,
            service_port: None,
            service_ports: ports,
            rampdown: self.rampdown,
            local: self.local,
            id_set: PhantomData {},
            port_set: PhantomData {},
            ports_set: PhantomData {},
        }
    }
    /// set a custom header number. The header is used to identify your application's chart
    /// from others multicast traffic when deployed your should set this to a [random](https://www.random.org) number.
    #[must_use]
    pub fn with_header(mut self, header: u64) -> ChartBuilder<N, IdSet, PortSet, PortsSet> {
        self.header = header;
        self
    }
    /// set custom port for discovery. With [local discovery] enabled this port needs to be
    /// free and unused on all nodes it is not free the multicast traffic caused by this library
    /// might corrupt network data of other applications. The default port is 8080.
    /// # Warning
    /// Not all ports seem to pass multicast traffic, you might need to experiment a bit.
    #[must_use]
    pub fn with_discovery_port(mut self, port: u16) -> ChartBuilder<N, IdSet, PortSet, PortsSet> {
        self.discovery_port = port;
        self
    }
    /// set duration between discovery broadcasts, decreases linearly from `max` to `min`
    /// over `rampdown` period.
    /// # Panics
    /// panics if min is larger then max
    #[must_use]
    pub fn with_rampdown(
        mut self,
        min: Duration,
        max: Duration,
        rampdown: Duration,
    ) -> ChartBuilder<N, IdSet, PortSet, PortsSet> {
        assert!(
            min <= max,
            "minimum duration: {min:?} must be smaller or equal to the maximum: {max:?}"
        );
        self.rampdown = interval::Params { rampdown, min, max };
        self
    }

    #[must_use]
    /// set whether discovery is enabled within the same host. Defaults to false.
    /// 
    /// # Warning
    /// When this is enabled you might not be warned if the `discovery port` is in use by another application. 
    /// The other application will recieve network traffic from this crate. This might lead to
    /// corruption in the application if it can not handle this.
    /// `ChartBuilder` will still fail if the `discovery port` is already bound to a multicast adress 
    /// without `SO_REUSEADDR` set.
    pub fn local_discovery(
        mut self,
        is_enabled: bool,
    ) -> ChartBuilder<N, IdSet, PortSet, PortsSet> {
        self.local = is_enabled;
        self
    }
}

impl ChartBuilder<1, Yes, No, No> {
    /// build a chart with a custom msg instead of a service port. The message can
    /// be any struct that implements `Debug`, `Clone`, `serde::Serialize` and `serde::Deserialize`
    ///
    /// # Errors
    /// This errors if the discovery port could not be opened. see: [`Self::with_discovery_port`].
    ///
    /// # Example
    /// ```rust
    ///use instance_chart::{discovery, ChartBuilder};
    ///use serde::{Serialize, Deserialize};
    ///use std::error::Error;
    ///use std::time::Duration;
    ///
    ///#[derive(Debug, Clone, Serialize, Deserialize)]
    ///struct Msg(u32);
    ///
    ///#[tokio::main]
    ///async fn main() -> Result<(), Box<dyn Error>> {
    ///   let msg = Msg(0);
    ///   let chart = ChartBuilder::new()
    ///       .with_id(1)
    ///       .with_discovery_port(8888)
    ///       .with_header(17249479) // optional
    ///       .with_rampdown( // optional
    ///           Duration::from_millis(10),
    ///           Duration::from_secs(10),
    ///           Duration::from_secs(60))
    ///       .custom_msg(msg)?;
    ///   let maintain = discovery::maintain(chart.clone());
    ///   let _ = tokio::spawn(maintain); // maintain task will run forever
    ///   Ok(())
    /// }
    /// ```
    #[allow(clippy::missing_panics_doc)] // with generic IdSet and PortSet set service_id must be set
    pub fn custom_msg<Msg>(self, msg: Msg) -> Result<Chart<1, Msg>, Error>
    where
        Msg: Debug + Serialize + Clone,
    {
        let sock = open_socket(self.discovery_port, self.local)?;
        Ok(Chart {
            header: self.header,
            service_id: self.service_id.unwrap(),
            msg: [msg],
            sock: Arc::new(sock),
            map: Arc::new(Mutex::new(HashMap::new())),
            interval: self.rampdown.into(),
            broadcast: broadcast::channel(16).0,
        })
    }
}

impl ChartBuilder<1, Yes, Yes, No> {
    /// build a chart that has a single service ports set
    ///
    /// # Errors
    /// This errors if the discovery port could not be opened. see: [`Self::with_discovery_port`].
    ///
    /// # Example
    /// ```rust
    ///use std::error::Error;
    ///use instance_chart::{discovery, ChartBuilder};
    ///use std::time::Duration;
    ///
    ///#[tokio::main]
    ///async fn main() -> Result<(), Box<dyn Error>> {
    ///   let chart = ChartBuilder::new()
    ///       .with_id(1)
    ///       .with_service_port(8042)
    ///       .with_discovery_port(8888)
    ///       .with_header(17249479) // optional
    ///       .with_rampdown( // optional
    ///           Duration::from_millis(10),
    ///           Duration::from_secs(10),
    ///           Duration::from_secs(60))
    ///       .finish()?;
    ///   let maintain = discovery::maintain(chart.clone());
    ///   let _ = tokio::spawn(maintain); // maintain task will run forever
    ///   Ok(())
    /// }
    /// ```

    // with generic IdSet, PortSet set service_id and service_port are always Some
    #[allow(clippy::missing_panics_doc)]
    pub fn finish(self) -> Result<Chart<1, Port>, Error> {
        let sock = open_socket(self.discovery_port, self.local)?;
        Ok(Chart {
            header: self.header,
            service_id: self.service_id.unwrap(),
            msg: [self.service_port.unwrap()],
            sock: Arc::new(sock),
            map: Arc::new(Mutex::new(HashMap::new())),
            interval: self.rampdown.into(),
            broadcast: broadcast::channel(16).0,
        })
    }
}

impl<const N: usize> ChartBuilder<N, Yes, No, Yes> {
    /// build a chart that has a multiple service ports set
    ///
    /// # Errors
    /// This errors if the discovery port could not be opened. see: [`Self::with_discovery_port`].
    ///
    /// # Example
    /// ```rust
    ///use std::error::Error;
    ///use instance_chart::{discovery, ChartBuilder};
    ///use std::time::Duration;
    ///
    ///#[tokio::main]
    ///async fn main() -> Result<(), Box<dyn Error>> {
    ///   let chart = ChartBuilder::new()
    ///       .with_id(1)
    ///       .with_service_ports([8042,9042])
    ///       .with_discovery_port(8888)
    ///       .with_header(17249479) // optional
    ///       .with_rampdown( // optional
    ///           Duration::from_millis(10),
    ///           Duration::from_secs(10),
    ///           Duration::from_secs(60))
    ///       .finish()?;
    ///   let maintain = discovery::maintain(chart.clone());
    ///   let _ = tokio::spawn(maintain); // maintain task will run forever
    ///   Ok(())
    /// }
    /// ```

    // with generic IdSet, PortSets set service_id and service_ports are always Some
    #[allow(clippy::missing_panics_doc)]
    pub fn finish(self) -> Result<Chart<N, Port>, Error> {
        let sock = open_socket(self.discovery_port, self.local)?;
        Ok(Chart {
            header: self.header,
            service_id: self.service_id.unwrap(),
            msg: self.service_ports,
            sock: Arc::new(sock),
            map: Arc::new(Mutex::new(HashMap::new())),
            interval: self.rampdown.into(),
            broadcast: broadcast::channel(16).0,
        })
    }
}

fn open_socket(port: u16, local_discovery: bool) -> Result<UdpSocket, Error> {
    use socket2::{Domain, SockAddr, Socket, Type};
    use Error::*;

    assert_ne!(port, 0);

    let interface = Ipv4Addr::from([0, 0, 0, 0]);
    let multiaddr = Ipv4Addr::from([224, 0, 0, 251]);

    let sock = Socket::new(Domain::IPV4, Type::DGRAM, None).map_err(Construct)?;

    if local_discovery {
        sock.set_reuse_port(true).map_err(SetReuse)?; // allow binding to a port already in use
    }
    sock.set_broadcast(true).map_err(SetBroadcast)?; // enable udp broadcasting
    sock.set_multicast_loop_v4(true).map_err(SetMulticast)?; // send broadcast to self
    sock.set_ttl(4).map_err(SetTTL)?; // deliver to other subnetworks

    let address = SocketAddr::from((interface, port));
    let address = SockAddr::from(address);
    sock.bind(&address).map_err(|error| Bind { error, port })?;
    sock.join_multicast_v4(&multiaddr, &interface)
        .map_err(JoinMulticast)?;

    let sock = std::net::UdpSocket::from(sock);
    sock.set_nonblocking(true).map_err(SetNonBlocking)?;
    let sock = UdpSocket::from_std(sock).map_err(ToTokio)?;
    Ok(sock)
}

#[cfg(test)]
mod compiles {
    use super::*;

    #[tokio::test]
    async fn with_service_port() {
        let chart = ChartBuilder::new()
            .with_id(0)
            .with_service_port(15)
            .local_discovery(true)
            .finish()
            .unwrap();
        let _ = chart.our_service_port();
    }

    #[tokio::test]
    async fn with_service_ports() {
        let chart = ChartBuilder::new()
            .with_id(0)
            .with_service_ports([1, 2])
            .local_discovery(true)
            .finish()
            .unwrap();
        let _ = chart.our_service_ports();
    }

    #[tokio::test]
    async fn custom_msg() {
        let chart = ChartBuilder::new()
            .with_id(0)
            .local_discovery(true)
            .custom_msg("hi")
            .unwrap();
        let _ = chart.our_msg();
    }
}