// ntp_daemon/spawn/mod.rs

use std::{
    net::SocketAddr,
    sync::atomic::{AtomicU64, Ordering},
};

use ntp_proto::PeerNtsData;
use tokio::sync::mpsc;

use crate::config::NormalizedAddress;

#[cfg(test)]
pub mod dummy;
pub mod nts;
pub mod pool;
pub mod standard;
13
/// Unique identifier for a spawner.
///
/// This is used to identify which spawner was used to create a peer. Ids are
/// handed out from a process-wide counter, so two calls to [`SpawnerId::new`]
/// never yield the same value (short of the `u64` counter wrapping around).
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub struct SpawnerId(u64);

impl SpawnerId {
    /// Allocate a fresh spawner id that has not been handed out before.
    pub fn new() -> SpawnerId {
        // The counter starts at 1. `Relaxed` is sufficient: `fetch_add` is
        // still atomic, and only the uniqueness of the returned value matters.
        static COUNTER: AtomicU64 = AtomicU64::new(1);
        SpawnerId(COUNTER.fetch_add(1, Ordering::Relaxed))
    }
}

impl Default for SpawnerId {
    /// Same as [`SpawnerId::new`]: each default value is a new, distinct id.
    fn default() -> Self {
        Self::new()
    }
}
31
/// Unique identifier for a peer.
///
/// This peer id makes sure that even if the network address is the same
/// that we always know which specific spawned peer we are talking about.
/// Ids come from a process-wide counter, so two calls to [`PeerId::new`]
/// never yield the same value (short of the `u64` counter wrapping around).
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub struct PeerId(u64);

impl PeerId {
    /// Allocate a fresh peer id that has not been handed out before.
    pub fn new() -> PeerId {
        // The counter starts at 1. `Relaxed` is sufficient: `fetch_add` is
        // still atomic, and only the uniqueness of the returned value matters.
        static COUNTER: AtomicU64 = AtomicU64::new(1);
        PeerId(COUNTER.fetch_add(1, Ordering::Relaxed))
    }
}

impl Default for PeerId {
    /// Same as [`PeerId::new`]: each default value is a new, distinct id.
    fn default() -> Self {
        Self::new()
    }
}
50
51/// A SpawnEvent is an event created by the spawner for the system
52///
53/// The action that the system should execute is encoded in the `action` field.
54/// The spawner should make sure that it only ever sends events with its own
55/// spawner id.
56#[derive(Debug)]
57pub struct SpawnEvent {
58    pub id: SpawnerId,
59    pub action: SpawnAction,
60}
61
62impl SpawnEvent {
63    pub fn new(id: SpawnerId, action: SpawnAction) -> SpawnEvent {
64        SpawnEvent { id, action }
65    }
66}
67
68/// Events coming from the system are encoded in this enum
69#[derive(Debug)]
70pub enum SystemEvent {
71    PeerRemoved(PeerRemovedEvent),
72    PeerRegistered(PeerCreateParameters),
73    Shutdown,
74}
75
76impl SystemEvent {
77    pub fn peer_removed(id: PeerId, reason: PeerRemovalReason) -> SystemEvent {
78        SystemEvent::PeerRemoved(PeerRemovedEvent { id, reason })
79    }
80}
81
82#[derive(Debug)]
83pub struct PeerRemovedEvent {
84    pub id: PeerId,
85    pub reason: PeerRemovalReason,
86}
87
/// This indicates what the reason was that a peer was removed.
///
/// The enum carries no data, so it is `Copy` and can be passed by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerRemovalReason {
    Demobilized,
    NetworkIssue,
    Unreachable,
}
95
96/// The kind of action that the spawner requests to the system.
97/// Currently a spawner can only create peers
98#[derive(Debug)]
99pub enum SpawnAction {
100    Create(PeerCreateParameters),
101    // Remove(()),
102}
103
104impl SpawnAction {
105    pub fn create(
106        id: PeerId,
107        addr: SocketAddr,
108        normalized_addr: NormalizedAddress,
109        nts: Option<Box<PeerNtsData>>,
110    ) -> SpawnAction {
111        SpawnAction::Create(PeerCreateParameters {
112            id,
113            addr,
114            normalized_addr,
115            nts,
116        })
117    }
118}
119
120#[derive(Debug)]
121pub struct PeerCreateParameters {
122    pub id: PeerId,
123    pub addr: SocketAddr,
124    pub normalized_addr: NormalizedAddress,
125    pub nts: Option<Box<PeerNtsData>>,
126}
127
/// Test-only constructors for [`PeerCreateParameters`]. None of them attach
/// NTS data (`nts` is always `None`).
#[cfg(test)]
impl PeerCreateParameters {
    /// Build parameters for `addr` using a freshly allocated [`PeerId`].
    pub fn from_new_addr(addr: SocketAddr) -> PeerCreateParameters {
        Self::from_addr(PeerId::new(), addr)
    }

