1use crate::server::ServerStats;
2use crate::{sockets::create_unix_socket, system::ServerData};
3use ntp_proto::{ObservablePeerTimedata, PollInterval, Reach, ReferenceId, SystemSnapshot};
4use std::net::SocketAddr;
5use std::os::unix::fs::PermissionsExt;
6use tokio::task::JoinHandle;
7use tracing::warn;
8
9use serde::{Deserialize, Serialize};
10
/// Complete snapshot of the daemon state that is reported to observers.
///
/// The observer assembles one value of this type for every accepted connection and writes
/// it to the client in serialized form; the tests in this file read it back as JSON. The
/// field names are therefore part of the output format.
#[derive(Debug, Serialize, Deserialize)]
pub struct ObservableState {
    /// Latest system-wide snapshot, copied from the system channel.
    pub system: SystemSnapshot,
    /// State of every peer, as last published on the peers channel.
    pub peers: Vec<ObservablePeerState>,
    /// One entry per server, converted from the data on the server channel.
    pub servers: Vec<ObservableServerState>,
}
17
/// Observable view of a single server entry.
///
/// Built from a `ServerData` via the `From<&ServerData>` implementation in this file.
#[derive(Debug, Serialize, Deserialize)]
pub struct ObservableServerState {
    /// Socket address taken from the server's configuration.
    pub address: SocketAddr,
    /// Copy of the server's statistics at the time of observation.
    pub stats: ServerStats,
}
23
24impl From<&ServerData> for ObservableServerState {
25 fn from(data: &ServerData) -> Self {
26 ObservableServerState {
27 address: data.config.addr,
28 stats: data.stats.clone(),
29 }
30 }
31}
32
/// Observable state of a single peer.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum ObservablePeerState {
    /// No data is reported for this peer.
    // NOTE(review): presumably used while a peer has no usable measurements yet; confirm
    // against the code that publishes these values.
    Nothing,
    /// Full set of data reported for a peer.
    Observable {
        /// Time data of the peer. Because of `flatten`, its fields are serialized directly
        /// alongside the other fields of this variant instead of in a nested object.
        #[serde(flatten)]
        timedata: ObservablePeerTimedata,
        /// Reachability register of the peer.
        reachability: Reach,
        /// Poll interval currently associated with the peer.
        poll_interval: PollInterval,
        /// Reference id identifying the peer.
        peer_id: ReferenceId,
        /// Address of the peer in textual form (the tests use `"ip:port"`).
        address: String,
    },
}
45
46pub async fn spawn(
47 config: &crate::config::ObserveConfig,
48 peers_reader: tokio::sync::watch::Receiver<Vec<ObservablePeerState>>,
49 server_reader: tokio::sync::watch::Receiver<Vec<ServerData>>,
50 system_reader: tokio::sync::watch::Receiver<SystemSnapshot>,
51) -> JoinHandle<std::io::Result<()>> {
52 let config = config.clone();
53 tokio::spawn(async move {
54 let result = observer(config, peers_reader, server_reader, system_reader).await;
55 if let Err(ref e) = result {
56 warn!("Abnormal termination of the state observer: {}", e);
57 warn!("The state observer will not be available");
58 }
59 result
60 })
61}
62
63async fn observer(
64 config: crate::config::ObserveConfig,
65 peers_reader: tokio::sync::watch::Receiver<Vec<ObservablePeerState>>,
66 server_reader: tokio::sync::watch::Receiver<Vec<ServerData>>,
67 system_reader: tokio::sync::watch::Receiver<SystemSnapshot>,
68) -> std::io::Result<()> {
69 let path = match config.path {
70 Some(path) => path,
71 None => return Ok(()),
72 };
73
74 let peers_listener = create_unix_socket(&path)?;
75
76 let permissions: std::fs::Permissions = PermissionsExt::from_mode(config.mode);
80 std::fs::set_permissions(&path, permissions)?;
81
82 loop {
83 let (mut stream, _addr) = peers_listener.accept().await?;
84
85 let observe = ObservableState {
86 peers: peers_reader.borrow().to_owned(),
87 system: *system_reader.borrow(),
88 servers: server_reader.borrow().iter().map(|s| s.into()).collect(),
89 };
90
91 crate::sockets::write_json(&mut stream, &observe).await?;
92 }
93}
94
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use ntp_proto::{
        NtpClock, NtpDuration, NtpLeapIndicator, NtpTimestamp, PollInterval, PollIntervalLimits,
        Reach, ReferenceId, TimeSnapshot,
    };
    use tokio::{io::AsyncReadExt, net::UnixStream};

    use super::*;

    /// Clock stub: `now` always fails with `Unsupported`, every other operation succeeds
    /// without doing anything.
    #[derive(Debug, Clone, Default)]
    struct TestClock {}

    impl NtpClock for TestClock {
        type Error = std::io::Error;

        fn now(&self) -> std::result::Result<NtpTimestamp, Self::Error> {
            Err(std::io::Error::from(std::io::ErrorKind::Unsupported))
        }

        fn set_frequency(&self, _freq: f64) -> Result<NtpTimestamp, Self::Error> {
            Ok(NtpTimestamp::default())
        }

        fn step_clock(&self, _offset: NtpDuration) -> Result<NtpTimestamp, Self::Error> {
            Ok(NtpTimestamp::default())
        }

        fn enable_ntp_algorithm(&self) -> Result<(), Self::Error> {
            Ok(())
        }

        fn disable_ntp_algorithm(&self) -> Result<(), Self::Error> {
            Ok(())
        }

        fn ntp_algorithm_update(
            &self,
            _offset: NtpDuration,
            _poll_interval: PollInterval,
        ) -> Result<(), Self::Error> {
            Ok(())
        }

        fn error_estimate_update(
            &self,
            _est_error: NtpDuration,
            _max_error: NtpDuration,
        ) -> Result<(), Self::Error> {
            Ok(())
        }

        fn status_update(&self, _leap_status: NtpLeapIndicator) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    /// Peer list shared by the tests: two peers without data and one observable peer.
    fn sample_peers() -> Vec<ObservablePeerState> {
        vec![
            ObservablePeerState::Nothing,
            ObservablePeerState::Nothing,
            ObservablePeerState::Observable {
                timedata: Default::default(),
                reachability: Reach::default(),
                poll_interval: PollIntervalLimits::default().min,
                peer_id: ReferenceId::from_ip("127.0.0.1".parse().unwrap()),
                address: "127.0.0.3:123".into(),
            },
        ]
    }

    /// System snapshot shared by the tests.
    fn sample_system() -> SystemSnapshot {
        SystemSnapshot {
            stratum: 1,
            reference_id: ReferenceId::NONE,
            accumulated_steps_threshold: None,
            time_snapshot: TimeSnapshot {
                poll_interval: PollIntervalLimits::default().min,
                precision: NtpDuration::from_seconds(1e-3),
                root_delay: NtpDuration::ZERO,
                root_dispersion: NtpDuration::ZERO,
                leap_indicator: NtpLeapIndicator::Leap59,
                accumulated_steps: NtpDuration::ZERO,
            },
        }
    }

    /// A client that connects receives the full state, including exactly one observable peer.
    #[tokio::test]
    async fn test_observation() {
        // Every test uses its own socket name so that tests running in parallel cannot clash.
        let path = std::env::temp_dir().join("ntp-test-stream-2");
        let config = crate::config::ObserveConfig {
            path: Some(path.clone()),
            mode: 0o700,
        };

        // The senders are dropped immediately; the receivers keep serving the initial value.
        let (_, peers_reader) = tokio::sync::watch::channel(sample_peers());
        let (_, servers_reader) = tokio::sync::watch::channel(vec![]);
        let (_, system_reader) = tokio::sync::watch::channel(sample_system());

        let handle = tokio::spawn(async move {
            observer(config, peers_reader, servers_reader, system_reader)
                .await
                .unwrap();
        });

        // Give the observer task a moment to create the socket before connecting.
        tokio::time::sleep(Duration::from_millis(10)).await;

        let mut reader = UnixStream::connect(path).await.unwrap();

        // Read until the observer closes the connection.
        let mut buf = vec![];
        while reader.read_buf(&mut buf).await.unwrap() != 0 {}
        let result: ObservableState = serde_json::from_slice(&buf).unwrap();

        let count = result
            .peers
            .iter()
            .filter(|peer| matches!(peer, ObservablePeerState::Observable { .. }))
            .count();
        assert_eq!(count, 1);

        handle.abort();
    }

    /// A client that has read only part of the response must not keep the shared state
    /// locked against writers.
    #[tokio::test]
    async fn test_block_during_read() {
        // Every test uses its own socket name so that tests running in parallel cannot clash.
        let path = std::env::temp_dir().join("ntp-test-stream-3");
        let config = crate::config::ObserveConfig {
            path: Some(path.clone()),
            mode: 0o700,
        };

        let (peers_writer, peers_reader) = tokio::sync::watch::channel(sample_peers());
        let (server_writer, servers_reader) = tokio::sync::watch::channel(vec![]);
        let (system_writer, system_reader) = tokio::sync::watch::channel(sample_system());

        let handle = tokio::spawn(async move {
            observer(config, peers_reader, servers_reader, system_reader)
                .await
                .unwrap();
        });

        // Give the observer task a moment to create the socket before connecting.
        tokio::time::sleep(Duration::from_millis(10)).await;

        let mut reader = UnixStream::connect(path).await.unwrap();

        // Read only a small prefix so the connection is still in the middle of a response.
        let mut buf = [0_u8; 12];
        let mut bufref: &mut [u8] = &mut buf;
        reader.read_buf(&mut bufref).await.unwrap();

        // `send_modify` takes the channel's write lock, so these calls only return if the
        // observer is not holding a read guard. (Calling `borrow_mut()` on a sender resolves
        // to `std::borrow::BorrowMut` and never touches that lock.)
        system_writer.send_modify(|_| {});
        peers_writer.send_modify(|_| {});
        server_writer.send_modify(|_| {});

        handle.abort();
    }
}