Skip to main content

tsoracle_server/
reporter.rs

1//
2//  ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
3//  ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
4//  ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
5//
6//  tsoracle — Distributed Timestamp Oracle
7//  https://www.tsoracle.rs
8//
9//  Copyright (c) 2026 Prisma Risk
10//
11//  Licensed under the Apache License, Version 2.0 (the "License");
12//  you may not use this file except in compliance with the License.
13//  You may obtain a copy of the License at
14//
15//      https://www.apache.org/licenses/LICENSE-2.0
16//
17//  Unless required by applicable law or agreed to in writing, software
18//  distributed under the License is distributed on an "AS IS" BASIS,
19//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20//  See the License for the specific language governing permissions and
21//  limitations under the License.
22//
23
24//! Typed metric facade for `tsoracle-server`.
25//!
//! `Reporter` is the single place metric-name string literals live in this
//! crate. Each typed counter forwards an increment to both the global
//! `metrics::` recorder (Prometheus path, unchanged; compiled in only with the
//! `metrics` feature) and a local `Arc<AtomicU64>` that the heartbeat task can
//! read without depending on any installed recorder. Histograms forward each
//! recorded value to the recorder only and keep no local state.
31//!
32//! **Scaffold note**: items are introduced here and wired up in later plan
33//! tasks (T3–T11). `#![allow(dead_code)]` is present for that reason and
34//! will become unnecessary once the call sites land in Task 7.
35#![allow(dead_code)]
36
37use std::sync::Arc;
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::time::{Instant, SystemTime, UNIX_EPOCH};
40
41use tsoracle_core::IgnoreReason;
42
/// Counter that mirrors every increment into a process-local atomic.
///
/// Each increment is forwarded to the `metrics` recorder under `name` (only
/// when the `metrics` feature is compiled in) and is also added to `local`,
/// so the running total can be read back through [`ReporterCounter::snapshot`]
/// without any recorder being installed.
pub struct ReporterCounter {
    /// Metric name passed to `metrics::counter!`.
    name: &'static str,
    /// Locally tracked running total of all increments.
    local: Arc<AtomicU64>,
}

impl ReporterCounter {
    /// Creates a counter called `name` whose local total starts at zero.
    pub(crate) fn new(name: &'static str) -> Self {
        let local = Arc::new(AtomicU64::new(0));
        ReporterCounter { name, local }
    }

    /// Adds `n` to the counter.
    ///
    /// The recorder forward is compiled out without the `metrics` feature;
    /// the local total is always updated and wraps around on `u64` overflow
    /// (the behaviour of `AtomicU64::fetch_add`).
    pub(crate) fn increment(&self, n: u64) {
        #[cfg(feature = "metrics")]
        {
            metrics::counter!(self.name).increment(n);
        }
        self.local.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the local running total as of this call.
    pub(crate) fn snapshot(&self) -> u64 {
        let total = self.local.load(Ordering::Relaxed);
        total
    }
}
66
/// Histogram handle identified by a static metric name.
///
/// Unlike `ReporterCounter`, no local copy of the recorded values is kept:
/// values only reach the `metrics` recorder.
pub struct ReporterHistogram {
    /// Metric name passed to `metrics::histogram!`.
    name: &'static str,
}

impl ReporterHistogram {
    /// Creates a histogram handle for the metric called `name`.
    pub(crate) fn new(name: &'static str) -> Self {
        ReporterHistogram { name }
    }