    /// Build parameters for `addr` with the given peer id.
    ///
    /// # Panics
    ///
    /// Panics if `NormalizedAddress::from_string_ntp` rejects the textual form
    /// of `addr`.
    pub fn from_addr(id: PeerId, addr: SocketAddr) -> PeerCreateParameters {
        // Use `SocketAddr`'s own `Display`: it renders IPv6 as `[ip]:port`.
        // Joining `ip()` and `port()` with a bare `:` would produce strings
        // such as `::1:123`, where address and port cannot be told apart.
        // For IPv4 the output is the same `ip:port` as before.
        let normalized_addr = NormalizedAddress::from_string_ntp(addr.to_string())
            .expect("a formatted socket address should be a valid normalized address");

        PeerCreateParameters {
            id,
            addr,
            normalized_addr,
            nts: None,
        }
    }

    /// Build parameters from a textual IP address and a port.
    ///
    /// # Panics
    ///
    /// Panics if `ip` does not parse as an IP address.
    pub fn from_ip_and_port(id: PeerId, ip: impl Into<String>, port: u16) -> PeerCreateParameters {
        let ip_addr = ip.into().parse().expect("Invalid ip address specified");
        Self::from_addr(id, SocketAddr::new(ip_addr, port))
    }

    /// Like [`Self::from_ip_and_port`], but with a freshly allocated [`PeerId`].
    pub fn from_new_ip_and_port(ip: impl Into<String>, port: u16) -> PeerCreateParameters {
        Self::from_ip_and_port(PeerId::new(), ip, port)
    }

