1use std::{net::SocketAddr, sync::atomic::AtomicU64};
2
3use ntp_proto::PeerNtsData;
4use tokio::sync::mpsc;
5
6use crate::config::NormalizedAddress;
7
8#[cfg(test)]
9pub mod dummy;
10pub mod nts;
11pub mod pool;
12pub mod standard;
13
14#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
17pub struct SpawnerId(u64);
18
19impl SpawnerId {
20 pub fn new() -> SpawnerId {
21 static COUNTER: AtomicU64 = AtomicU64::new(1);
22 SpawnerId(COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
23 }
24}
25
26impl Default for SpawnerId {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
36pub struct PeerId(u64);
37
38impl PeerId {
39 pub fn new() -> PeerId {
40 static COUNTER: AtomicU64 = AtomicU64::new(1);
41 PeerId(COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
42 }
43}
44
45impl Default for PeerId {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51#[derive(Debug)]
57pub struct SpawnEvent {
58 pub id: SpawnerId,
59 pub action: SpawnAction,
60}
61
62impl SpawnEvent {
63 pub fn new(id: SpawnerId, action: SpawnAction) -> SpawnEvent {
64 SpawnEvent { id, action }
65 }
66}
67
68#[derive(Debug)]
70pub enum SystemEvent {
71 PeerRemoved(PeerRemovedEvent),
72 PeerRegistered(PeerCreateParameters),
73 Shutdown,
74}
75
76impl SystemEvent {
77 pub fn peer_removed(id: PeerId, reason: PeerRemovalReason) -> SystemEvent {
78 SystemEvent::PeerRemoved(PeerRemovedEvent { id, reason })
79 }
80}
81
82#[derive(Debug)]
83pub struct PeerRemovedEvent {
84 pub id: PeerId,
85 pub reason: PeerRemovalReason,
86}
87
88#[derive(Debug, PartialEq, Eq)]
90pub enum PeerRemovalReason {
91 Demobilized,
92 NetworkIssue,
93 Unreachable,
94}
95
96#[derive(Debug)]
99pub enum SpawnAction {
100 Create(PeerCreateParameters),
101 }
103
104impl SpawnAction {
105 pub fn create(
106 id: PeerId,
107 addr: SocketAddr,
108 normalized_addr: NormalizedAddress,
109 nts: Option<Box<PeerNtsData>>,
110 ) -> SpawnAction {
111 SpawnAction::Create(PeerCreateParameters {
112 id,
113 addr,
114 normalized_addr,
115 nts,
116 })
117 }
118}
119
120#[derive(Debug)]
121pub struct PeerCreateParameters {
122 pub id: PeerId,
123 pub addr: SocketAddr,
124 pub normalized_addr: NormalizedAddress,
125 pub nts: Option<Box<PeerNtsData>>,
126}
127
128#[cfg(test)]
129impl PeerCreateParameters {
130 pub fn from_new_addr(addr: SocketAddr) -> PeerCreateParameters {
131 Self::from_addr(PeerId::new(), addr)
132 }
133
134 pub fn from_addr(id: PeerId, addr: SocketAddr) -> PeerCreateParameters {
135 PeerCreateParameters {
136 id,
137 addr,
138 normalized_addr: NormalizedAddress::from_string_ntp(format!(
139 "{}:{}",
140 addr.ip(),
141 addr.port()
142 ))
143 .unwrap(),
144 nts: None,
145 }
146 }
147
148 pub fn from_ip_and_port(id: PeerId, ip: impl Into<String>, port: u16) -> PeerCreateParameters {
149 Self::from_addr(
150 id,
151 SocketAddr::new(
152 ip.into().parse().expect("Invalid ip address specified"),
153 port,
154 ),
155 )
156 }
157
158 pub fn from_new_ip_and_port(ip: impl Into<String>, port: u16) -> PeerCreateParameters {
159 Self::from_ip_and_port(PeerId::new(), ip, port)
160 }
161
162 pub async fn from_normalized(id: PeerId, addr: NormalizedAddress) -> PeerCreateParameters {
163 let socket_addr = addr
164 .lookup_host()
165 .await
166 .expect("Lookup failed")
167 .next()
168 .expect("Lookup unexpectedly returned zero responses");
169 PeerCreateParameters {
170 id,
171 addr: socket_addr,
172 normalized_addr: addr,
173 nts: None,
174 }
175 }
176}
177
178#[async_trait::async_trait]
179pub trait Spawner {
180 type Error: std::error::Error + Send;
181
182 async fn run(
188 self,
189 action_tx: mpsc::Sender<SpawnEvent>,
190 system_notify: mpsc::Receiver<SystemEvent>,
191 ) -> Result<(), Self::Error>;
192
193 fn get_id(&self) -> SpawnerId;
195
196 fn get_addr_description(&self) -> String;
198
199 fn get_description(&self) -> &str;
201}
202
203#[async_trait::async_trait]
204pub trait BasicSpawner {
205 type Error: std::error::Error + Send;
206
207 async fn handle_init(
215 &mut self,
216 action_tx: &mpsc::Sender<SpawnEvent>,
217 ) -> Result<(), Self::Error>;
218
219 async fn handle_peer_removed(
226 &mut self,
227 event: PeerRemovedEvent,
228 action_tx: &mpsc::Sender<SpawnEvent>,
229 ) -> Result<(), Self::Error>;
230
231 async fn handle_registered(
237 &mut self,
238 _event: PeerCreateParameters,
239 _action_tx: &mpsc::Sender<SpawnEvent>,
240 ) -> Result<(), Self::Error> {
241 Ok(())
242 }
243
244 fn get_id(&self) -> SpawnerId;
246
247 fn get_addr_description(&self) -> String;
249
250 fn get_description(&self) -> &str;
252}
253
254#[async_trait::async_trait]
255impl<T, E> Spawner for T
256where
257 T: BasicSpawner<Error = E> + Send + 'static,
258 E: std::error::Error + Send + 'static,
259{
260 type Error = E;
261
262 async fn run(
263 mut self,
264 action_tx: mpsc::Sender<SpawnEvent>,
265 mut system_notify: mpsc::Receiver<SystemEvent>,
266 ) -> Result<(), E> {
267 self.handle_init(&action_tx).await?;
270 while let Some(event) = system_notify.recv().await {
271 match event {
272 SystemEvent::PeerRegistered(peer_params) => {
273 self.handle_registered(peer_params, &action_tx).await?;
274 }
275 SystemEvent::PeerRemoved(removed_peer) => {
276 self.handle_peer_removed(removed_peer, &action_tx).await?;
277 }
278 SystemEvent::Shutdown => {
279 break;
280 }
281 }
282 }
283
284 Ok(())
285 }
286
287 fn get_id(&self) -> SpawnerId {
288 self.get_id()
289 }
290
291 fn get_addr_description(&self) -> String {
292 self.get_addr_description()
293 }
294
295 fn get_description(&self) -> &str {
296 self.get_description()
297 }
298}
299
300#[cfg(test)]
301mod tests {
302 use super::{PeerCreateParameters, SpawnAction, SpawnEvent};
303
304 pub fn get_create_params(res: SpawnEvent) -> PeerCreateParameters {
305 let SpawnAction::Create(params) = res.action;
306 params
307 }
308}