ntp_daemon/
system.rs

1use crate::{
2    config::{ClockConfig, CombinedSystemConfig, NormalizedAddress, PeerConfig, ServerConfig},
3    peer::{MsgForSystem, PeerChannels},
4    peer::{PeerTask, Wait},
5    server::{ServerStats, ServerTask},
6    spawn::{
7        nts::NtsSpawner, pool::PoolSpawner, standard::StandardSpawner, PeerCreateParameters,
8        PeerId, PeerRemovalReason, SpawnAction, SpawnEvent, Spawner, SpawnerId, SystemEvent,
9    },
10    ObservablePeerState,
11};
12
13use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin, sync::Arc};
14
15use ntp_proto::{
16    DefaultTimeSyncController, KeySet, NtpClock, NtpDuration, PeerSnapshot, SystemSnapshot,
17    TimeSyncController,
18};
19use ntp_udp::{EnableTimestamps, InterfaceName};
20use tokio::{sync::mpsc, task::JoinHandle};
21use tracing::info;
22
23pub const NETWORK_WAIT_PERIOD: std::time::Duration = std::time::Duration::from_secs(1);
24
25pub const MESSAGE_BUFFER_SIZE: usize = 32;
26
27struct SingleshotSleep<T> {
28    enabled: bool,
29    sleep: Pin<Box<T>>,
30}
31
32impl<T: Wait> SingleshotSleep<T> {
33    fn new_disabled(t: T) -> Self {
34        SingleshotSleep {
35            enabled: false,
36            sleep: Box::pin(t),
37        }
38    }
39}
40
41impl<T: Wait> Future for SingleshotSleep<T> {
42    type Output = ();
43
44    fn poll(
45        self: Pin<&mut Self>,
46        cx: &mut std::task::Context<'_>,
47    ) -> std::task::Poll<Self::Output> {
48        let this = self.get_mut();
49        if !this.enabled {
50            return std::task::Poll::Pending;
51        }
52        match this.sleep.as_mut().poll(cx) {
53            std::task::Poll::Ready(v) => {
54                this.enabled = false;
55                std::task::Poll::Ready(v)
56            }
57            u => u,
58        }
59    }
60}
61
62impl<T: Wait> Wait for SingleshotSleep<T> {
63    fn reset(self: Pin<&mut Self>, deadline: tokio::time::Instant) {
64        let this = self.get_mut();
65        this.enabled = true;
66        this.sleep.as_mut().reset(deadline);
67    }
68}
69
70pub struct DaemonChannels {
71    pub config_receiver: tokio::sync::watch::Receiver<CombinedSystemConfig>,
72    pub config_sender: tokio::sync::watch::Sender<CombinedSystemConfig>,
73    pub peer_snapshots_receiver: tokio::sync::watch::Receiver<Vec<ObservablePeerState>>,
74    pub server_data_receiver: tokio::sync::watch::Receiver<Vec<ServerData>>,
75    pub system_snapshot_receiver: tokio::sync::watch::Receiver<SystemSnapshot>,
76    pub keyset: tokio::sync::watch::Receiver<Arc<KeySet>>,
77}
78
79/// Spawn the NTP daemon
80pub async fn spawn(
81    system_config: CombinedSystemConfig,
82    clock_config: ClockConfig,
83    peer_configs: &[PeerConfig],
84    server_configs: &[ServerConfig],
85    keyset: tokio::sync::watch::Receiver<Arc<KeySet>>,
86) -> std::io::Result<(JoinHandle<std::io::Result<()>>, DaemonChannels)> {
87    let (mut system, channels) = System::new(
88        clock_config.clock,
89        clock_config.interface,
90        clock_config.enable_timestamps,
91        system_config,
92        keyset,
93    );
94
95    for peer_config in peer_configs {
96        match peer_config {
97            PeerConfig::Standard(cfg) => {
98                system.add_spawner(StandardSpawner::new(cfg.clone(), NETWORK_WAIT_PERIOD));
99            }
100            PeerConfig::Nts(cfg) => {
101                system.add_spawner(NtsSpawner::new(cfg.clone(), NETWORK_WAIT_PERIOD));
102            }
103            PeerConfig::Pool(cfg) => {
104                system.add_spawner(PoolSpawner::new(cfg.clone(), NETWORK_WAIT_PERIOD));
105            }
106        }
107    }
108
109    for server_config in server_configs.iter() {
110        system.add_server(server_config.to_owned()).await;
111    }
112
113    let handle = tokio::spawn(async move {
114        let sleep =
115            SingleshotSleep::new_disabled(tokio::time::sleep_until(tokio::time::Instant::now()));
116        tokio::pin!(sleep);
117        system.run(sleep).await
118    });
119
120    Ok((handle, channels))
121}
122
123struct SystemSpawnerData {
124    id: SpawnerId,
125    notify_tx: mpsc::Sender<SystemEvent>,
126}
127
128struct System<C: NtpClock, T: Wait> {
129    _wait: PhantomData<SingleshotSleep<T>>,
130    config: CombinedSystemConfig,
131    system: SystemSnapshot,
132
133    config_receiver: tokio::sync::watch::Receiver<CombinedSystemConfig>,
134    system_snapshot_sender: tokio::sync::watch::Sender<SystemSnapshot>,
135    peer_snapshots_sender: tokio::sync::watch::Sender<Vec<ObservablePeerState>>,
136    server_data_sender: tokio::sync::watch::Sender<Vec<ServerData>>,
137    keyset: tokio::sync::watch::Receiver<Arc<KeySet>>,
138
139    msg_for_system_rx: mpsc::Receiver<MsgForSystem>,
140    spawn_tx: mpsc::Sender<SpawnEvent>,
141    spawn_rx: mpsc::Receiver<SpawnEvent>,
142
143    peers: HashMap<PeerId, PeerState>,
144    servers: Vec<ServerData>,
145    spawners: Vec<SystemSpawnerData>,
146
147    peer_channels: PeerChannels,
148
149    clock: C,
150    controller: DefaultTimeSyncController<C, PeerId>,
151
152    // which timestamps to use (this is a hint, OS or hardware may ignore)
153    enable_timestamps: EnableTimestamps,
154
155    // bind the socket to a specific interface. This is relevant for hardware timestamping,
156    // because the interface determines which clock is used to produce the timestamps.
157    interface: Option<InterfaceName>,
158}
159
160impl<C: NtpClock, T: Wait> System<C, T> {
161    fn new(
162        clock: C,
163        interface: Option<InterfaceName>,
164        enable_timestamps: EnableTimestamps,
165        config: CombinedSystemConfig,
166        keyset: tokio::sync::watch::Receiver<Arc<KeySet>>,
167    ) -> (Self, DaemonChannels) {
168        // Setup system snapshot
169        let system = SystemSnapshot {
170            stratum: config.system.local_stratum,
171            ..Default::default()
172        };
173
174        // Create communication channels
175        let (config_sender, config_receiver) = tokio::sync::watch::channel(config);
176        let (system_snapshot_sender, system_snapshot_receiver) =
177            tokio::sync::watch::channel(system);
178        let (peer_snapshots_sender, peer_snapshots_receiver) = tokio::sync::watch::channel(vec![]);
179        let (server_data_sender, server_data_receiver) = tokio::sync::watch::channel(vec![]);
180        let (msg_for_system_sender, msg_for_system_receiver) =
181            tokio::sync::mpsc::channel(MESSAGE_BUFFER_SIZE);
182        let (spawn_tx, spawn_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE);
183
184        // Build System and its channels
185        (
186            System {
187                _wait: PhantomData,
188                config,
189                system,
190
191                config_receiver: config_receiver.clone(),
192                system_snapshot_sender,
193                peer_snapshots_sender,
194                server_data_sender,
195                keyset: keyset.clone(),
196
197                msg_for_system_rx: msg_for_system_receiver,
198                spawn_rx,
199                spawn_tx,
200
201                peers: Default::default(),
202                servers: Default::default(),
203                spawners: Default::default(),
204                peer_channels: PeerChannels {
205                    msg_for_system_sender,
206                    system_snapshot_receiver: system_snapshot_receiver.clone(),
207                    system_config_receiver: config_receiver.clone(),
208                },
209                clock: clock.clone(),
210                controller: DefaultTimeSyncController::new(clock, config.system, config.algorithm),
211                enable_timestamps,
212                interface,
213            },
214            DaemonChannels {
215                config_receiver,
216                config_sender,
217                peer_snapshots_receiver,
218                server_data_receiver,
219                system_snapshot_receiver,
220                keyset,
221            },
222        )
223    }
224
225    fn add_spawner(&mut self, spawner: impl Spawner + Send + Sync + 'static) -> SpawnerId {
226        let (notify_tx, notify_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE);
227        let id = spawner.get_id();
228        let spawner_data = SystemSpawnerData { id, notify_tx };
229        info!(id=?spawner_data.id, ty=spawner.get_description(), addr=spawner.get_addr_description(), "Running spawner");
230        self.spawners.push(spawner_data);
231        let spawn_tx = self.spawn_tx.clone();
232        tokio::spawn(async move { spawner.run(spawn_tx, notify_rx).await });
233        id
234    }
235
236    async fn run(&mut self, mut wait: Pin<&mut SingleshotSleep<T>>) -> std::io::Result<()> {
237        loop {
238            tokio::select! {
239                opt_msg_for_system = self.msg_for_system_rx.recv() => {
240                    match opt_msg_for_system {
241                        None => {
242                            // the channel closed and has no more messages in it
243                            break
244                        }
245                        Some(msg_for_system) => {
246                            self.handle_peer_update(msg_for_system, &mut wait)
247                                .await?;
248                        }
249                    }
250                }
251                opt_spawn_event = self.spawn_rx.recv() => {
252                    match opt_spawn_event {
253                        None => {
254                            tracing::warn!("the spawn channel closed unexpectedly");
255                        }
256                        Some(spawn_event) => {
257                            self.handle_spawn_event(spawn_event).await;
258                        }
259                    }
260                }
261                () = &mut wait => {
262                    self.handle_timer(&mut wait);
263                }
264                _ = self.config_receiver.changed(), if self.config_receiver.has_changed().is_ok() => {
265                    self.handle_config_update();
266                }
267            }
268        }
269
270        // the channel closed and has no more messages in it
271        Ok(())
272    }
273
274    fn handle_config_update(&mut self) {
275        let config = *self.config_receiver.borrow_and_update();
276        self.controller
277            .update_config(config.system, config.algorithm);
278        self.config = config;
279    }
280
281    fn handle_timer(&mut self, wait: &mut Pin<&mut SingleshotSleep<T>>) {
282        tracing::debug!("Timer expired");
283        // note: local needed for borrow checker
284        let update = self.controller.time_update();
285        self.handle_algorithm_state_update(update, wait);
286    }
287
288    async fn handle_peer_update(
289        &mut self,
290        msg: MsgForSystem,
291        wait: &mut Pin<&mut SingleshotSleep<T>>,
292    ) -> std::io::Result<()> {
293        tracing::debug!(?msg, "updating peer");
294
295        match msg {
296            MsgForSystem::MustDemobilize(index) => {
297                self.handle_peer_demobilize(index).await;
298            }
299            MsgForSystem::NewMeasurement(index, snapshot, measurement) => {
300                self.handle_peer_measurement(index, snapshot, measurement, wait);
301            }
302            MsgForSystem::UpdatedSnapshot(index, snapshot) => {
303                self.handle_peer_snapshot(index, snapshot);
304            }
305            MsgForSystem::NetworkIssue(index) => {
306                self.handle_peer_network_issue(index).await?;
307            }
308            MsgForSystem::Unreachable(index) => {
309                self.handle_peer_unreachable(index).await?;
310            }
311        }
312
313        // Don't care if there is no receiver for peer snapshots (which might happen if
314        // we don't enable observing in the configuration)
315        let _ = self
316            .peer_snapshots_sender
317            .send(self.observe_peers().collect());
318
319        Ok(())
320    }
321
322    async fn handle_peer_network_issue(&mut self, index: PeerId) -> std::io::Result<()> {
323        self.controller.peer_remove(index);
324
325        // Restart the peer reusing its configuration.
326        let state = self.peers.remove(&index).unwrap();
327        let spawner_id = state.spawner_id;
328        let peer_id = state.peer_id;
329        let opt_spawner = self.spawners.iter().find(|s| s.id == spawner_id);
330        if let Some(spawner) = opt_spawner {
331            spawner
332                .notify_tx
333                .send(SystemEvent::peer_removed(
334                    peer_id,
335                    PeerRemovalReason::NetworkIssue,
336                ))
337                .await
338                .expect("Could not notify spawner");
339        }
340
341        Ok(())
342    }
343
344    async fn handle_peer_unreachable(&mut self, index: PeerId) -> std::io::Result<()> {
345        self.controller.peer_remove(index);
346
347        // Restart the peer reusing its configuration.
348        let state = self.peers.remove(&index).unwrap();
349        let spawner_id = state.spawner_id;
350        let peer_id = state.peer_id;
351        let opt_spawner = self.spawners.iter().find(|s| s.id == spawner_id);
352        if let Some(spawner) = opt_spawner {
353            spawner
354                .notify_tx
355                .send(SystemEvent::peer_removed(
356                    peer_id,
357                    PeerRemovalReason::Unreachable,
358                ))
359                .await
360                .expect("Could not notify spawner");
361        }
362
363        Ok(())
364    }
365
366    fn handle_peer_snapshot(&mut self, index: PeerId, snapshot: PeerSnapshot) {
367        self.controller.peer_update(
368            index,
369            snapshot
370                .accept_synchronization(self.config.system.local_stratum)
371                .is_ok(),
372        );
373        self.peers.get_mut(&index).unwrap().snapshot = Some(snapshot);
374    }
375
376    fn handle_peer_measurement(
377        &mut self,
378        index: PeerId,
379        snapshot: PeerSnapshot,
380        measurement: ntp_proto::Measurement,
381        wait: &mut Pin<&mut SingleshotSleep<T>>,
382    ) {
383        self.handle_peer_snapshot(index, snapshot);
384        // note: local needed for borrow checker
385        let update = self.controller.peer_measurement(index, measurement);
386        self.handle_algorithm_state_update(update, wait);
387    }
388
389    fn handle_algorithm_state_update(
390        &mut self,
391        update: ntp_proto::StateUpdate<PeerId>,
392        wait: &mut Pin<&mut SingleshotSleep<T>>,
393    ) {
394        if let Some(ref used_peers) = update.used_peers {
395            self.system.update_used_peers(used_peers.iter().map(|v| {
396                self.peers.get(v).and_then(|data| data.snapshot).expect(
397                    "Critical error: Peer used for synchronization that is not known to system",
398                )
399            }));
400        }
401        if let Some(time_snapshot) = update.time_snapshot {
402            self.system
403                .update_timedata(time_snapshot, &self.config.system);
404        }
405        if let Some(timestamp) = update.next_update {
406            let duration = timestamp - self.clock.now().expect("Could not get current time");
407            let duration =
408                std::time::Duration::from_secs_f64(duration.max(NtpDuration::ZERO).to_seconds());
409            wait.as_mut().reset(tokio::time::Instant::now() + duration);
410        }
411        if update.used_peers.is_some() || update.time_snapshot.is_some() {
412            // Don't care if there is no receiver.
413            let _ = self.system_snapshot_sender.send(self.system);
414        }
415    }
416
417    async fn handle_peer_demobilize(&mut self, index: PeerId) {
418        self.controller.peer_remove(index);
419        let state = self.peers.remove(&index).unwrap();
420
421        // Restart the peer reusing its configuration.
422        let spawner_id = state.spawner_id;
423        let peer_id = state.peer_id;
424        let opt_spawner = self.spawners.iter().find(|s| s.id == spawner_id);
425        if let Some(spawner) = opt_spawner {
426            spawner
427                .notify_tx
428                .send(SystemEvent::peer_removed(
429                    peer_id,
430                    PeerRemovalReason::Demobilized,
431                ))
432                .await
433                .expect("Could not notify spawner");
434        }
435    }
436
437    async fn create_peer(
438        &mut self,
439        spawner_id: SpawnerId,
440        mut params: PeerCreateParameters,
441    ) -> PeerId {
442        let peer_id = params.id;
443        info!(peer_id=?peer_id, addr=?params.addr, spawner=?spawner_id, "new peer");
444        self.peers.insert(
445            peer_id,
446            PeerState {
447                snapshot: None,
448                peer_address: params.normalized_addr.clone(),
449                peer_id,
450                spawner_id,
451            },
452        );
453        self.controller.peer_add(peer_id);
454
455        PeerTask::spawn(
456            peer_id,
457            params.addr,
458            self.interface,
459            self.clock.clone(),
460            self.enable_timestamps,
461            NETWORK_WAIT_PERIOD,
462            self.peer_channels.clone(),
463            params.nts.take(),
464        );
465
466        // Don't care if there is no receiver
467        let _ = self
468            .peer_snapshots_sender
469            .send(self.observe_peers().collect());
470
471        // Try and find a related spawner and notify that spawner.
472        // This makes sure that the spawner that initially sent the create event
473        // is now aware that the peer was added to the system.
474        if let Some(s) = self.spawners.iter().find(|s| s.id == spawner_id) {
475            let _ = s.notify_tx.send(SystemEvent::PeerRegistered(params)).await;
476        }
477
478        peer_id
479    }
480
481    async fn handle_spawn_event(&mut self, event: SpawnEvent) {
482        match event.action {
483            SpawnAction::Create(params) => {
484                self.create_peer(event.id, params).await;
485            }
486        }
487    }
488
489    async fn add_server(&mut self, config: ServerConfig) {
490        let stats = ServerStats::default();
491        self.servers.push(ServerData {
492            stats: stats.clone(),
493            config: config.clone(),
494        });
495        ServerTask::spawn(
496            config,
497            stats,
498            self.peer_channels.system_snapshot_receiver.clone(),
499            self.keyset.clone(),
500            self.clock.clone(),
501            self.interface,
502            NETWORK_WAIT_PERIOD,
503        );
504        let _ = self.server_data_sender.send(self.servers.clone());
505    }
506
507    fn observe_peers(&self) -> impl Iterator<Item = ObservablePeerState> + '_ {
508        self.peers.iter().map(|(index, data)| {
509            data.snapshot
510                .map(|snapshot| {
511                    if let Some(timedata) = self.controller.peer_snapshot(*index) {
512                        ObservablePeerState::Observable {
513                            timedata,
514                            reachability: snapshot.reach,
515                            poll_interval: snapshot.poll_interval,
516                            peer_id: snapshot.peer_id,
517                            address: data.peer_address.to_string(),
518                        }
519                    } else {
520                        ObservablePeerState::Nothing
521                    }
522                })
523                .unwrap_or(ObservablePeerState::Nothing)
524        })
525    }
526}
527
528#[derive(Debug)]
529struct PeerState {
530    snapshot: Option<PeerSnapshot>,
531    peer_address: NormalizedAddress,
532    spawner_id: SpawnerId,
533    peer_id: PeerId,
534}
535
536#[derive(Debug, Clone)]
537pub struct ServerData {
538    pub stats: ServerStats,
539    pub config: ServerConfig,
540}
541
542#[cfg(test)]
543mod tests {
544    use ntp_proto::{
545        peer_snapshot, KeySetProvider, Measurement, NtpDuration, NtpInstant, NtpLeapIndicator,
546        NtpTimestamp, PollInterval,
547    };
548
549    use crate::spawn::dummy::DummySpawner;
550
551    use super::*;
552
553    #[derive(Debug, Clone, Default)]
554    struct TestClock {}
555
556    impl NtpClock for TestClock {
557        type Error = std::io::Error;
558
559        fn now(&self) -> std::result::Result<NtpTimestamp, Self::Error> {
560            // Err(std::io::Error::from(std::io::ErrorKind::Unsupported))
561            Ok(NtpTimestamp::default())
562        }
563
564        fn set_frequency(&self, _freq: f64) -> Result<NtpTimestamp, Self::Error> {
565            Ok(NtpTimestamp::default())
566        }
567
568        fn step_clock(&self, _offset: NtpDuration) -> Result<NtpTimestamp, Self::Error> {
569            Ok(NtpTimestamp::default())
570        }
571
572        fn enable_ntp_algorithm(&self) -> Result<(), Self::Error> {
573            Ok(())
574        }
575
576        fn disable_ntp_algorithm(&self) -> Result<(), Self::Error> {
577            Ok(())
578        }
579
580        fn ntp_algorithm_update(
581            &self,
582            _offset: NtpDuration,
583            _poll_interval: PollInterval,
584        ) -> Result<(), Self::Error> {
585            Ok(())
586        }
587
588        fn error_estimate_update(
589            &self,
590            _est_error: NtpDuration,
591            _max_error: NtpDuration,
592        ) -> Result<(), Self::Error> {
593            Ok(())
594        }
595
596        fn status_update(&self, _leap_status: NtpLeapIndicator) -> Result<(), Self::Error> {
597            Ok(())
598        }
599    }
600
601    #[tokio::test]
602    async fn test_peers() {
603        // we always generate the keyset (even if NTS is not used)
604        let (_, keyset) = tokio::sync::watch::channel(KeySetProvider::new(1).get());
605
606        let (mut system, _) = System::new(
607            TestClock {},
608            InterfaceName::DEFAULT,
609            EnableTimestamps::default(),
610            CombinedSystemConfig::default(),
611            keyset,
612        );
613        let wait =
614            SingleshotSleep::new_disabled(tokio::time::sleep(std::time::Duration::from_secs(0)));
615        tokio::pin!(wait);
616
617        let id = system.add_spawner(DummySpawner::empty());
618
619        let mut indices = vec![];
620
621        for i in 0..4 {
622            indices.push(
623                system
624                    .create_peer(
625                        id,
626                        PeerCreateParameters::from_new_ip_and_port(format!("127.0.0.{i}"), 123),
627                    )
628                    .await,
629            );
630        }
631
632        let base = NtpInstant::now();
633        assert_eq!(
634            system
635                .peers
636                .values()
637                .map(|v| match v.snapshot {
638                    Some(_) => 1,
639                    None => 0,
640                })
641                .sum::<i32>(),
642            0
643        );
644
645        system
646            .handle_peer_update(
647                MsgForSystem::NewMeasurement(
648                    indices[0],
649                    peer_snapshot(),
650                    Measurement {
651                        delay: NtpDuration::from_seconds(0.1),
652                        offset: NtpDuration::from_seconds(0.),
653                        transmit_timestamp: NtpTimestamp::default(),
654                        receive_timestamp: NtpTimestamp::default(),
655                        localtime: NtpTimestamp::from_seconds_nanos_since_ntp_era(0, 0),
656                        monotime: base,
657
658                        stratum: 0,
659                        root_delay: NtpDuration::default(),
660                        root_dispersion: NtpDuration::default(),
661                        leap: NtpLeapIndicator::NoWarning,
662                        precision: 0,
663                    },
664                ),
665                &mut wait,
666            )
667            .await
668            .unwrap();
669        assert_eq!(
670            system
671                .peers
672                .values()
673                .map(|v| match v.snapshot {
674                    Some(_) => 1,
675                    None => 0,
676                })
677                .sum::<i32>(),
678            1
679        );
680
681        system
682            .handle_peer_update(
683                MsgForSystem::NewMeasurement(
684                    indices[0],
685                    peer_snapshot(),
686                    Measurement {
687                        delay: NtpDuration::from_seconds(0.1),
688                        offset: NtpDuration::from_seconds(0.),
689                        transmit_timestamp: NtpTimestamp::default(),
690                        receive_timestamp: NtpTimestamp::default(),
691                        localtime: NtpTimestamp::from_seconds_nanos_since_ntp_era(0, 0),
692                        monotime: base,
693
694                        stratum: 0,
695                        root_delay: NtpDuration::default(),
696                        root_dispersion: NtpDuration::default(),
697                        leap: NtpLeapIndicator::NoWarning,
698                        precision: 0,
699                    },
700                ),
701                &mut wait,
702            )
703            .await
704            .unwrap();
705        assert_eq!(
706            system
707                .peers
708                .values()
709                .map(|v| match v.snapshot {
710                    Some(_) => 1,
711                    None => 0,
712                })
713                .sum::<i32>(),
714            1
715        );
716
717        system
718            .handle_peer_update(
719                MsgForSystem::UpdatedSnapshot(indices[1], peer_snapshot()),
720                &mut wait,
721            )
722            .await
723            .unwrap();
724        assert_eq!(
725            system
726                .peers
727                .values()
728                .map(|v| match v.snapshot {
729                    Some(_) => 1,
730                    None => 0,
731                })
732                .sum::<i32>(),
733            2
734        );
735
736        system
737            .handle_peer_update(MsgForSystem::MustDemobilize(indices[1]), &mut wait)
738            .await
739            .unwrap();
740        assert_eq!(
741            system
742                .peers
743                .values()
744                .map(|v| match v.snapshot {
745                    Some(_) => 1,
746                    None => 0,
747                })
748                .sum::<i32>(),
749            1
750        );
751    }
752}