1use crate::{
2 config::{ClockConfig, CombinedSystemConfig, NormalizedAddress, PeerConfig, ServerConfig},
3 peer::{MsgForSystem, PeerChannels},
4 peer::{PeerTask, Wait},
5 server::{ServerStats, ServerTask},
6 spawn::{
7 nts::NtsSpawner, pool::PoolSpawner, standard::StandardSpawner, PeerCreateParameters,
8 PeerId, PeerRemovalReason, SpawnAction, SpawnEvent, Spawner, SpawnerId, SystemEvent,
9 },
10 ObservablePeerState,
11};
12
13use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin, sync::Arc};
14
15use ntp_proto::{
16 DefaultTimeSyncController, KeySet, NtpClock, NtpDuration, PeerSnapshot, SystemSnapshot,
17 TimeSyncController,
18};
19use ntp_udp::{EnableTimestamps, InterfaceName};
20use tokio::{sync::mpsc, task::JoinHandle};
21use tracing::info;
22
/// Wait period handed to every spawner, peer task and server task created in
/// this module.
///
/// NOTE(review): presumably the pause those tasks take before retrying after
/// a network failure -- confirm against the consumers.
pub const NETWORK_WAIT_PERIOD: std::time::Duration = std::time::Duration::from_secs(1);

/// Capacity of the bounded `mpsc` channels created in this module (peer
/// messages, spawn events and spawner notifications).
pub const MESSAGE_BUFFER_SIZE: usize = 32;
26
/// Wrapper around a timer future `T` that can be switched off.
///
/// While `enabled` is `false`, polling the wrapper reports `Pending` without
/// polling the inner timer.
struct SingleshotSleep<T> {
    /// Whether the inner timer is currently armed.
    enabled: bool,
    /// The wrapped timer, boxed and pinned so it can be polled and reset in place.
    sleep: Pin<Box<T>>,
}
31
32impl<T: Wait> SingleshotSleep<T> {
33 fn new_disabled(t: T) -> Self {
34 SingleshotSleep {
35 enabled: false,
36 sleep: Box::pin(t),
37 }
38 }
39}
40
41impl<T: Wait> Future for SingleshotSleep<T> {
42 type Output = ();
43
44 fn poll(
45 self: Pin<&mut Self>,
46 cx: &mut std::task::Context<'_>,
47 ) -> std::task::Poll<Self::Output> {
48 let this = self.get_mut();
49 if !this.enabled {
50 return std::task::Poll::Pending;
51 }
52 match this.sleep.as_mut().poll(cx) {
53 std::task::Poll::Ready(v) => {
54 this.enabled = false;
55 std::task::Poll::Ready(v)
56 }
57 u => u,
58 }
59 }
60}
61
62impl<T: Wait> Wait for SingleshotSleep<T> {
63 fn reset(self: Pin<&mut Self>, deadline: tokio::time::Instant) {
64 let this = self.get_mut();
65 this.enabled = true;
66 this.sleep.as_mut().reset(deadline);
67 }
68}
69
/// Channel endpoints returned to the caller when a `System` is constructed.
pub struct DaemonChannels {
    /// Receiving end of the configuration watch channel.
    pub config_receiver: tokio::sync::watch::Receiver<CombinedSystemConfig>,
    /// Sending end of the configuration watch channel; the system's event loop
    /// waits on the matching receiver for changes.
    pub config_sender: tokio::sync::watch::Sender<CombinedSystemConfig>,
    /// Latest list of observable peer states published by the system.
    pub peer_snapshots_receiver: tokio::sync::watch::Receiver<Vec<ObservablePeerState>>,
    /// Latest list of servers (statistics and configuration) published by the system.
    pub server_data_receiver: tokio::sync::watch::Receiver<Vec<ServerData>>,
    /// Latest system snapshot published by the system.
    pub system_snapshot_receiver: tokio::sync::watch::Receiver<SystemSnapshot>,
    /// The key set receiver the system was constructed with, passed through unchanged.
    pub keyset: tokio::sync::watch::Receiver<Arc<KeySet>>,
}
78
79pub async fn spawn(
81 system_config: CombinedSystemConfig,
82 clock_config: ClockConfig,
83 peer_configs: &[PeerConfig],
84 server_configs: &[ServerConfig],
85 keyset: tokio::sync::watch::Receiver<Arc<KeySet>>,
86) -> std::io::Result<(JoinHandle<std::io::Result<()>>, DaemonChannels)> {
87 let (mut system, channels) = System::new(
88 clock_config.clock,
89 clock_config.interface,
90 clock_config.enable_timestamps,
91 system_config,
92 keyset,
93 );
94
95 for peer_config in peer_configs {
96 match peer_config {
97 PeerConfig::Standard(cfg) => {
98 system.add_spawner(StandardSpawner::new(cfg.clone(), NETWORK_WAIT_PERIOD));
99 }
100 PeerConfig::Nts(cfg) => {
101 system.add_spawner(NtsSpawner::new(cfg.clone(), NETWORK_WAIT_PERIOD));
102 }
103 PeerConfig::Pool(cfg) => {
104 system.add_spawner(PoolSpawner::new(cfg.clone(), NETWORK_WAIT_PERIOD));
105 }
106 }
107 }
108
109 for server_config in server_configs.iter() {
110 system.add_server(server_config.to_owned()).await;
111 }
112
113 let handle = tokio::spawn(async move {
114 let sleep =
115 SingleshotSleep::new_disabled(tokio::time::sleep_until(tokio::time::Instant::now()));
116 tokio::pin!(sleep);
117 system.run(sleep).await
118 });
119
120 Ok((handle, channels))
121}
122
/// Bookkeeping the system keeps for each registered spawner.
struct SystemSpawnerData {
    /// Identifier reported by the spawner; used to look the spawner up again.
    id: SpawnerId,
    /// Channel on which the system sends `SystemEvent`s to the spawner.
    notify_tx: mpsc::Sender<SystemEvent>,
}
127
/// Central state of the daemon: owns the clock controller, tracks peers,
/// servers and spawners, and publishes snapshots over watch channels.
///
/// `C` is the clock type; `T` is the timer type wrapped by the
/// `SingleshotSleep` that drives the event loop.
struct System<C: NtpClock, T: Wait> {
    /// Marker tying the timer type `T` to the struct without storing a timer.
    _wait: PhantomData<SingleshotSleep<T>>,
    /// Most recently applied configuration.
    config: CombinedSystemConfig,
    /// Current system snapshot; published through `system_snapshot_sender`.
    system: SystemSnapshot,

