ntp_metrics_exporter/
metrics.rs

1use std::{
2    net::SocketAddr,
3    sync::atomic::{AtomicU64, Ordering},
4};
5
6use ntp_daemon::{ObservablePeerState, ObservableState};
7use ntp_os_clock::DefaultNtpClock;
8use ntp_proto::NtpClock;
9use prometheus_client::{
10    encoding::{EncodeLabelSet, EncodeLabelValue},
11    metrics::{counter::Counter, family::Family, gauge::Gauge},
12    registry::{Registry, Unit},
13};
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)]
16struct PeerLabels {
17    address: String,
18}
19
20#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)]
21struct ServerLabels {
22    listen_address: WrappedSocketAddr,
23}
24
/// Newtype around [`SocketAddr`].
///
/// A local wrapper type is needed so that a label-value encoding can be
/// implemented for socket addresses in this crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WrappedSocketAddr(SocketAddr);
27
28impl From<SocketAddr> for WrappedSocketAddr {
29    fn from(s: SocketAddr) -> Self {
30        WrappedSocketAddr(s)
31    }
32}
33
34impl EncodeLabelValue for WrappedSocketAddr {
35    fn encode(
36        &self,
37        encoder: &mut prometheus_client::encoding::LabelValueEncoder,
38    ) -> Result<(), std::fmt::Error> {
39        use std::fmt::Write;
40
41        encoder.write_fmt(format_args!("{}", &self.0))
42    }
43}
44
45#[derive(Default)]
46pub struct Metrics {
47    system_poll_interval: Gauge<f64, AtomicU64>,
48    system_poll_interval_exp: Gauge,
49    system_precision: Gauge<f64, AtomicU64>,
50    system_accumulated_steps: Gauge<f64, AtomicU64>,
51    system_accumulated_steps_threshold: Gauge<f64, AtomicU64>,
52    system_leap_indicator: Gauge,
53    peer_last_update: Family<PeerLabels, Gauge<f64, AtomicU64>>,
54    peer_poll_interval: Family<PeerLabels, Gauge<f64, AtomicU64>>,
55    peer_poll_interval_exp: Family<PeerLabels, Gauge>,
56    peer_reachability_status: Family<PeerLabels, Gauge>,
57    peer_offset: Family<PeerLabels, Gauge<f64, AtomicU64>>,
58    peer_uncertainty: Family<PeerLabels, Gauge<f64, AtomicU64>>,
59    peer_delay: Family<PeerLabels, Gauge<f64, AtomicU64>>,
60    server_received_packets: Family<ServerLabels, Counter>,
61    server_accepted_packets: Family<ServerLabels, Counter>,
62    server_denied_packets: Family<ServerLabels, Counter>,
63    server_ignored_packets: Family<ServerLabels, Counter>,
64    server_rate_limited_packets: Family<ServerLabels, Counter>,
65    server_response_send_errors: Family<ServerLabels, Counter>,
66}
67
68impl Metrics {
69    pub fn fill(&self, data: &ObservableState) {
70        let clock = DefaultNtpClock::realtime();
71
72        self.system_poll_interval.set(
73            data.system
74                .time_snapshot
75                .poll_interval
76                .as_duration()
77                .to_seconds(),
78        );
79        self.system_poll_interval_exp
80            .set(data.system.time_snapshot.poll_interval.as_log() as i64);
81        self.system_precision
82            .set(data.system.time_snapshot.precision.to_seconds());
83        self.system_accumulated_steps
84            .set(data.system.time_snapshot.accumulated_steps.to_seconds());
85        self.system_accumulated_steps_threshold.set(
86            data.system
87                .accumulated_steps_threshold
88                .map(|v| v.to_seconds())
89                .unwrap_or(-1.0),
90        );
91        self.system_leap_indicator
92            .set(data.system.time_snapshot.leap_indicator as i64);
93
94        for peer in &data.peers {
95            if let ObservablePeerState::Observable {
96                timedata,
97                reachability,
98                poll_interval,
99                address,
100                ..
101            } = peer
102            {
103                let labels = PeerLabels {
104                    address: address.clone(),
105                };
106                self.peer_last_update.get_or_create(&labels).set(
107                    (timedata.last_update
108                        - clock.now().expect("Unable to get current system time"))
109                    .to_seconds(),
110                );
111                self.peer_poll_interval
112                    .get_or_create(&labels)
113                    .set(poll_interval.as_duration().to_seconds());
114                self.peer_poll_interval_exp
115                    .get_or_create(&labels)
116                    .set(poll_interval.as_log() as i64);
117                self.peer_reachability_status
118                    .get_or_create(&labels)
119                    .set(reachability.reachability_score() as i64);
120                self.peer_offset
121                    .get_or_create(&labels)
122                    .set(timedata.offset.to_seconds());
123                self.peer_delay
124                    .get_or_create(&labels)
125                    .set(timedata.delay.to_seconds());
126                self.peer_uncertainty
127                    .get_or_create(&labels)
128                    .set(timedata.uncertainty.to_seconds());
129            }
130        }
131
132        for server in &data.servers {
133            let labels = ServerLabels {
134                listen_address: WrappedSocketAddr(server.address),
135            };
136
137            self.server_received_packets
138                .get_or_create(&labels)
139                .inner()
140                .store(server.stats.received_packets.get(), Ordering::Relaxed);
141            self.server_accepted_packets
142                .get_or_create(&labels)
143                .inner()
144                .store(server.stats.accepted_packets.get(), Ordering::Relaxed);
145            self.server_denied_packets
146                .get_or_create(&labels)
147                .inner()
148                .store(server.stats.denied_packets.get(), Ordering::Relaxed);
149            self.server_ignored_packets
150                .get_or_create(&labels)
151                .inner()
152                .store(server.stats.ignored_packets.get(), Ordering::Relaxed);
153            self.server_rate_limited_packets
154                .get_or_create(&labels)
155                .inner()
156                .store(server.stats.rate_limited_packets.get(), Ordering::Relaxed);
157            self.server_response_send_errors
158                .get_or_create(&labels)
159                .inner()
160                .store(server.stats.response_send_errors.get(), Ordering::Relaxed);
161        }
162    }
163
164    pub fn registry(&self) -> Registry {
165        let mut registry = <Registry>::with_prefix("ntp");
166
167        let system = registry.sub_registry_with_prefix("system");
168
169        system.register_with_unit(
170            "poll_interval",
171            "Time between polls of the system",
172            Unit::Seconds,
173            self.system_poll_interval.clone(),
174        );
175        system.register(
176            "poll_interval",
177            "Exponent of time between poll intervals",
178            self.system_poll_interval_exp.clone(),
179        );
180        system.register_with_unit(
181            "precision",
182            "Precision of the local clock",
183            Unit::Seconds,
184            self.system_precision.clone(),
185        );
186        system.register_with_unit(
187            "accumulated_steps",
188            "Accumulated amount of seconds that the system needed to jump the time",
189            Unit::Seconds,
190            self.system_accumulated_steps.clone(),
191        );
192        system.register_with_unit(
193            "accumulated_steps_threshold",
194            "Threshold for the accumulated step amount at which the NTP daemon will exit (or -1 if no threshold was set)",
195            Unit::Seconds,
196            self.system_accumulated_steps_threshold.clone(),
197        );
198        system.register(
199            "leap_indicator",
200            "Indicates that a leap second will take place",
201            self.system_leap_indicator.clone(),
202        );
203
204        let peer = registry.sub_registry_with_prefix("peer");
205
206        peer.register_with_unit(
207            "uptime",
208            "Time since the peer was started",
209            Unit::Seconds,
210            self.peer_last_update.clone(),
211        );
212
213        peer.register_with_unit(
214            "poll_interval",
215            "Time between polls of the peer",
216            Unit::Seconds,
217            self.peer_poll_interval.clone(),
218        );
219
220        peer.register(
221            "poll_interval",
222            "Exponent of time between polls of the peer",
223            self.peer_poll_interval_exp.clone(),
224        );
225
226        peer.register(
227            "reachability_status",
228            "Number of polls until the upstream server is unreachable, zero if it is",
229            self.peer_reachability_status.clone(),
230        );
231
232        peer.register_with_unit(
233            "offset",
234            "Offset between the upstream server and system time",
235            Unit::Seconds,
236            self.peer_offset.clone(),
237        );
238
239        peer.register_with_unit(
240            "delay",
241            "Current round-trip delay to the upstream server",
242            Unit::Seconds,
243            self.peer_delay.clone(),
244        );
245
246        peer.register_with_unit(
247            "uncertainty",
248            "Estimated error of the clock",
249            Unit::Seconds,
250            self.peer_uncertainty.clone(),
251        );
252
253        let server = registry.sub_registry_with_prefix("server");
254
255        server.register(
256            "received_packets",
257            "Number of incoming received packets",
258            self.server_received_packets.clone(),
259        );
260
261        server.register(
262            "accepted_packets",
263            "Number of packets accepted",
264            self.server_accepted_packets.clone(),
265        );
266
267        server.register(
268            "denied_packets",
269            "Number of denied packets",
270            self.server_denied_packets.clone(),
271        );
272
273        server.register(
274            "ignored_packets",
275            "Number of packets ignored",
276            self.server_ignored_packets.clone(),
277        );
278
279        server.register(
280            "rate_limited_packets",
281            "Number of rate limited packets",
282            self.server_rate_limited_packets.clone(),
283        );
284
285        server.register(
286            "response_send_errors",
287            "Number of packets where there was an error responding",
288            self.server_response_send_errors.clone(),
289        );
290
291        registry
292    }
293}