    /// Records `_v` with the `metrics` recorder.
    ///
    /// Without the `metrics` feature the body is empty, which is why the
    /// parameter carries an underscore prefix.
    pub(crate) fn record(&self, _v: f64) {
        #[cfg(feature = "metrics")]
        metrics::histogram!(self.name).record(_v);
    }
}
81
82pub struct ReporterTimestamp {
83    local: Arc<AtomicU64>,
84}
85
86impl ReporterTimestamp {
87    pub(crate) fn new() -> Self {
88        Self {
89            local: Arc::new(AtomicU64::new(0)),
90        }
91    }
92
93    /// Record `SystemTime::now()` as unix-ms. Idempotent under concurrent
94    /// callers — the last writer wins under `Relaxed`, which matches the
95    /// "approximate last transition time" semantics the heartbeat needs.
96    pub(crate) fn touch_now(&self) {
97        self.local.store(now_unix_ms(), Ordering::Relaxed);
98    }
99
100    /// `None` if never touched, else the most recent unix-ms.
101    pub(crate) fn snapshot(&self) -> Option<u64> {
102        let v = self.local.load(Ordering::Relaxed);
103        (v != 0).then_some(v)
104    }
105}
106
107pub struct IgnoredCommitsByReason {
108    pub not_leader: ReporterCounter,
109    pub epoch_mismatch: ReporterCounter,
110    pub not_advanced: ReporterCounter,
111}
112
113impl IgnoredCommitsByReason {
114    fn new() -> Self {
115        Self {
116            not_leader: ReporterCounter::new("tsoracle.window.extensions.ignored.not_leader.total"),
117            epoch_mismatch: ReporterCounter::new(
118                "tsoracle.window.extensions.ignored.epoch_mismatch.total",
119            ),
120            not_advanced: ReporterCounter::new(
121                "tsoracle.window.extensions.ignored.not_advanced.total",
122            ),
123        }
124    }
125
126    pub(crate) fn for_reason(&self, reason: IgnoreReason) -> &ReporterCounter {
127        match reason {
128            IgnoreReason::NotLeader => &self.not_leader,
129            IgnoreReason::EpochMismatch { .. } => &self.epoch_mismatch,
130            IgnoreReason::NotAdvanced { .. } => &self.not_advanced,
131        }
132    }
133}
134
135pub struct Reporter {
136    // get_ts hot path
137    pub get_ts_requests: ReporterCounter,
138    pub get_ts_success: ReporterCounter,
139    pub timestamps_issued: ReporterCounter,
140
141    // leader / fence
142    pub not_leader: ReporterCounter,
143    pub leader_transitions: ReporterCounter,
144    pub fence_transient_retries: ReporterCounter,
145    pub fence_latency: ReporterHistogram,
146
147    // window
148    pub window_extensions: ReporterCounter,
149    pub window_extension_latency: ReporterHistogram,
150    pub ignored_commits: IgnoredCommitsByReason,
151
152    // lifecycle
153    pub shutdown_watch_aborted: ReporterCounter,
154    pub heartbeat_task_panicked: ReporterCounter,
155    pub last_leader_transition: ReporterTimestamp,
156    pub started_at: Instant,
157}
158
159impl Reporter {
160    pub fn new() -> Self {
161        Self {
162            get_ts_requests: ReporterCounter::new("tsoracle.get_ts.requests.total"),
163            get_ts_success: ReporterCounter::new("tsoracle.get_ts.success.total"),
164            timestamps_issued: ReporterCounter::new("tsoracle.get_ts.timestamps_issued"),
165            not_leader: ReporterCounter::new("tsoracle.not_leader.total"),
166            leader_transitions: ReporterCounter::new("tsoracle.leader_transition.total"),
167            fence_transient_retries: ReporterCounter::new(
168                "tsoracle.leader_transition.fence_transient_retries.total",
169            ),
170            fence_latency: ReporterHistogram::new("tsoracle.leader_transition.fence_latency"),
171            window_extensions: ReporterCounter::new("tsoracle.window.extensions.total"),
172            window_extension_latency: ReporterHistogram::new("tsoracle.window.extension_latency"),
173            ignored_commits: IgnoredCommitsByReason::new(),
174            shutdown_watch_aborted: ReporterCounter::new("tsoracle.shutdown.watch_aborted.total"),
175            heartbeat_task_panicked: ReporterCounter::new("tsoracle.heartbeat.task_panicked.total"),
176            last_leader_transition: ReporterTimestamp::new(),
177            started_at: Instant::now(),
178        }
179    }
180
181    /// Convenience constructor for unit tests. Same as `Reporter::new()`; the
182    /// dedicated name documents intent at call sites (tests that just need a
183    /// Reporter to satisfy a struct field).
184    #[cfg(any(test, feature = "test-support"))]
185    pub fn for_tests() -> Self {
186        Self::new()
187    }
188}
189
190impl Default for Reporter {
191    fn default() -> Self {
192        Self::new()
193    }
194}
195
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch
/// (`duration_since` fails), and saturates at `u64::MAX` rather than silently
/// truncating if the millisecond count does not fit in a `u64`.
pub(crate) fn now_unix_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a `u128`; clamp first so the cast below is lossless.
        Ok(elapsed) => elapsed.as_millis().min(u128::from(u64::MAX)) as u64,
        Err(_) => 0,
    }
}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Increments add up in the local total, with or without a recorder.
    #[test]
    fn counter_increment_bumps_local_snapshot() {
        let counter = ReporterCounter::new("tsoracle.test.local");
        assert_eq!(counter.snapshot(), 0);

        counter.increment(1);
        counter.increment(4);

        assert_eq!(counter.snapshot(), 5);
    }

    /// With the `metrics` feature on, an increment reaches the recorder that
    /// is active on the current thread.
    #[test]
    #[cfg(feature = "metrics")]
    fn counter_increment_forwards_to_metrics_recorder() {
        use metrics::Key;
        use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
        use metrics_util::{CompositeKey, MetricKind};

        let recorder = DebuggingRecorder::new();
        let snapshotter: Snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, || {
            ReporterCounter::new("tsoracle.test.forwarded").increment(7);
        });

        let recorded = snapshotter.snapshot().into_vec();
        let wanted = CompositeKey::new(
            MetricKind::Counter,
            Key::from_name("tsoracle.test.forwarded"),
        );
        // Each snapshot entry is a 4-tuple: the key comes first, the value last.
        let value = recorded
            .iter()
            .find(|entry| entry.0 == wanted)
            .map(|entry| &entry.3)
            .expect("counter not recorded by DebuggingRecorder");
        assert_eq!(
            format!("{value:?}"),
            "Counter(7)",
            "expected the recorder to observe Counter(7), got {value:?}"
        );
    }

    /// With the `metrics` feature on, recorded values register the histogram
    /// key with the active recorder.
    #[test]
    #[cfg(feature = "metrics")]
    fn histogram_record_forwards_to_metrics_recorder() {
        use metrics::Key;
        use metrics_util::debugging::DebuggingRecorder;
        use metrics_util::{CompositeKey, MetricKind};

        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, || {
            let histogram = ReporterHistogram::new("tsoracle.test.hist");
            histogram.record(1.0);
            histogram.record(2.0);
            histogram.record(3.0);
        });

        let recorded = snapshotter.snapshot().into_vec();
        let wanted =
            CompositeKey::new(MetricKind::Histogram, Key::from_name("tsoracle.test.hist"));
        let seen = recorded.iter().any(|entry| entry.0 == wanted);
        assert!(seen, "histogram key not recorded");
    }

    /// A fresh timestamp reports "never touched".
    #[test]
    fn timestamp_zero_is_none() {
        let stamp = ReporterTimestamp::new();
        assert_eq!(stamp.snapshot(), None);
    }

    /// `touch_now` stores a value bracketed by clock reads taken before and
    /// after the call.
    #[test]
    fn timestamp_touch_records_now() {
        let stamp = ReporterTimestamp::new();

        let before = now_unix_ms();
        stamp.touch_now();
        let after = now_unix_ms();

        let observed = stamp.snapshot().expect("touch_now should produce Some");
        assert!(
            (before..=after).contains(&observed),
            "timestamp {observed} not within [{before}, {after}]"
        );
    }

    #[test]
    #[cfg(not(feature = "metrics"))]
    fn counter_works_without_metrics_feature() {
        // Without the `metrics` feature the recorder forward is compiled out,
        // yet the local atomic must still be bumped: the heartbeat path has to
        // stay functional in default tsoracle-bin builds (which do not enable
        // the `metrics` feature on tsoracle-server).
        let counter = ReporterCounter::new("tsoracle.test.no_metrics");
        counter.increment(3);
        assert_eq!(counter.snapshot(), 3);
    }

    /// Each `IgnoreReason` variant maps to a counter with its own metric name.
    #[test]
    #[cfg(feature = "metrics")]
    fn reporter_new_resolves_distinct_metric_names() {
        use metrics::Key;
        use metrics_util::debugging::DebuggingRecorder;
        use metrics_util::{CompositeKey, MetricKind};
        use tsoracle_core::{Epoch, IgnoreReason};

        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, || {
            let reporter = Reporter::new();
            let ignored = &reporter.ignored_commits;

            ignored.for_reason(IgnoreReason::NotLeader).increment(1);
            ignored
                .for_reason(IgnoreReason::EpochMismatch {
                    expected: Epoch(1),
                    current: Epoch(2),
                })
                .increment(1);
            ignored
                .for_reason(IgnoreReason::NotAdvanced {
                    persisted: 1,
                    committed: 2,
                })
                .increment(1);
        });

        let recorded = snapshotter.snapshot().into_vec();
        let expected_names = [
            "tsoracle.window.extensions.ignored.not_leader.total",
            "tsoracle.window.extensions.ignored.epoch_mismatch.total",
            "tsoracle.window.extensions.ignored.not_advanced.total",
        ];
        for name in expected_names {
            let wanted = CompositeKey::new(MetricKind::Counter, Key::from_name(name));
            let seen = recorded.iter().any(|entry| entry.0 == wanted);
            assert!(seen, "expected metric {name} to be recorded distinctly");
        }
    }
}