    /// Watched by the event loop for configuration changes.
    config_receiver: tokio::sync::watch::Receiver<CombinedSystemConfig>,
    /// Publishes `system` after the controller reports an update.
    system_snapshot_sender: tokio::sync::watch::Sender<SystemSnapshot>,
    /// Publishes the observable state of all peers.
    peer_snapshots_sender: tokio::sync::watch::Sender<Vec<ObservablePeerState>>,
    /// Publishes the list of servers.
    server_data_sender: tokio::sync::watch::Sender<Vec<ServerData>>,
    /// Key set receiver; a clone is handed to every server task.
    keyset: tokio::sync::watch::Receiver<Arc<KeySet>>,

    /// Receiving end of the channel whose sender is stored in `peer_channels`.
    msg_for_system_rx: mpsc::Receiver<MsgForSystem>,
    /// Sender for spawn events; a clone is handed to every spawner.
    spawn_tx: mpsc::Sender<SpawnEvent>,
    /// Spawn events arriving from the spawners.
    spawn_rx: mpsc::Receiver<SpawnEvent>,

    /// State of every peer currently known to the system, keyed by peer id.
    peers: HashMap<PeerId, PeerState>,
    /// Servers added so far.
    servers: Vec<ServerData>,
    /// Spawners added so far.
    spawners: Vec<SystemSpawnerData>,

    /// Channel bundle cloned into every peer task.
    peer_channels: PeerChannels,

    /// Clock used to read the current time; cloned into peer and server tasks.
    clock: C,
    /// Time synchronization controller fed with peer updates and measurements.
    controller: DefaultTimeSyncController<C, PeerId>,

    /// Timestamping setting passed to every peer task.
    enable_timestamps: EnableTimestamps,

