instance_chart/chart/
builder.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::marker::PhantomData;
4use std::net::{Ipv4Addr, SocketAddr};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7
8use crate::Error;
9
10use super::{interval, Chart, Id};
11use rand::rngs::OsRng;
12use rand::RngCore;
13use serde::Serialize;
14use tokio::net::UdpSocket;
15use tokio::sync::broadcast;
16use tracing::info;
17
18#[derive(Debug, Default)]
19pub struct Yes;
20#[derive(Debug, Default)]
21pub struct No;
22
23pub trait ToAssign: core::fmt::Debug {}
24pub trait Assigned: ToAssign {}
25pub trait NotAssigned: ToAssign {}
26
27impl ToAssign for Yes {}
28impl ToAssign for No {}
29
30impl Assigned for Yes {}
31impl NotAssigned for No {}
32
33const DEFAULT_HEADER: u64 = 6_687_164_552_036_412_667;
34const DEFAULT_PORT: u16 = 8080;
35
36pub type Port = u16;
37
38/// Construct a Chart using a builder-like pattern. You must always set an `id`. You also
39/// need to set [`service port`](ChartBuilder::with_service_port) or [`service ports`](ChartBuilder::with_service_ports). Now you can build with [`finish`](ChartBuilder::finish) or using [`custom_msg`][ChartBuilder::custom_msg]. The latter allowes you to set a custom message to share with other instances when they discover you.
40#[allow(clippy::pedantic)]
41pub struct ChartBuilder<const N: usize, IdSet, PortSet, PortsSet>
42where
43    IdSet: ToAssign,
44    PortSet: ToAssign,
45    PortsSet: ToAssign,
46{
47    header: u64,
48    service_id: Option<Id>,
49    discovery_port: u16,
50    service_port: Option<u16>,
51    service_ports: [u16; N],
52    rampdown: interval::Params,
53    local: bool,
54    id_set: PhantomData<IdSet>,
55    port_set: PhantomData<PortSet>,
56    ports_set: PhantomData<PortsSet>,
57}
58
59impl<const N: usize> ChartBuilder<N, No, No, No> {
60    /// Create a new chart builder
61    #[allow(clippy::new_without_default)] // builder struct not valid without other methods
62    #[must_use]
63    pub fn new() -> ChartBuilder<N, No, No, No> {
64        ChartBuilder {
65            header: DEFAULT_HEADER,
66            service_id: None,
67            discovery_port: DEFAULT_PORT,
68            service_ports: [0u16; N],
69            service_port: None,
70            rampdown: interval::Params::default(),
71            local: false,
72            id_set: PhantomData {},
73            port_set: PhantomData {},
74            ports_set: PhantomData {},
75        }
76    }
77}
78
79impl<const N: usize, IdSet, PortSet, PortsSet> ChartBuilder<N, IdSet, PortSet, PortsSet>
80where
81    IdSet: ToAssign,
82    PortSet: ToAssign,
83    PortsSet: ToAssign,
84{
85    /// Set the [`Id`] for this node, the [`Id`] is the key for this node in the chart
86    /// # Note
87    /// Always needed, you can not build without an [`Id`] set. The [`Id`] must be __unique__
88    #[must_use]
89    pub fn with_id(self, id: Id) -> ChartBuilder<N, Yes, PortSet, PortsSet> {
90        ChartBuilder {
91            header: self.header,
92            discovery_port: self.discovery_port,
93            service_id: Some(id),
94            service_port: self.service_port,
95            service_ports: self.service_ports,
96            rampdown: self.rampdown,
97            local: self.local,
98            id_set: PhantomData {},
99            port_set: PhantomData {},
100            ports_set: PhantomData {},
101        }
102    }
103
104    /// Use a true random number from a reliable source of randomness as an [`Id`].
105    /// # Note
106    /// I recommend setting the [`Id`] in a deterministic way if possible, it makes debugging a lot
107    /// easier. Theoretically using this method can fail if multiple instances get the same random
108    /// number, the chance of this is unrealistically small.
109    ///
110    /// It is *extreemly* unlikely though possible that this fails. This happens if the systems source of random is configured incorrectly.
111    #[must_use]
112    pub fn with_random_id(self) -> ChartBuilder<N, Yes, PortSet, PortsSet> {
113        let mut rng = OsRng::default();
114        let id = rng.next_u64();
115        info!("Using random id: {id}");
116        ChartBuilder {
117            header: self.header,
118            discovery_port: self.discovery_port,
119            service_id: Some(id),
120            service_port: self.service_port,
121            service_ports: self.service_ports,
122            rampdown: self.rampdown,
123            local: self.local,
124            id_set: PhantomData {},
125            port_set: PhantomData {},
126            ports_set: PhantomData {},
127        }
128    }
129    /// Set a `port` for use by your application. This will appear to the other
130    /// nodes in the Chart.
131    /// # Note
132    /// You need to use this or [`with_service_ports`](Self::with_service_ports) if
133    /// building with [`finish()`](Self::finish). Cant be used when bulding with
134    /// [`custom_msg`](Self::custom_msg).
135    #[must_use]
136    pub fn with_service_port(self, port: u16) -> ChartBuilder<N, IdSet, Yes, No> {
137        ChartBuilder {
138            header: self.header,
139            discovery_port: self.discovery_port,
140            service_id: self.service_id,
141            service_port: Some(port),
142            service_ports: self.service_ports,
143            rampdown: self.rampdown,
144            local: self.local,
145            id_set: PhantomData {},
146            port_set: PhantomData {},
147            ports_set: PhantomData {},
148        }
149    }
150    /// Set mutiple `ports` for use by your application. This will appear to the other
151    /// nodes in the Chart.
152    /// # Note
153    /// You need to use this or [`with_service_port`](Self::with_service_port) if
154    /// building with [`finish()`](Self::finish). Cant be used when bulding with
155    /// [`custom_msg`](Self::custom_msg).
156    #[must_use]
157    pub fn with_service_ports(self, ports: [u16; N]) -> ChartBuilder<N, IdSet, No, Yes> {
158        ChartBuilder {
159            header: self.header,
160            discovery_port: self.discovery_port,
161            service_id: self.service_id,
162            service_port: None,
163            service_ports: ports,
164            rampdown: self.rampdown,
165            local: self.local,
166            id_set: PhantomData {},
167            port_set: PhantomData {},
168            ports_set: PhantomData {},
169        }
170    }
171    /// set a custom header number. The header is used to identify your application's chart
172    /// from others multicast traffic when deployed your should set this to a [random](https://www.random.org) number.
173    #[must_use]
174    pub fn with_header(mut self, header: u64) -> ChartBuilder<N, IdSet, PortSet, PortsSet> {
175        self.header = header;
176        self
177    }
178    /// set custom port for discovery. With [local discovery] enabled this port needs to be
179    /// free and unused on all nodes it is not free the multicast traffic caused by this library
180    /// might corrupt network data of other applications. The default port is 8080.
181    /// # Warning
182    /// Not all ports seem to pass multicast traffic, you might need to experiment a bit.
183    #[must_use]
184    pub fn with_discovery_port(mut self, port: u16) -> ChartBuilder<N, IdSet, PortSet, PortsSet> {
185        self.discovery_port = port;
186        self
187    }
188    /// set duration between discovery broadcasts, decreases linearly from `max` to `min`
189    /// over `rampdown` period.
190    /// # Panics
191    /// panics if min is larger then max
192    #[must_use]
193    pub fn with_rampdown(
194        mut self,
195        min: Duration,
196        max: Duration,
197        rampdown: Duration,
198    ) -> ChartBuilder<N, IdSet, PortSet, PortsSet> {
199        assert!(
200            min <= max,
201            "minimum duration: {min:?} must be smaller or equal to the maximum: {max:?}"
202        );
203        self.rampdown = interval::Params { rampdown, min, max };
204        self
205    }
206
207    #[must_use]
208    /// set whether discovery is enabled within the same host. Defaults to false.
209    ///
210    /// # Warning
211    /// When this is enabled you might not be warned if the `discovery port` is in use by another application.
212    /// The other application will recieve network traffic from this crate. This might lead to
213    /// corruption in the application if it can not handle this.
214    /// `ChartBuilder` will still fail if the `discovery port` is already bound to a multicast adress
215    /// without `SO_REUSEADDR` set.
216    pub fn local_discovery(
217        mut self,
218        is_enabled: bool,
219    ) -> ChartBuilder<N, IdSet, PortSet, PortsSet> {
220        self.local = is_enabled;
221        self
222    }
223}
224
225impl ChartBuilder<1, Yes, No, No> {
226    /// build a chart with a custom msg instead of a service port. The message can
227    /// be any struct that implements `Debug`, `Clone`, `serde::Serialize` and `serde::Deserialize`
228    ///
229    /// # Errors
230    /// This errors if the discovery port could not be opened. see: [`Self::with_discovery_port`].
231    ///
232    /// # Example
233    /// ```rust
234    ///use instance_chart::{discovery, ChartBuilder};
235    ///use serde::{Serialize, Deserialize};
236    ///use std::error::Error;
237    ///use std::time::Duration;
238    ///
239    ///#[derive(Debug, Clone, Serialize, Deserialize)]
240    ///struct Msg(u32);
241    ///
242    ///#[tokio::main]
243    ///async fn main() -> Result<(), Box<dyn Error>> {
244    ///   let msg = Msg(0);
245    ///   let chart = ChartBuilder::new()
246    ///       .with_id(1)
247    ///       .with_discovery_port(8888)
248    ///       .with_header(17249479) // optional
249    ///       .with_rampdown( // optional
250    ///           Duration::from_millis(10),
251    ///           Duration::from_secs(10),
252    ///           Duration::from_secs(60))
253    ///       .custom_msg(msg)?;
254    ///   let maintain = discovery::maintain(chart.clone());
255    ///   let _ = tokio::spawn(maintain); // maintain task will run forever
256    ///   Ok(())
257    /// }
258    /// ```
259    #[allow(clippy::missing_panics_doc)] // with generic IdSet and PortSet set service_id must be set
260    pub fn custom_msg<Msg>(self, msg: Msg) -> Result<Chart<1, Msg>, Error>
261    where
262        Msg: Debug + Serialize + Clone,
263    {
264        let sock = open_socket(self.discovery_port, self.local)?;
265        Ok(Chart {
266            header: self.header,
267            service_id: self.service_id.unwrap(),
268            msg: [msg],
269            sock: Arc::new(sock),
270            map: Arc::new(Mutex::new(HashMap::new())),
271            interval: self.rampdown.into(),
272            broadcast: broadcast::channel(256).0,
273        })
274    }
275}
276
277impl ChartBuilder<1, Yes, Yes, No> {
278    /// build a chart that has a single service ports set
279    ///
280    /// # Errors
281    /// This errors if the discovery port could not be opened. see: [`Self::with_discovery_port`].
282    ///
283    /// # Example
284    /// ```rust
285    ///use std::error::Error;
286    ///use instance_chart::{discovery, ChartBuilder};
287    ///use std::time::Duration;
288    ///
289    ///#[tokio::main]
290    ///async fn main() -> Result<(), Box<dyn Error>> {
291    ///   let chart = ChartBuilder::new()
292    ///       .with_id(1)
293    ///       .with_service_port(8042)
294    ///       .with_discovery_port(8888)
295    ///       .with_header(17249479) // optional
296    ///       .with_rampdown( // optional
297    ///           Duration::from_millis(10),
298    ///           Duration::from_secs(10),
299    ///           Duration::from_secs(60))
300    ///       .finish()?;
301    ///   let maintain = discovery::maintain(chart.clone());
302    ///   let _ = tokio::spawn(maintain); // maintain task will run forever
303    ///   Ok(())
304    /// }
305    /// ```
306
307    // with generic IdSet, PortSet set service_id and service_port are always Some
308    #[allow(clippy::missing_panics_doc)]
309    pub fn finish(self) -> Result<Chart<1, Port>, Error> {
310        let sock = open_socket(self.discovery_port, self.local)?;
311        Ok(Chart {
312            header: self.header,
313            service_id: self.service_id.unwrap(),
314            msg: [self.service_port.unwrap()],
315            sock: Arc::new(sock),
316            map: Arc::new(Mutex::new(HashMap::new())),
317            interval: self.rampdown.into(),
318            broadcast: broadcast::channel(256).0,
319        })
320    }
321}
322
323impl<const N: usize> ChartBuilder<N, Yes, No, Yes> {
324    /// build a chart that has a multiple service ports set
325    ///
326    /// # Errors
327    /// This errors if the discovery port could not be opened. see: [`Self::with_discovery_port`].
328    ///
329    /// # Example
330    /// ```rust
331    ///use std::error::Error;
332    ///use instance_chart::{discovery, ChartBuilder};
333    ///use std::time::Duration;
334    ///
335    ///#[tokio::main]
336    ///async fn main() -> Result<(), Box<dyn Error>> {
337    ///   let chart = ChartBuilder::new()
338    ///       .with_id(1)
339    ///       .with_service_ports([8042,9042])
340    ///       .with_discovery_port(8888)
341    ///       .with_header(17249479) // optional
342    ///       .with_rampdown( // optional
343    ///           Duration::from_millis(10),
344    ///           Duration::from_secs(10),
345    ///           Duration::from_secs(60))
346    ///       .finish()?;
347    ///   let maintain = discovery::maintain(chart.clone());
348    ///   let _ = tokio::spawn(maintain); // maintain task will run forever
349    ///   Ok(())
350    /// }
351    /// ```
352
353    // with generic IdSet, PortSets set service_id and service_ports are always Some
354    #[allow(clippy::missing_panics_doc)]
355    pub fn finish(self) -> Result<Chart<N, Port>, Error> {
356        let sock = open_socket(self.discovery_port, self.local)?;
357        Ok(Chart {
358            header: self.header,
359            service_id: self.service_id.unwrap(),
360            msg: self.service_ports,
361            sock: Arc::new(sock),
362            map: Arc::new(Mutex::new(HashMap::new())),
363            interval: self.rampdown.into(),
364            broadcast: broadcast::channel(256).0,
365        })
366    }
367}
368
369fn open_socket(port: u16, local_discovery: bool) -> Result<UdpSocket, Error> {
370    use socket2::{Domain, SockAddr, Socket, Type};
371    use Error::{
372        Bind, Construct, JoinMulticast, SetBroadcast, SetMulticast, SetNonBlocking, SetReuse,
373        SetTTL, ToTokio,
374    };
375
376    assert_ne!(port, 0);
377
378    let interface = Ipv4Addr::from([0, 0, 0, 0]);
379    let multiaddr = Ipv4Addr::from([224, 0, 0, 251]);
380
381    let sock = Socket::new(Domain::IPV4, Type::DGRAM, None).map_err(Construct)?;
382
383    if local_discovery {
384        sock.set_reuse_port(true).map_err(SetReuse)?; // allow binding to a port already in use
385    }
386    sock.set_broadcast(true).map_err(SetBroadcast)?; // enable udp broadcasting
387    sock.set_multicast_loop_v4(true).map_err(SetMulticast)?; // send broadcast to self
388    sock.set_ttl(4).map_err(SetTTL)?; // deliver to other subnetworks
389
390    let address = SocketAddr::from((interface, port));
391    let address = SockAddr::from(address);
392    sock.bind(&address).map_err(|error| Bind { error, port })?;
393    sock.join_multicast_v4(&multiaddr, &interface)
394        .map_err(JoinMulticast)?;
395
396    let sock = std::net::UdpSocket::from(sock);
397    sock.set_nonblocking(true).map_err(SetNonBlocking)?;
398    let sock = UdpSocket::from_std(sock).map_err(ToTokio)?;
399    Ok(sock)
400}
401
402#[cfg(test)]
403mod compiles {
404    use super::*;
405
406    #[tokio::test]
407    async fn with_service_port() {
408        let chart = ChartBuilder::new()
409            .with_id(0)
410            .with_service_port(15)
411            .local_discovery(true)
412            .finish()
413            .unwrap();
414        let _ = chart.our_service_port();
415    }
416
417    #[tokio::test]
418    async fn with_service_ports() {
419        let chart = ChartBuilder::new()
420            .with_id(0)
421            .with_service_ports([1, 2])
422            .local_discovery(true)
423            .finish()
424            .unwrap();
425        let _ = chart.our_service_ports();
426    }
427
428    #[tokio::test]
429    async fn custom_msg() {
430        let chart = ChartBuilder::new()
431            .with_id(0)
432            .local_discovery(true)
433            .custom_msg("hi")
434            .unwrap();
435        let _ = chart.our_msg();
436    }
437}