instance_chart/chart/builder.rs
1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::marker::PhantomData;
4use std::net::{Ipv4Addr, SocketAddr};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7
8use crate::Error;
9
10use super::{interval, Chart, Id};
11use rand::rngs::OsRng;
12use rand::RngCore;
13use serde::Serialize;
14use tokio::net::UdpSocket;
15use tokio::sync::broadcast;
16use tracing::info;
17
/// Typestate marker: the associated builder value has been assigned.
#[derive(Debug, Default)]
pub struct Yes;

/// Typestate marker: the associated builder value has not been assigned.
#[derive(Debug, Default)]
pub struct No;

/// Implemented by every typestate marker ([`Yes`] and [`No`]).
pub trait ToAssign: Debug {}
/// Implemented only by the marker for a value that was set ([`Yes`]).
pub trait Assigned: ToAssign {}
/// Implemented only by the marker for a value that was not set ([`No`]).
pub trait NotAssigned: ToAssign {}

impl ToAssign for Yes {}
impl Assigned for Yes {}

impl ToAssign for No {}
impl NotAssigned for No {}
32
/// A network port number.
pub type Port = u16;

/// Header used when the caller does not pick one with `with_header`; the
/// header tells this application's chart traffic apart from other multicast
/// traffic.
const DEFAULT_HEADER: u64 = 6_687_164_552_036_412_667;
/// Discovery port used when the caller does not pick one with
/// `with_discovery_port`.
const DEFAULT_PORT: Port = 8080;
37
38/// Construct a Chart using a builder-like pattern. You must always set an `id`. You also
39/// need to set [`service port`](ChartBuilder::with_service_port) or [`service ports`](ChartBuilder::with_service_ports). Now you can build with [`finish`](ChartBuilder::finish) or using [`custom_msg`][ChartBuilder::custom_msg]. The latter allowes you to set a custom message to share with other instances when they discover you.
40#[allow(clippy::pedantic)]
41pub struct ChartBuilder<const N: usize, IdSet, PortSet, PortsSet>
42where
43 IdSet: ToAssign,
44 PortSet: ToAssign,
45 PortsSet: ToAssign,
46{
47 header: u64,
48 service_id: Option<Id>,
49 discovery_port: u16,
50 service_port: Option<u16>,
51 service_ports: [u16; N],
52 rampdown: interval::Params,
53 local: bool,
54 id_set: PhantomData<IdSet>,
55 port_set: PhantomData<PortSet>,
56 ports_set: PhantomData<PortsSet>,
57}
58
59impl<const N: usize> ChartBuilder<N, No, No, No> {
60 /// Create a new chart builder
61 #[allow(clippy::new_without_default)] // builder struct not valid without other methods
62 #[must_use]
63 pub fn new() -> ChartBuilder<N, No, No, No> {
64 ChartBuilder {
65 header: DEFAULT_HEADER,
66 service_id: None,
67 discovery_port: DEFAULT_PORT,
68 service_ports: [0u16; N],
69 service_port: None,
70 rampdown: interval::Params::default(),
71 local: false,
72 id_set: PhantomData {},
73 port_set: PhantomData {},
74 ports_set: PhantomData {},
75 }
76 }
77}
78
79impl<const N: usize, IdSet, PortSet, PortsSet> ChartBuilder<N, IdSet, PortSet, PortsSet>
80where
81 IdSet: ToAssign,
82 PortSet: ToAssign,
83 PortsSet: ToAssign,
84{
85 /// Set the [`Id`] for this node, the [`Id`] is the key for this node in the chart
86 /// # Note
87 /// Always needed, you can not build without an [`Id`] set. The [`Id`] must be __unique__
88 #[must_use]
89 pub fn with_id(self, id: Id) -> ChartBuilder<N, Yes, PortSet, PortsSet> {
90 ChartBuilder {
91 header: self.header,
92 discovery_port: self.discovery_port,
93 service_id: Some(id),
94 service_port: self.service_port,
95 service_ports: self.service_ports,
96 rampdown: self.rampdown,
97 local: self.local,
98 id_set: PhantomData {},
99 port_set: PhantomData {},
100 ports_set: PhantomData {},
101 }
102 }
103
104 /// Use a true random number from a reliable source of randomness as an [`Id`].
105 /// # Note
106 /// I recommend setting the [`Id`] in a deterministic way if possible, it makes debugging a lot
107 /// easier. Theoretically using this method can fail if multiple instances get the same random
108 /// number, the chance of this is unrealistically small.
109 ///
110 /// It is *extreemly* unlikely though possible that this fails. This happens if the systems source of random is configured incorrectly.
111 #[must_use]
112 pub fn with_random_id(self) -> ChartBuilder<N, Yes, PortSet, PortsSet> {
113 let mut rng = OsRng::default();
114 let id = rng.next_u64();
115 info!("Using random id: {id}");
116 ChartBuilder {
117 header: self.header,
118 discovery_port: self.discovery_port,
119 service_id: Some(id),
120 service_port: self.service_port,
121 service_ports: self.service_ports,
122 rampdown: self.rampdown,
123 local: self.local,
124 id_set: PhantomData {},
125 port_set: PhantomData {},
126 ports_set: PhantomData {},
127 }
128 }
129 /// Set a `port` for use by your application. This will appear to the other
130 /// nodes in the Chart.
131 /// # Note
132 /// You need to use this or [`with_service_ports`](Self::with_service_ports) if
133 /// building with [`finish()`](Self::finish). Cant be used when bulding with
134 /// [`custom_msg`](Self::custom_msg).
135 #[must_use]
136 pub fn with_service_port(self, port: u16) -> ChartBuilder<N, IdSet, Yes, No> {
137 ChartBuilder {
138 header: self.header,
139 discovery_port: self.discovery_port,
140 service_id: self.service_id,
141 service_port: Some(port),
142 service_ports: self.service_ports,
143 rampdown: self.rampdown,
144 local: self.local,
145 id_set: PhantomData {},
146 port_set: PhantomData {},
147 ports_set: PhantomData {},
148 }
149 }
150 /// Set mutiple `ports` for use by your application. This will appear to the other
151 /// nodes in the Chart.
152 /// # Note
153 /// You need to use this or [`with_service_port`](Self::with_service_port) if
154 /// building with [`finish()`](Self::finish). Cant be used when bulding with
155 /// [`custom_msg`](Self::custom_msg).
156 #[must_use]
157 pub fn with_service_ports(self, ports: [u16; N]) -> ChartBuilder<N, IdSet, No, Yes> {
158 ChartBuilder {
159 header: self.header,
160 discovery_port: self.discovery_port,
161 service_id: self.service_id,
162 service_port: None,
163 service_ports: ports,
164 rampdown: self.rampdown,
165 local: self.local,
166 id_set: PhantomData {},
167 port_set: PhantomData {},
168 ports_set: PhantomData {},
169 }
170 }
171 /// set a custom header number. The header is used to identify your application's chart
172 /// from others multicast traffic when deployed your should set this to a [random](https://www.random.org) number.
173 #[must_use]
174 pub fn with_header(mut self, header: u64) -> ChartBuilder<N, IdSet, PortSet, PortsSet> {
175 self.header = header;
176 self
177 }
178 /// set custom port for discovery. With [local discovery] enabled this port needs to be
179 /// free and unused on all nodes it is not free the multicast traffic caused by this library
180 /// might corrupt network data of other applications. The default port is 8080.
181 /// # Warning
182 /// Not all ports seem to pass multicast traffic, you might need to experiment a bit.
183 #[must_use]
184 pub fn with_discovery_port(mut self, port: u16) -> ChartBuilder<N, IdSet, PortSet, PortsSet> {
185 self.discovery_port = port;
186 self
187 }
188 /// set duration between discovery broadcasts, decreases linearly from `max` to `min`
189 /// over `rampdown` period.
190 /// # Panics
191 /// panics if min is larger then max
192 #[must_use]
193 pub fn with_rampdown(
194 mut self,
195 min: Duration,
196 max: Duration,
197 rampdown: Duration,
198 ) -> ChartBuilder<N, IdSet, PortSet, PortsSet> {
199 assert!(
200 min <= max,
201 "minimum duration: {min:?} must be smaller or equal to the maximum: {max:?}"
202 );
203 self.rampdown = interval::Params { rampdown, min, max };
204 self
205 }
206
207 #[must_use]
208 /// set whether discovery is enabled within the same host. Defaults to false.
209 ///
210 /// # Warning
211 /// When this is enabled you might not be warned if the `discovery port` is in use by another application.
212 /// The other application will recieve network traffic from this crate. This might lead to
213 /// corruption in the application if it can not handle this.
214 /// `ChartBuilder` will still fail if the `discovery port` is already bound to a multicast adress
215 /// without `SO_REUSEADDR` set.
216 pub fn local_discovery(
217 mut self,
218 is_enabled: bool,
219 ) -> ChartBuilder<N, IdSet, PortSet, PortsSet> {
220 self.local = is_enabled;
221 self
222 }
223}
224
225impl ChartBuilder<1, Yes, No, No> {
226 /// build a chart with a custom msg instead of a service port. The message can
227 /// be any struct that implements `Debug`, `Clone`, `serde::Serialize` and `serde::Deserialize`
228 ///
229 /// # Errors
230 /// This errors if the discovery port could not be opened. see: [`Self::with_discovery_port`].
231 ///
232 /// # Example
233 /// ```rust
234 ///use instance_chart::{discovery, ChartBuilder};
235 ///use serde::{Serialize, Deserialize};
236 ///use std::error::Error;
237 ///use std::time::Duration;
238 ///
239 ///#[derive(Debug, Clone, Serialize, Deserialize)]
240 ///struct Msg(u32);
241 ///
242 ///#[tokio::main]
243 ///async fn main() -> Result<(), Box<dyn Error>> {
244 /// let msg = Msg(0);
245 /// let chart = ChartBuilder::new()
246 /// .with_id(1)
247 /// .with_discovery_port(8888)
248 /// .with_header(17249479) // optional
249 /// .with_rampdown( // optional
250 /// Duration::from_millis(10),
251 /// Duration::from_secs(10),
252 /// Duration::from_secs(60))
253 /// .custom_msg(msg)?;
254 /// let maintain = discovery::maintain(chart.clone());
255 /// let _ = tokio::spawn(maintain); // maintain task will run forever
256 /// Ok(())
257 /// }
258 /// ```
259 #[allow(clippy::missing_panics_doc)] // with generic IdSet and PortSet set service_id must be set
260 pub fn custom_msg<Msg>(self, msg: Msg) -> Result<Chart<1, Msg>, Error>
261 where
262 Msg: Debug + Serialize + Clone,
263 {
264 let sock = open_socket(self.discovery_port, self.local)?;
265 Ok(Chart {
266 header: self.header,
267 service_id: self.service_id.unwrap(),
268 msg: [msg],
269 sock: Arc::new(sock),
270 map: Arc::new(Mutex::new(HashMap::new())),
271 interval: self.rampdown.into(),
272 broadcast: broadcast::channel(256).0,
273 })
274 }
275}
276
277impl ChartBuilder<1, Yes, Yes, No> {
278 /// build a chart that has a single service ports set
279 ///
280 /// # Errors
281 /// This errors if the discovery port could not be opened. see: [`Self::with_discovery_port`].
282 ///
283 /// # Example
284 /// ```rust
285 ///use std::error::Error;
286 ///use instance_chart::{discovery, ChartBuilder};
287 ///use std::time::Duration;
288 ///
289 ///#[tokio::main]
290 ///async fn main() -> Result<(), Box<dyn Error>> {
291 /// let chart = ChartBuilder::new()
292 /// .with_id(1)
293 /// .with_service_port(8042)
294 /// .with_discovery_port(8888)
295 /// .with_header(17249479) // optional
296 /// .with_rampdown( // optional
297 /// Duration::from_millis(10),
298 /// Duration::from_secs(10),
299 /// Duration::from_secs(60))
300 /// .finish()?;
301 /// let maintain = discovery::maintain(chart.clone());
302 /// let _ = tokio::spawn(maintain); // maintain task will run forever
303 /// Ok(())
304 /// }
305 /// ```
306
307 // with generic IdSet, PortSet set service_id and service_port are always Some
308 #[allow(clippy::missing_panics_doc)]
309 pub fn finish(self) -> Result<Chart<1, Port>, Error> {
310 let sock = open_socket(self.discovery_port, self.local)?;
311 Ok(Chart {
312 header: self.header,
313 service_id: self.service_id.unwrap(),
314 msg: [self.service_port.unwrap()],
315 sock: Arc::new(sock),
316 map: Arc::new(Mutex::new(HashMap::new())),
317 interval: self.rampdown.into(),
318 broadcast: broadcast::channel(256).0,
319 })
320 }
321}
322
323impl<const N: usize> ChartBuilder<N, Yes, No, Yes> {
324 /// build a chart that has a multiple service ports set
325 ///
326 /// # Errors
327 /// This errors if the discovery port could not be opened. see: [`Self::with_discovery_port`].
328 ///
329 /// # Example
330 /// ```rust
331 ///use std::error::Error;
332 ///use instance_chart::{discovery, ChartBuilder};
333 ///use std::time::Duration;
334 ///
335 ///#[tokio::main]
336 ///async fn main() -> Result<(), Box<dyn Error>> {
337 /// let chart = ChartBuilder::new()
338 /// .with_id(1)
339 /// .with_service_ports([8042,9042])
340 /// .with_discovery_port(8888)
341 /// .with_header(17249479) // optional
342 /// .with_rampdown( // optional
343 /// Duration::from_millis(10),
344 /// Duration::from_secs(10),
345 /// Duration::from_secs(60))
346 /// .finish()?;
347 /// let maintain = discovery::maintain(chart.clone());
348 /// let _ = tokio::spawn(maintain); // maintain task will run forever
349 /// Ok(())
350 /// }
351 /// ```
352
353 // with generic IdSet, PortSets set service_id and service_ports are always Some
354 #[allow(clippy::missing_panics_doc)]
355 pub fn finish(self) -> Result<Chart<N, Port>, Error> {
356 let sock = open_socket(self.discovery_port, self.local)?;
357 Ok(Chart {
358 header: self.header,
359 service_id: self.service_id.unwrap(),
360 msg: self.service_ports,
361 sock: Arc::new(sock),
362 map: Arc::new(Mutex::new(HashMap::new())),
363 interval: self.rampdown.into(),
364 broadcast: broadcast::channel(256).0,
365 })
366 }
367}
368
369fn open_socket(port: u16, local_discovery: bool) -> Result<UdpSocket, Error> {
370 use socket2::{Domain, SockAddr, Socket, Type};
371 use Error::{
372 Bind, Construct, JoinMulticast, SetBroadcast, SetMulticast, SetNonBlocking, SetReuse,
373 SetTTL, ToTokio,
374 };
375
376 assert_ne!(port, 0);
377
378 let interface = Ipv4Addr::from([0, 0, 0, 0]);
379 let multiaddr = Ipv4Addr::from([224, 0, 0, 251]);
380
381 let sock = Socket::new(Domain::IPV4, Type::DGRAM, None).map_err(Construct)?;
382
383 if local_discovery {
384 sock.set_reuse_port(true).map_err(SetReuse)?; // allow binding to a port already in use
385 }
386 sock.set_broadcast(true).map_err(SetBroadcast)?; // enable udp broadcasting
387 sock.set_multicast_loop_v4(true).map_err(SetMulticast)?; // send broadcast to self
388 sock.set_ttl(4).map_err(SetTTL)?; // deliver to other subnetworks
389
390 let address = SocketAddr::from((interface, port));
391 let address = SockAddr::from(address);
392 sock.bind(&address).map_err(|error| Bind { error, port })?;
393 sock.join_multicast_v4(&multiaddr, &interface)
394 .map_err(JoinMulticast)?;
395
396 let sock = std::net::UdpSocket::from(sock);
397 sock.set_nonblocking(true).map_err(SetNonBlocking)?;
398 let sock = UdpSocket::from_std(sock).map_err(ToTokio)?;
399 Ok(sock)
400}
401
#[cfg(test)]
mod compiles {
    //! Smoke tests: each build path of the builder type-checks and yields a
    //! chart exposing the matching accessor.
    use super::*;

    /// Id plus a single service port builds with `finish`.
    #[tokio::test]
    async fn with_service_port() {
        let builder = ChartBuilder::new()
            .with_id(0)
            .with_service_port(15)
            .local_discovery(true);
        let chart = builder.finish().unwrap();
        let _ = chart.our_service_port();
    }

    /// Id plus several service ports builds with `finish`.
    #[tokio::test]
    async fn with_service_ports() {
        let builder = ChartBuilder::new()
            .with_id(0)
            .with_service_ports([1, 2])
            .local_discovery(true);
        let chart = builder.finish().unwrap();
        let _ = chart.our_service_ports();
    }

    /// Id without any port builds with `custom_msg`.
    #[tokio::test]
    async fn custom_msg() {
        let builder = ChartBuilder::new().with_id(0).local_discovery(true);
        let chart = builder.custom_msg("hi").unwrap();
        let _ = chart.our_msg();
    }
}
437}