    /// Optional network interface passed to every peer and server task.
    interface: Option<InterfaceName>,
}
159
160impl<C: NtpClock, T: Wait> System<C, T> {
161 fn new(
162 clock: C,
163 interface: Option<InterfaceName>,
164 enable_timestamps: EnableTimestamps,
165 config: CombinedSystemConfig,
166 keyset: tokio::sync::watch::Receiver<Arc<KeySet>>,
167 ) -> (Self, DaemonChannels) {
168 let system = SystemSnapshot {
170 stratum: config.system.local_stratum,
171 ..Default::default()
172 };
173
174 let (config_sender, config_receiver) = tokio::sync::watch::channel(config);
176 let (system_snapshot_sender, system_snapshot_receiver) =
177 tokio::sync::watch::channel(system);
178 let (peer_snapshots_sender, peer_snapshots_receiver) = tokio::sync::watch::channel(vec![]);
179 let (server_data_sender, server_data_receiver) = tokio::sync::watch::channel(vec![]);
180 let (msg_for_system_sender, msg_for_system_receiver) =
181 tokio::sync::mpsc::channel(MESSAGE_BUFFER_SIZE);
182 let (spawn_tx, spawn_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE);
183
184 (
186 System {
187 _wait: PhantomData,
188 config,
189 system,
190
191 config_receiver: config_receiver.clone(),
192 system_snapshot_sender,
193 peer_snapshots_sender,
194 server_data_sender,
195 keyset: keyset.clone(),
196
197 msg_for_system_rx: msg_for_system_receiver,
198 spawn_rx,
199 spawn_tx,
200
201 peers: Default::default(),
202 servers: Default::default(),
203 spawners: Default::default(),
204 peer_channels: PeerChannels {
205 msg_for_system_sender,
206 system_snapshot_receiver: system_snapshot_receiver.clone(),
207 system_config_receiver: config_receiver.clone(),
208 },
209 clock: clock.clone(),
210 controller: DefaultTimeSyncController::new(clock, config.system, config.algorithm),
211 enable_timestamps,
212 interface,
213 },
214 DaemonChannels {
215 config_receiver,
216 config_sender,
217 peer_snapshots_receiver,
218 server_data_receiver,
219 system_snapshot_receiver,
220 keyset,
221 },
222 )
223 }
224
225 fn add_spawner(&mut self, spawner: impl Spawner + Send + Sync + 'static) -> SpawnerId {
226 let (notify_tx, notify_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE);
227 let id = spawner.get_id();
228 let spawner_data = SystemSpawnerData { id, notify_tx };
229 info!(id=?spawner_data.id, ty=spawner.get_description(), addr=spawner.get_addr_description(), "Running spawner");
230 self.spawners.push(spawner_data);
231 let spawn_tx = self.spawn_tx.clone();
232 tokio::spawn(async move { spawner.run(spawn_tx, notify_rx).await });
233 id
234 }
235
236 async fn run(&mut self, mut wait: Pin<&mut SingleshotSleep<T>>) -> std::io::Result<()> {
237 loop {
238 tokio::select! {
239 opt_msg_for_system = self.msg_for_system_rx.recv() => {
240 match opt_msg_for_system {
241 None => {
242 break
244 }
245 Some(msg_for_system) => {
246 self.handle_peer_update(msg_for_system, &mut wait)
247 .await?;
248 }
249 }
250 }
251 opt_spawn_event = self.spawn_rx.recv() => {
252 match opt_spawn_event {
253 None => {
254 tracing::warn!("the spawn channel closed unexpectedly");
255 }
256 Some(spawn_event) => {
257 self.handle_spawn_event(spawn_event).await;
258 }
259 }
260 }
261 () = &mut wait => {
262 self.handle_timer(&mut wait);
263 }
264 _ = self.config_receiver.changed(), if self.config_receiver.has_changed().is_ok() => {
265 self.handle_config_update();
266 }
267 }
268 }
269
270 Ok(())
272 }
273
274 fn handle_config_update(&mut self) {
275 let config = *self.config_receiver.borrow_and_update();
276 self.controller
277 .update_config(config.system, config.algorithm);
278 self.config = config;
279 }
280
281 fn handle_timer(&mut self, wait: &mut Pin<&mut SingleshotSleep<T>>) {
282 tracing::debug!("Timer expired");
283 let update = self.controller.time_update();
285 self.handle_algorithm_state_update(update, wait);
286 }
287
288 async fn handle_peer_update(
289 &mut self,
290 msg: MsgForSystem,
291 wait: &mut Pin<&mut SingleshotSleep<T>>,
292 ) -> std::io::Result<()> {
293 tracing::debug!(?msg, "updating peer");
294
295 match msg {
296 MsgForSystem::MustDemobilize(index) => {
297 self.handle_peer_demobilize(index).await;
298 }
299 MsgForSystem::NewMeasurement(index, snapshot, measurement) => {
300 self.handle_peer_measurement(index, snapshot, measurement, wait);
301 }
302 MsgForSystem::UpdatedSnapshot(index, snapshot) => {
303 self.handle_peer_snapshot(index, snapshot);
304 }
305 MsgForSystem::NetworkIssue(index) => {
306 self.handle_peer_network_issue(index).await?;
307 }
308 MsgForSystem::Unreachable(index) => {
309 self.handle_peer_unreachable(index).await?;
310 }
311 }
312
313 let _ = self
316 .peer_snapshots_sender
317 .send(self.observe_peers().collect());
318
319 Ok(())
320 }
321
322 async fn handle_peer_network_issue(&mut self, index: PeerId) -> std::io::Result<()> {
323 self.controller.peer_remove(index);
324
325 let state = self.peers.remove(&index).unwrap();
327 let spawner_id = state.spawner_id;
328 let peer_id = state.peer_id;
329 let opt_spawner = self.spawners.iter().find(|s| s.id == spawner_id);
330 if let Some(spawner) = opt_spawner {
331 spawner
332 .notify_tx
333 .send(SystemEvent::peer_removed(
334 peer_id,
335 PeerRemovalReason::NetworkIssue,
336 ))
337 .await
338 .expect("Could not notify spawner");
339 }
340
341 Ok(())
342 }
343
344 async fn handle_peer_unreachable(&mut self, index: PeerId) -> std::io::Result<()> {
345 self.controller.peer_remove(index);
346
347 let state = self.peers.remove(&index).unwrap();
349 let spawner_id = state.spawner_id;
350 let peer_id = state.peer_id;
351 let opt_spawner = self.spawners.iter().find(|s| s.id == spawner_id);
352 if let Some(spawner) = opt_spawner {
353 spawner
354 .notify_tx
355 .send(SystemEvent::peer_removed(
356 peer_id,
357 PeerRemovalReason::Unreachable,
358 ))
359 .await
360 .expect("Could not notify spawner");
361 }
362
363 Ok(())
364 }
365
366 fn handle_peer_snapshot(&mut self, index: PeerId, snapshot: PeerSnapshot) {
367 self.controller.peer_update(
368 index,
369 snapshot
370 .accept_synchronization(self.config.system.local_stratum)
371 .is_ok(),
372 );
373 self.peers.get_mut(&index).unwrap().snapshot = Some(snapshot);
374 }
375
376 fn handle_peer_measurement(
377 &mut self,
378 index: PeerId,
379 snapshot: PeerSnapshot,
380 measurement: ntp_proto::Measurement,
381 wait: &mut Pin<&mut SingleshotSleep<T>>,
382 ) {
383 self.handle_peer_snapshot(index, snapshot);
384 let update = self.controller.peer_measurement(index, measurement);
386 self.handle_algorithm_state_update(update, wait);
387 }
388
389 fn handle_algorithm_state_update(
390 &mut self,
391 update: ntp_proto::StateUpdate<PeerId>,
392 wait: &mut Pin<&mut SingleshotSleep<T>>,
393 ) {
394 if let Some(ref used_peers) = update.used_peers {
395 self.system.update_used_peers(used_peers.iter().map(|v| {
396 self.peers.get(v).and_then(|data| data.snapshot).expect(
397 "Critical error: Peer used for synchronization that is not known to system",
398 )
399 }));
400 }
401 if let Some(time_snapshot) = update.time_snapshot {
402 self.system
403 .update_timedata(time_snapshot, &self.config.system);
404 }
405 if let Some(timestamp) = update.next_update {
406 let duration = timestamp - self.clock.now().expect("Could not get current time");
407 let duration =
408 std::time::Duration::from_secs_f64(duration.max(NtpDuration::ZERO).to_seconds());
409 wait.as_mut().reset(tokio::time::Instant::now() + duration);
410 }
411 if update.used_peers.is_some() || update.time_snapshot.is_some() {
412 let _ = self.system_snapshot_sender.send(self.system);
414 }
415 }
416
417 async fn handle_peer_demobilize(&mut self, index: PeerId) {
418 self.controller.peer_remove(index);
419 let state = self.peers.remove(&index).unwrap();
420
421 let spawner_id = state.spawner_id;
423 let peer_id = state.peer_id;
424 let opt_spawner = self.spawners.iter().find(|s| s.id == spawner_id);
425 if let Some(spawner) = opt_spawner {
426 spawner
427 .notify_tx
428 .send(SystemEvent::peer_removed(
429 peer_id,
430 PeerRemovalReason::Demobilized,
431 ))
432 .await
433 .expect("Could not notify spawner");
434 }
435 }
436
437 async fn create_peer(
438 &mut self,
439 spawner_id: SpawnerId,
440 mut params: PeerCreateParameters,
441 ) -> PeerId {
442 let peer_id = params.id;
443 info!(peer_id=?peer_id, addr=?params.addr, spawner=?spawner_id, "new peer");
444 self.peers.insert(
445 peer_id,
446 PeerState {
447 snapshot: None,
448 peer_address: params.normalized_addr.clone(),
449 peer_id,
450 spawner_id,
451 },
452 );
453 self.controller.peer_add(peer_id);
454
455 PeerTask::spawn(
456 peer_id,
457 params.addr,
458 self.interface,
459 self.clock.clone(),
460 self.enable_timestamps,
461 NETWORK_WAIT_PERIOD,
462 self.peer_channels.clone(),
463 params.nts.take(),
464 );
465
466 let _ = self
468 .peer_snapshots_sender
469 .send(self.observe_peers().collect());
470
471 if let Some(s) = self.spawners.iter().find(|s| s.id == spawner_id) {
475 let _ = s.notify_tx.send(SystemEvent::PeerRegistered(params)).await;
476 }
477
478 peer_id
479 }
480
481 async fn handle_spawn_event(&mut self, event: SpawnEvent) {
482 match event.action {
483 SpawnAction::Create(params) => {
484 self.create_peer(event.id, params).await;
485 }
486 }
487 }
488
489 async fn add_server(&mut self, config: ServerConfig) {
490 let stats = ServerStats::default();
491 self.servers.push(ServerData {
492 stats: stats.clone(),
493 config: config.clone(),
494 });
495 ServerTask::spawn(
496 config,
497 stats,
498 self.peer_channels.system_snapshot_receiver.clone(),
499 self.keyset.clone(),
500 self.clock.clone(),
501 self.interface,
502 NETWORK_WAIT_PERIOD,
503 );
504 let _ = self.server_data_sender.send(self.servers.clone());
505 }
506
507 fn observe_peers(&self) -> impl Iterator<Item = ObservablePeerState> + '_ {
508 self.peers.iter().map(|(index, data)| {
509 data.snapshot
510 .map(|snapshot| {
511 if let Some(timedata) = self.controller.peer_snapshot(*index) {
512 ObservablePeerState::Observable {
513 timedata,
514 reachability: snapshot.reach,
515 poll_interval: snapshot.poll_interval,
516 peer_id: snapshot.peer_id,
517 address: data.peer_address.to_string(),
518 }
519 } else {
520 ObservablePeerState::Nothing
521 }
522 })
523 .unwrap_or(ObservablePeerState::Nothing)
524 })
525 }
526}
527
/// What the system tracks for each peer it has created.
#[derive(Debug)]
struct PeerState {
    /// Latest snapshot received from the peer; `None` until the first one arrives.
    snapshot: Option<PeerSnapshot>,
    /// Address of the peer, used when reporting its observable state.
    peer_address: NormalizedAddress,
    /// Spawner that requested this peer; notified when the peer is removed.
    spawner_id: SpawnerId,
    /// Identifier of the peer.
    peer_id: PeerId,
}
535
/// Published description of one server added to the system.
#[derive(Debug, Clone)]
pub struct ServerData {
    /// Clone of the statistics object handed to the server task.
    pub stats: ServerStats,
    /// Configuration the server was added with.
    pub config: ServerConfig,
}
541
#[cfg(test)]
mod tests {
    use ntp_proto::{
        peer_snapshot, KeySetProvider, Measurement, NtpDuration, NtpInstant, NtpLeapIndicator,
        NtpTimestamp, PollInterval,
    };

