// ntp_daemon/observer.rs

use crate::server::ServerStats;
use crate::{sockets::create_unix_socket, system::ServerData};
use ntp_proto::{ObservablePeerTimedata, PollInterval, Reach, ReferenceId, SystemSnapshot};
use std::net::SocketAddr;
use std::os::unix::fs::PermissionsExt;
use tokio::task::JoinHandle;
use tracing::warn;

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct ObservableState {
    pub system: SystemSnapshot,
    pub peers: Vec<ObservablePeerState>,
    pub servers: Vec<ObservableServerState>,
}
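
// `ObservableState` is serialized to JSON as a whole for every client that
// connects to the observation socket. A rough sketch of the resulting shape
// (the field names inside each object depend on `SystemSnapshot` and the
// peer/server types):
//
//     { "system": { ... }, "peers": [ ... ], "servers": [ ... ] }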

#[derive(Debug, Serialize, Deserialize)]
pub struct ObservableServerState {
    pub address: SocketAddr,
    pub stats: ServerStats,
}

impl From<&ServerData> for ObservableServerState {
    fn from(data: &ServerData) -> Self {
        ObservableServerState {
            address: data.config.addr,
            stats: data.stats.clone(),
        }
    }
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum ObservablePeerState {
    Nothing,
    Observable {
        #[serde(flatten)]
        timedata: ObservablePeerTimedata,
        reachability: Reach,
        poll_interval: PollInterval,
        peer_id: ReferenceId,
        address: String,
    },
}
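
// Note on `#[serde(flatten)]` above: the fields of `timedata` are inlined into
// the `Observable` variant's JSON object rather than nested under a "timedata"
// key. With serde's default externally tagged enum representation, a peer thus
// serializes roughly like this (the timedata field names are illustrative):
//
//     { "Observable": { "offset": ..., "reachability": ..., "poll_interval": ...,
//                       "peer_id": ..., "address": "127.0.0.3:123" } }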

pub async fn spawn(
    config: &crate::config::ObserveConfig,
    peers_reader: tokio::sync::watch::Receiver<Vec<ObservablePeerState>>,
    server_reader: tokio::sync::watch::Receiver<Vec<ServerData>>,
    system_reader: tokio::sync::watch::Receiver<SystemSnapshot>,
) -> JoinHandle<std::io::Result<()>> {
    let config = config.clone();
    tokio::spawn(async move {
        let result = observer(config, peers_reader, server_reader, system_reader).await;
        if let Err(ref e) = result {
            warn!("Abnormal termination of the state observer: {}", e);
            warn!("The state observer will not be available");
        }
        result
    })
}
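
// A sketch of how a daemon might wire this up (the channel names, the empty
// initial values, and `config.observe` are illustrative, not the daemon's
// actual startup code):
//
//     let (_peers_tx, peers_rx) = tokio::sync::watch::channel(vec![]);
//     let (_servers_tx, servers_rx) = tokio::sync::watch::channel(vec![]);
//     let (_system_tx, system_rx) = tokio::sync::watch::channel(initial_snapshot);
//     let observer_handle = spawn(&config.observe, peers_rx, servers_rx, system_rx).await;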

async fn observer(
    config: crate::config::ObserveConfig,
    peers_reader: tokio::sync::watch::Receiver<Vec<ObservablePeerState>>,
    server_reader: tokio::sync::watch::Receiver<Vec<ServerData>>,
    system_reader: tokio::sync::watch::Receiver<SystemSnapshot>,
) -> std::io::Result<()> {
    let path = match config.path {
        Some(path) => path,
        None => return Ok(()),
    };

    let peers_listener = create_unix_socket(&path)?;

    // This binary needs to run as root to be able to adjust the system clock.
    // By default, the socket inherits root's permissions, but a client should
    // not need elevated permissions to read from it, so we explicitly set the
    // socket's permissions to the configured mode.
    let permissions: std::fs::Permissions = PermissionsExt::from_mode(config.mode);
    std::fs::set_permissions(&path, permissions)?;
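
    // For example, a `mode` of 0o666 lets any local user connect and read
    // (connecting to a unix socket requires write permission on the socket
    // file), while 0o600 limits it to the daemon's own user; the tests below
    // use 0o700.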

    loop {
        let (mut stream, _addr) = peers_listener.accept().await?;

        let observe = ObservableState {
            peers: peers_reader.borrow().to_owned(),
            system: *system_reader.borrow(),
            servers: server_reader.borrow().iter().map(|s| s.into()).collect(),
        };

        crate::sockets::write_json(&mut stream, &observe).await?;
    }
}
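
// A client consumes the socket by connecting, reading until EOF (the loop
// above writes one JSON document per connection and then drops the stream),
// and deserializing the buffer. A minimal sketch, assuming the configured
// socket path is in `path`:
//
//     use tokio::io::AsyncReadExt;
//     let mut stream = tokio::net::UnixStream::connect(&path).await?;
//     let mut buf = vec![];
//     while stream.read_buf(&mut buf).await? != 0 {}
//     let state: ObservableState = serde_json::from_slice(&buf)?;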

#[cfg(test)]
mod tests {
    use std::{borrow::BorrowMut, time::Duration};

    use ntp_proto::{
        NtpClock, NtpDuration, NtpLeapIndicator, NtpTimestamp, PollInterval, PollIntervalLimits,
        Reach, ReferenceId, TimeSnapshot,
    };
    use tokio::{io::AsyncReadExt, net::UnixStream};

    use super::*;

    #[derive(Debug, Clone, Default)]
    struct TestClock {}
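    // `TestClock` is a stub: these tests never read or adjust real time, so
    // `now` reports `Unsupported` and the remaining hooks are no-ops.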

    impl NtpClock for TestClock {
        type Error = std::io::Error;

        fn now(&self) -> std::result::Result<NtpTimestamp, Self::Error> {
            Err(std::io::Error::from(std::io::ErrorKind::Unsupported))
        }

        fn set_frequency(&self, _freq: f64) -> Result<NtpTimestamp, Self::Error> {
            Ok(NtpTimestamp::default())
        }

        fn step_clock(&self, _offset: NtpDuration) -> Result<NtpTimestamp, Self::Error> {
            Ok(NtpTimestamp::default())
        }

        fn enable_ntp_algorithm(&self) -> Result<(), Self::Error> {
            Ok(())
        }

        fn disable_ntp_algorithm(&self) -> Result<(), Self::Error> {
            Ok(())
        }

        fn ntp_algorithm_update(
            &self,
            _offset: NtpDuration,
            _poll_interval: PollInterval,
        ) -> Result<(), Self::Error> {
            Ok(())
        }

        fn error_estimate_update(
            &self,
            _est_error: NtpDuration,
            _max_error: NtpDuration,
        ) -> Result<(), Self::Error> {
            Ok(())
        }

        fn status_update(&self, _leap_status: NtpLeapIndicator) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_observation() {
        // be careful with copying: tests run concurrently and should use a unique socket name!
        let path = std::env::temp_dir().join("ntp-test-stream-2");
        let config = crate::config::ObserveConfig {
            path: Some(path.clone()),
            mode: 0o700,
        };

        let (_, peers_reader) = tokio::sync::watch::channel(vec![
            ObservablePeerState::Nothing,
            ObservablePeerState::Nothing,
            ObservablePeerState::Observable {
                timedata: Default::default(),
                reachability: Reach::default(),
                poll_interval: PollIntervalLimits::default().min,
                peer_id: ReferenceId::from_ip("127.0.0.1".parse().unwrap()),
                address: "127.0.0.3:123".into(),
            },
        ]);

        let (_, servers_reader) = tokio::sync::watch::channel(vec![]);

        let (_, system_reader) = tokio::sync::watch::channel(SystemSnapshot {
            stratum: 1,
            reference_id: ReferenceId::NONE,
            accumulated_steps_threshold: None,
            time_snapshot: TimeSnapshot {
                poll_interval: PollIntervalLimits::default().min,
                precision: NtpDuration::from_seconds(1e-3),
                root_delay: NtpDuration::ZERO,
                root_dispersion: NtpDuration::ZERO,
                leap_indicator: NtpLeapIndicator::Leap59,
                accumulated_steps: NtpDuration::ZERO,
            },
        });

        let handle = tokio::spawn(async move {
            observer(config, peers_reader, servers_reader, system_reader)
                .await
                .unwrap();
        });

        tokio::time::sleep(Duration::from_millis(10)).await;

        let mut reader = UnixStream::connect(path).await.unwrap();

        let mut buf = vec![];
        while reader.read_buf(&mut buf).await.unwrap() != 0 {}
        let result: ObservableState = serde_json::from_slice(&buf).unwrap();

        // The peers may be reported in any order, so just count the
        // observable ones.
        let mut count = 0;
        for peer in &result.peers {
            if matches!(peer, ObservablePeerState::Observable { .. }) {
                count += 1;
            }
        }
        assert_eq!(count, 1);

        handle.abort();
    }

    #[tokio::test]
    async fn test_block_during_read() {
        // be careful with copying: tests run concurrently and should use a unique socket name!
        let path = std::env::temp_dir().join("ntp-test-stream-3");
        let config = crate::config::ObserveConfig {
            path: Some(path.clone()),
            mode: 0o700,
        };

        let (mut peers_writer, peers_reader) = tokio::sync::watch::channel(vec![
            ObservablePeerState::Nothing,
            ObservablePeerState::Nothing,
            ObservablePeerState::Observable {
                timedata: Default::default(),
                reachability: Reach::default(),
                poll_interval: PollIntervalLimits::default().min,
                peer_id: ReferenceId::from_ip("127.0.0.1".parse().unwrap()),
                address: "127.0.0.3:123".into(),
            },
        ]);

        let (mut server_writer, servers_reader) = tokio::sync::watch::channel(vec![]);

        let (mut system_writer, system_reader) = tokio::sync::watch::channel(SystemSnapshot {
            stratum: 1,
            reference_id: ReferenceId::NONE,
            accumulated_steps_threshold: None,
            time_snapshot: TimeSnapshot {
                poll_interval: PollIntervalLimits::default().min,
                precision: NtpDuration::from_seconds(1e-3),
                root_delay: NtpDuration::ZERO,
                root_dispersion: NtpDuration::ZERO,
                leap_indicator: NtpLeapIndicator::Leap59,
                accumulated_steps: NtpDuration::ZERO,
            },
        });

        let handle = tokio::spawn(async move {
            observer(config, peers_reader, servers_reader, system_reader)
                .await
                .unwrap();
        });

        tokio::time::sleep(Duration::from_millis(10)).await;

        let mut reader = UnixStream::connect(path).await.unwrap();

        // We do a small partial read of the data to test that, whatever
        // happens, the observer doesn't keep a lock alive on either of
        // the RwLocks.
        let mut buf = [0_u8; 12];
        let mut bufref: &mut [u8] = &mut buf;
        reader.read_buf(&mut bufref).await.unwrap();

        // Ensure none of the locks is held long term
        let _ = system_writer.borrow_mut();
        let _ = peers_writer.borrow_mut();
        let _ = server_writer.borrow_mut();

        handle.abort();
    }
}