1use std::{
2 net::SocketAddr,
3 sync::atomic::{AtomicU64, Ordering},
4};
5
6use ntp_daemon::{ObservablePeerState, ObservableState};
7use ntp_os_clock::DefaultNtpClock;
8use ntp_proto::NtpClock;
9use prometheus_client::{
10 encoding::{EncodeLabelSet, EncodeLabelValue},
11 metrics::{counter::Counter, family::Family, gauge::Gauge},
12 registry::{Registry, Unit},
13};
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)]
16struct PeerLabels {
17 address: String,
18}
19
20#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)]
21struct ServerLabels {
22 listen_address: WrappedSocketAddr,
23}
24
/// Newtype around [`SocketAddr`] so it can be used as a Prometheus label value.
///
/// Rust's orphan rule forbids implementing a foreign trait (the label-encoding trait
/// below) directly on a foreign type such as `SocketAddr`, hence the wrapper.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct WrappedSocketAddr(SocketAddr);

impl From<SocketAddr> for WrappedSocketAddr {
    /// Wrap `addr` without altering it.
    fn from(addr: SocketAddr) -> Self {
        Self(addr)
    }
}
33
34impl EncodeLabelValue for WrappedSocketAddr {
35 fn encode(
36 &self,
37 encoder: &mut prometheus_client::encoding::LabelValueEncoder,
38 ) -> Result<(), std::fmt::Error> {
39 use std::fmt::Write;
40
41 encoder.write_fmt(format_args!("{}", &self.0))
42 }
43}
44
45#[derive(Default)]
46pub struct Metrics {
47 system_poll_interval: Gauge<f64, AtomicU64>,
48 system_poll_interval_exp: Gauge,
49 system_precision: Gauge<f64, AtomicU64>,
50 system_accumulated_steps: Gauge<f64, AtomicU64>,
51 system_accumulated_steps_threshold: Gauge<f64, AtomicU64>,
52 system_leap_indicator: Gauge,
53 peer_last_update: Family<PeerLabels, Gauge<f64, AtomicU64>>,
54 peer_poll_interval: Family<PeerLabels, Gauge<f64, AtomicU64>>,
55 peer_poll_interval_exp: Family<PeerLabels, Gauge>,
56 peer_reachability_status: Family<PeerLabels, Gauge>,
57 peer_offset: Family<PeerLabels, Gauge<f64, AtomicU64>>,
58 peer_uncertainty: Family<PeerLabels, Gauge<f64, AtomicU64>>,
59 peer_delay: Family<PeerLabels, Gauge<f64, AtomicU64>>,
60 server_received_packets: Family<ServerLabels, Counter>,
61 server_accepted_packets: Family<ServerLabels, Counter>,
62 server_denied_packets: Family<ServerLabels, Counter>,
63 server_ignored_packets: Family<ServerLabels, Counter>,
64 server_rate_limited_packets: Family<ServerLabels, Counter>,
65 server_response_send_errors: Family<ServerLabels, Counter>,
66}
67
68impl Metrics {
69 pub fn fill(&self, data: &ObservableState) {
70 let clock = DefaultNtpClock::realtime();
71
72 self.system_poll_interval.set(
73 data.system
74 .time_snapshot
75 .poll_interval
76 .as_duration()
77 .to_seconds(),
78 );
79 self.system_poll_interval_exp
80 .set(data.system.time_snapshot.poll_interval.as_log() as i64);
81 self.system_precision
82 .set(data.system.time_snapshot.precision.to_seconds());
83 self.system_accumulated_steps
84 .set(data.system.time_snapshot.accumulated_steps.to_seconds());
85 self.system_accumulated_steps_threshold.set(
86 data.system
87 .accumulated_steps_threshold
88 .map(|v| v.to_seconds())
89 .unwrap_or(-1.0),
90 );
91 self.system_leap_indicator
92 .set(data.system.time_snapshot.leap_indicator as i64);
93
94 for peer in &data.peers {
95 if let ObservablePeerState::Observable {
96 timedata,
97 reachability,
98 poll_interval,
99 address,
100 ..
101 } = peer
102 {
103 let labels = PeerLabels {
104 address: address.clone(),
105 };
106 self.peer_last_update.get_or_create(&labels).set(
107 (timedata.last_update
108 - clock.now().expect("Unable to get current system time"))
109 .to_seconds(),
110 );
111 self.peer_poll_interval
112 .get_or_create(&labels)
113 .set(poll_interval.as_duration().to_seconds());
114 self.peer_poll_interval_exp
115 .get_or_create(&labels)
116 .set(poll_interval.as_log() as i64);
117 self.peer_reachability_status
118 .get_or_create(&labels)
119 .set(reachability.reachability_score() as i64);
120 self.peer_offset
121 .get_or_create(&labels)
122 .set(timedata.offset.to_seconds());
123 self.peer_delay
124 .get_or_create(&labels)
125 .set(timedata.delay.to_seconds());
126 self.peer_uncertainty
127 .get_or_create(&labels)
128 .set(timedata.uncertainty.to_seconds());
129 }
130 }
131
132 for server in &data.servers {
133 let labels = ServerLabels {
134 listen_address: WrappedSocketAddr(server.address),
135 };
136
137 self.server_received_packets
138 .get_or_create(&labels)
139 .inner()
140 .store(server.stats.received_packets.get(), Ordering::Relaxed);
141 self.server_accepted_packets
142 .get_or_create(&labels)
143 .inner()
144 .store(server.stats.accepted_packets.get(), Ordering::Relaxed);
145 self.server_denied_packets
146 .get_or_create(&labels)
147 .inner()
148 .store(server.stats.denied_packets.get(), Ordering::Relaxed);
149 self.server_ignored_packets
150 .get_or_create(&labels)
151 .inner()
152 .store(server.stats.ignored_packets.get(), Ordering::Relaxed);
153 self.server_rate_limited_packets
154 .get_or_create(&labels)
155 .inner()
156 .store(server.stats.rate_limited_packets.get(), Ordering::Relaxed);
157 self.server_response_send_errors
158 .get_or_create(&labels)
159 .inner()
160 .store(server.stats.response_send_errors.get(), Ordering::Relaxed);
161 }
162 }
163
164 pub fn registry(&self) -> Registry {
165 let mut registry = <Registry>::with_prefix("ntp");
166
167 let system = registry.sub_registry_with_prefix("system");
168
169 system.register_with_unit(
170 "poll_interval",
171 "Time between polls of the system",
172 Unit::Seconds,
173 self.system_poll_interval.clone(),
174 );
175 system.register(
176 "poll_interval",
177 "Exponent of time between poll intervals",
178 self.system_poll_interval_exp.clone(),
179 );
180 system.register_with_unit(
181 "precision",
182 "Precision of the local clock",
183 Unit::Seconds,
184 self.system_precision.clone(),
185 );
186 system.register_with_unit(
187 "accumulated_steps",
188 "Accumulated amount of seconds that the system needed to jump the time",
189 Unit::Seconds,
190 self.system_accumulated_steps.clone(),
191 );
192 system.register_with_unit(
193 "accumulated_steps_threshold",
194 "Threshold for the accumulated step amount at which the NTP daemon will exit (or -1 if no threshold was set)",
195 Unit::Seconds,
196 self.system_accumulated_steps_threshold.clone(),
197 );
198 system.register(
199 "leap_indicator",
200 "Indicates that a leap second will take place",
201 self.system_leap_indicator.clone(),
202 );
203
204 let peer = registry.sub_registry_with_prefix("peer");
205
206 peer.register_with_unit(
207 "uptime",
208 "Time since the peer was started",
209 Unit::Seconds,
210 self.peer_last_update.clone(),
211 );
212
213 peer.register_with_unit(
214 "poll_interval",
215 "Time between polls of the peer",
216 Unit::Seconds,
217 self.peer_poll_interval.clone(),
218 );
219
220 peer.register(
221 "poll_interval",
222 "Exponent of time between polls of the peer",
223 self.peer_poll_interval_exp.clone(),
224 );
225
226 peer.register(
227 "reachability_status",
228 "Number of polls until the upstream server is unreachable, zero if it is",
229 self.peer_reachability_status.clone(),
230 );
231
232 peer.register_with_unit(
233 "offset",
234 "Offset between the upstream server and system time",
235 Unit::Seconds,
236 self.peer_offset.clone(),
237 );
238
239 peer.register_with_unit(
240 "delay",
241 "Current round-trip delay to the upstream server",
242 Unit::Seconds,
243 self.peer_delay.clone(),
244 );
245
246 peer.register_with_unit(
247 "uncertainty",
248 "Estimated error of the clock",
249 Unit::Seconds,
250 self.peer_uncertainty.clone(),
251 );
252
253 let server = registry.sub_registry_with_prefix("server");
254
255 server.register(
256 "received_packets",
257 "Number of incoming received packets",
258 self.server_received_packets.clone(),
259 );
260
261 server.register(
262 "accepted_packets",
263 "Number of packets accepted",
264 self.server_accepted_packets.clone(),
265 );
266
267 server.register(
268 "denied_packets",
269 "Number of denied packets",
270 self.server_denied_packets.clone(),
271 );
272
273 server.register(
274 "ignored_packets",
275 "Number of packets ignored",
276 self.server_ignored_packets.clone(),
277 );
278
279 server.register(
280 "rate_limited_packets",
281 "Number of rate limited packets",
282 self.server_rate_limited_packets.clone(),
283 );
284
285 server.register(
286 "response_send_errors",
287 "Number of packets where there was an error responding",
288 self.server_response_send_errors.clone(),
289 );
290
291 registry
292 }
293}