    use crate::spawn::dummy::DummySpawner;

    use super::*;

    /// Clock stub: every read returns the default timestamp and every
    /// adjustment succeeds without doing anything.
    #[derive(Debug, Clone, Default)]
    struct TestClock {}

    impl NtpClock for TestClock {
        type Error = std::io::Error;

        fn now(&self) -> std::result::Result<NtpTimestamp, Self::Error> {
            Ok(NtpTimestamp::default())
        }

        fn set_frequency(&self, _freq: f64) -> Result<NtpTimestamp, Self::Error> {
            Ok(NtpTimestamp::default())
        }

        fn step_clock(&self, _offset: NtpDuration) -> Result<NtpTimestamp, Self::Error> {
            Ok(NtpTimestamp::default())
        }

        fn enable_ntp_algorithm(&self) -> Result<(), Self::Error> {
            Ok(())
        }

        fn disable_ntp_algorithm(&self) -> Result<(), Self::Error> {
            Ok(())
        }

        fn ntp_algorithm_update(
            &self,
            _offset: NtpDuration,
            _poll_interval: PollInterval,
        ) -> Result<(), Self::Error> {
            Ok(())
        }

        fn error_estimate_update(
            &self,
            _est_error: NtpDuration,
            _max_error: NtpDuration,
        ) -> Result<(), Self::Error> {
            Ok(())
        }