    /// Build parameters from a normalized address, using the first socket
    /// address returned by its host lookup.
    ///
    /// # Panics
    ///
    /// Panics if the lookup fails or returns no addresses.
    pub async fn from_normalized(id: PeerId, addr: NormalizedAddress) -> PeerCreateParameters {
        let socket_addr = addr
            .lookup_host()
            .await
            .expect("Lookup failed")
            .next()
            .expect("Lookup unexpectedly returned zero responses");

        PeerCreateParameters {
            id,
            addr: socket_addr,
            normalized_addr: addr,
            nts: None,
        }
    }
}
177
178#[async_trait::async_trait]
179pub trait Spawner {
180    type Error: std::error::Error + Send;
181
182    /// Run a spawner
183    ///
184    /// Actions that the system has to execute can be sent through the
185    /// `action_tx` channel and event coming in from the system that the spawner
186    /// should know about will be sent through the `system_notify` channel.
187    async fn run(
188        self,
189        action_tx: mpsc::Sender<SpawnEvent>,
190        system_notify: mpsc::Receiver<SystemEvent>,
191    ) -> Result<(), Self::Error>;
192
193    /// Returns the id of this spawner
194    fn get_id(&self) -> SpawnerId;
195
196    /// Get a description of the address that this spawner is connected to
197    fn get_addr_description(&self) -> String;
198
199    /// Get a description of the type of spawner
200    fn get_description(&self) -> &str;
201}
202
203#[async_trait::async_trait]
204pub trait BasicSpawner {
205    type Error: std::error::Error + Send;
206
207    /// Handle initial spawning.
208    ///
209    /// This is called on startup of the spawner and is meant to setup the
210    /// initial set of peers when nothing else would have been spawned by this
211    /// spawner. Once this function completes the system should be aware of at
212    /// least one peer for this spawner, otherwise no events will ever be sent
213    /// to the spawner and nothing will ever happen.
214    async fn handle_init(
215        &mut self,
216        action_tx: &mpsc::Sender<SpawnEvent>,
217    ) -> Result<(), Self::Error>;
218
219    /// Event handler for when a peer is removed.
220    ///
221    /// This is called each time the system notifies this spawner that one of
222    /// the spawned peers was removed from the system. The spawner can then add
223    /// additional peers or do nothing, depending on its configuration and
224    /// algorithm.
225    async fn handle_peer_removed(
226        &mut self,
227        event: PeerRemovedEvent,
228        action_tx: &mpsc::Sender<SpawnEvent>,
229    ) -> Result<(), Self::Error>;
230
231    /// Event handler for when a peer is succesfully registered in the system
232    ///
233    /// Every time the spawner sends a peer to the system this handler will
234    /// eventually be called when the system has sucessfully registered the peer
235    /// and will start polling it for ntp packets.
236    async fn handle_registered(
237        &mut self,
238        _event: PeerCreateParameters,
239        _action_tx: &mpsc::Sender<SpawnEvent>,
240    ) -> Result<(), Self::Error> {
241        Ok(())
242    }
243
244    /// Get the id of the spawner
245    fn get_id(&self) -> SpawnerId;
246
247    /// Get a description of the address this spawner is connected to
248    fn get_addr_description(&self) -> String;
249
250    /// Get a description of the type of spawner
251    fn get_description(&self) -> &str;
252}
253
254#[async_trait::async_trait]
255impl<T, E> Spawner for T
256where
257    T: BasicSpawner<Error = E> + Send + 'static,
258    E: std::error::Error + Send + 'static,
259{
260    type Error = E;
261
262    async fn run(
263        mut self,
264        action_tx: mpsc::Sender<SpawnEvent>,
265        mut system_notify: mpsc::Receiver<SystemEvent>,
266    ) -> Result<(), E> {
267        // basic event loop where init is called on startup and then wait for
268        // events from the system before doing anything
269        self.handle_init(&action_tx).await?;
270        while let Some(event) = system_notify.recv().await {
271            match event {
272                SystemEvent::PeerRegistered(peer_params) => {
273                    self.handle_registered(peer_params, &action_tx).await?;
274                }
275                SystemEvent::PeerRemoved(removed_peer) => {
276                    self.handle_peer_removed(removed_peer, &action_tx).await?;
277                }
278                SystemEvent::Shutdown => {
279                    break;
280                }
281            }
282        }
283
284        Ok(())
285    }
286
287    fn get_id(&self) -> SpawnerId {
288        self.get_id()
289    }
290
291    fn get_addr_description(&self) -> String {
292        self.get_addr_description()
293    }
294
295    fn get_description(&self) -> &str {
296        self.get_description()
297    }
298}
299
#[cfg(test)]
mod tests {
    use super::{PeerCreateParameters, SpawnAction, SpawnEvent};

    /// Extract the peer creation parameters carried by a spawn event.
    ///
    /// `SpawnAction` has a single variant, so the `let` pattern below is
    /// irrefutable; adding a variant will turn it into a compile error here.
    pub fn get_create_params(res: SpawnEvent) -> PeerCreateParameters {
        let SpawnAction::Create(params) = res.action;
        params
    }
}