        fn status_update(&self, _leap_status: NtpLeapIndicator) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    /// Number of peers for which the system currently stores a snapshot.
    fn peers_with_snapshot<C: NtpClock, T: Wait>(system: &System<C, T>) -> usize {
        system
            .peers
            .values()
            .filter(|peer| peer.snapshot.is_some())
            .count()
    }

    /// Measurement with zero offset and a 0.1 s delay, taken at `monotime`.
    fn zero_offset_measurement(monotime: NtpInstant) -> Measurement {
        Measurement {
            delay: NtpDuration::from_seconds(0.1),
            offset: NtpDuration::from_seconds(0.),
            transmit_timestamp: NtpTimestamp::default(),
            receive_timestamp: NtpTimestamp::default(),
            localtime: NtpTimestamp::from_seconds_nanos_since_ntp_era(0, 0),
            monotime,

            stratum: 0,
            root_delay: NtpDuration::default(),
            root_dispersion: NtpDuration::default(),
            leap: NtpLeapIndicator::NoWarning,
            precision: 0,
        }
    }

    #[tokio::test]
    async fn test_peers() {
        let (_, keyset) = tokio::sync::watch::channel(KeySetProvider::new(1).get());

        let (mut system, _) = System::new(
            TestClock::default(),
            InterfaceName::DEFAULT,
            EnableTimestamps::default(),
            CombinedSystemConfig::default(),
            keyset,
        );
        let wait =
            SingleshotSleep::new_disabled(tokio::time::sleep(std::time::Duration::from_secs(0)));
        tokio::pin!(wait);

        let id = system.add_spawner(DummySpawner::empty());

        let mut indices = Vec::with_capacity(4);
        for i in 0..4 {
            let params = PeerCreateParameters::from_new_ip_and_port(format!("127.0.0.{i}"), 123);
            indices.push(system.create_peer(id, params).await);
        }

        let base = NtpInstant::now();

        // Freshly created peers have no snapshot yet.
        assert_eq!(peers_with_snapshot(&system), 0);

        // The first measurement stores a snapshot for peer 0.
        let first = MsgForSystem::NewMeasurement(
            indices[0],
            peer_snapshot(),
            zero_offset_measurement(base),
        );
        system.handle_peer_update(first, &mut wait).await.unwrap();
        assert_eq!(peers_with_snapshot(&system), 1);

        // A second measurement for the same peer does not change the count.
        let second = MsgForSystem::NewMeasurement(
            indices[0],
            peer_snapshot(),
            zero_offset_measurement(base),
        );
        system.handle_peer_update(second, &mut wait).await.unwrap();
        assert_eq!(peers_with_snapshot(&system), 1);

        // A bare snapshot update for peer 1 adds a second snapshot.
        let snapshot_only = MsgForSystem::UpdatedSnapshot(indices[1], peer_snapshot());
        system
            .handle_peer_update(snapshot_only, &mut wait)
            .await
            .unwrap();
        assert_eq!(peers_with_snapshot(&system), 2);

        // Demobilizing peer 1 removes it together with its snapshot.
        system
            .handle_peer_update(MsgForSystem::MustDemobilize(indices[1]), &mut wait)
            .await
            .unwrap();
        assert_eq!(peers_with_snapshot(&system), 1);
    }
}
752}