Skip to main content

tsoracle_server/
reporter.rs

//
//  ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
//  ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
//  ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
//
//  tsoracle — Distributed Timestamp Oracle
//  https://www.tsoracle.rs
//
//  Copyright (c) 2026 Prisma Risk
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//
//      https://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
//
23
//! Typed metric facade for `tsoracle-server`.
//!
//! This module (through `Reporter`) is the single place metric-name string
//! literals live in this crate. Each typed counter field forwards an increment
//! both to the global `metrics::` recorder (the Prometheus path, unchanged;
//! compiled in only with the `metrics` feature) and to a local
//! `Arc<AtomicU64>` that the heartbeat task can read without depending on any
//! installed recorder. Histogram fields forward to the recorder only and keep
//! no local copy.
//!
//! **Scaffold note**: items are introduced here and wired up in later plan
//! tasks (T3–T11). `#![allow(dead_code)]` is present for that reason and
//! will become unnecessary once the call sites land in Task 7.
35#![allow(dead_code)]
36
37use std::sync::Arc;
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::time::{Instant, SystemTime, UNIX_EPOCH};
40
41use tsoracle_core::IgnoreReason;
42
/// A monotonically increasing counter mirrored in two places.
///
/// Each increment is forwarded to the global `metrics` recorder (only when the
/// `metrics` feature is compiled in) and is also accumulated in a local atomic.
/// The local total can be read back with [`ReporterCounter::snapshot`] without
/// any recorder installed.
pub struct ReporterCounter {
    /// Metric name handed to `metrics::counter!`.
    name: &'static str,
    /// Process-local running total, starting at zero.
    local: Arc<AtomicU64>,
}

impl ReporterCounter {
    /// Builds a counter named `name` whose local total is zero.
    pub(crate) fn new(name: &'static str) -> Self {
        let local = Arc::new(AtomicU64::new(0));
        Self { name, local }
    }

    /// Adds `n` to the counter.
    ///
    /// The recorder forward (feature-gated) happens first. The local total is
    /// then bumped with `Relaxed` ordering: the addition is atomic, but it
    /// imposes no ordering on other memory accesses.
    pub(crate) fn increment(&self, n: u64) {
        #[cfg(feature = "metrics")]
        metrics::counter!(self.name).increment(n);
        let _previous_total = self.local.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the local running total as of this call (`Relaxed` load).
    pub(crate) fn snapshot(&self) -> u64 {
        self.local.load(Ordering::Relaxed)
    }
}
66
/// A histogram that exists only on the `metrics` recorder path.
///
/// No local copy is kept. With the `metrics` feature disabled,
/// [`ReporterHistogram::record`] is a no-op.
pub struct ReporterHistogram {
    /// Metric name handed to `metrics::histogram!`.
    name: &'static str,
}

impl ReporterHistogram {
    /// Builds a histogram handle for `name`.
    pub(crate) fn new(name: &'static str) -> Self {
        ReporterHistogram { name }
    }

    /// Records one `sample` on the installed recorder.
    ///
    /// When the `metrics` feature is off the body compiles to nothing, hence
    /// the feature-gated `unused_variables` allowance.
    #[cfg_attr(not(feature = "metrics"), allow(unused_variables))]
    pub(crate) fn record(&self, sample: f64) {
        #[cfg(feature = "metrics")]
        metrics::histogram!(self.name).record(sample);
    }
}
81
82pub struct ReporterTimestamp {
83    local: Arc<AtomicU64>,
84}
85
86impl ReporterTimestamp {
87    pub(crate) fn new() -> Self {
88        Self {
89            local: Arc::new(AtomicU64::new(0)),
90        }
91    }
92
93    /// Record `SystemTime::now()` as unix-ms. Idempotent under concurrent
94    /// callers — the last writer wins under `Relaxed`, which matches the
95    /// "approximate last transition time" semantics the heartbeat needs.
96    pub(crate) fn touch_now(&self) {
97        self.local.store(now_unix_ms(), Ordering::Relaxed);
98    }
99
100    /// `None` if never touched, else the most recent unix-ms.
101    pub(crate) fn snapshot(&self) -> Option<u64> {
102        let v = self.local.load(Ordering::Relaxed);
103        (v != 0).then_some(v)
104    }
105}
106
107pub struct IgnoredCommitsByReason {
108    pub not_leader: ReporterCounter,
109    pub epoch_mismatch: ReporterCounter,
110    pub not_advanced: ReporterCounter,
111}
112
113impl IgnoredCommitsByReason {
114    fn new() -> Self {
115        Self {
116            not_leader: ReporterCounter::new("tsoracle.window.extensions.ignored.not_leader.total"),
117            epoch_mismatch: ReporterCounter::new(
118                "tsoracle.window.extensions.ignored.epoch_mismatch.total",
119            ),
120            not_advanced: ReporterCounter::new(
121                "tsoracle.window.extensions.ignored.not_advanced.total",
122            ),
123        }
124    }
125
126    pub(crate) fn for_reason(&self, reason: IgnoreReason) -> &ReporterCounter {
127        match reason {
128            IgnoreReason::NotLeader => &self.not_leader,
129            IgnoreReason::EpochMismatch { .. } => &self.epoch_mismatch,
130            IgnoreReason::NotAdvanced { .. } => &self.not_advanced,
131        }
132    }
133}
134
135pub struct Reporter {
136    // get_ts hot path
137    pub get_ts_requests: ReporterCounter,
138    pub get_ts_success: ReporterCounter,
139    pub timestamps_issued: ReporterCounter,
140
141    // get_seq hot path
142    pub get_seq_requests: ReporterCounter,
143    pub get_seq_success: ReporterCounter,
144    pub seq_values_issued: ReporterCounter,
145    pub seq_cardinality_rejected: ReporterCounter,
146
147    // leader / fence
148    pub not_leader: ReporterCounter,
149    pub leader_transitions: ReporterCounter,
150    pub fence_transient_retries: ReporterCounter,
151    pub fence_latency: ReporterHistogram,
152
153    // window
154    pub window_extensions: ReporterCounter,
155    pub window_extension_latency: ReporterHistogram,
156    pub ignored_commits: IgnoredCommitsByReason,
157
158    // lifecycle
159    pub shutdown_watch_aborted: ReporterCounter,
160    pub heartbeat_task_panicked: ReporterCounter,
161    pub last_leader_transition: ReporterTimestamp,
162    pub started_at: Instant,
163}
164
165impl Reporter {
166    pub fn new() -> Self {
167        Self {
168            get_ts_requests: ReporterCounter::new("tsoracle.get_ts.requests.total"),
169            get_ts_success: ReporterCounter::new("tsoracle.get_ts.success.total"),
170            timestamps_issued: ReporterCounter::new("tsoracle.get_ts.timestamps_issued"),
171            get_seq_requests: ReporterCounter::new("tsoracle.get_seq.requests.total"),
172            get_seq_success: ReporterCounter::new("tsoracle.get_seq.success.total"),
173            seq_values_issued: ReporterCounter::new("tsoracle.get_seq.values_issued"),
174            seq_cardinality_rejected: ReporterCounter::new(
175                "tsoracle.get_seq.cardinality_rejected.total",
176            ),
177            not_leader: ReporterCounter::new("tsoracle.not_leader.total"),
178            leader_transitions: ReporterCounter::new("tsoracle.leader_transition.total"),
179            fence_transient_retries: ReporterCounter::new(
180                "tsoracle.leader_transition.fence_transient_retries.total",
181            ),
182            fence_latency: ReporterHistogram::new("tsoracle.leader_transition.fence_latency"),
183            window_extensions: ReporterCounter::new("tsoracle.window.extensions.total"),
184            window_extension_latency: ReporterHistogram::new("tsoracle.window.extension_latency"),
185            ignored_commits: IgnoredCommitsByReason::new(),
186            shutdown_watch_aborted: ReporterCounter::new("tsoracle.shutdown.watch_aborted.total"),
187            heartbeat_task_panicked: ReporterCounter::new("tsoracle.heartbeat.task_panicked.total"),
188            last_leader_transition: ReporterTimestamp::new(),
189            started_at: Instant::now(),
190        }
191    }
192
193    /// Convenience constructor for unit tests. Same as `Reporter::new()`; the
194    /// dedicated name documents intent at call sites (tests that just need a
195    /// Reporter to satisfy a struct field).
196    #[cfg(any(test, feature = "test-support"))]
197    pub fn for_tests() -> Self {
198        Self::new()
199    }
200}
201
202impl Default for Reporter {
203    fn default() -> Self {
204        Self::new()
205    }
206}
207
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch. Note that
/// `ReporterTimestamp` uses a stored `0` to mean "never touched".
///
/// A millisecond count that does not fit in `u64` saturates to `u64::MAX`.
/// It is not silently truncated, as a bare `as u64` cast of the `u128`
/// millisecond count would be.
pub(crate) fn now_unix_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
214
#[cfg(test)]
mod tests {
    use super::*;

    /// The local atomic accumulates every increment, regardless of recorder.
    #[test]
    fn counter_increment_bumps_local_snapshot() {
        let c = ReporterCounter::new("tsoracle.test.local");
        assert_eq!(c.snapshot(), 0);
        c.increment(1);
        c.increment(4);
        assert_eq!(c.snapshot(), 5);
    }

    /// With the `metrics` feature on, an increment reaches the installed
    /// recorder with the exact count.
    #[test]
    #[cfg(feature = "metrics")]
    fn counter_increment_forwards_to_metrics_recorder() {
        use metrics::Key;
        use metrics_util::CompositeKey;
        use metrics_util::MetricKind;
        use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter};

        let recorder = DebuggingRecorder::new();
        let snapshotter: Snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, || {
            let c = ReporterCounter::new("tsoracle.test.forwarded");
            c.increment(7);
        });

        let snap = snapshotter.snapshot().into_vec();
        let key = CompositeKey::new(
            MetricKind::Counter,
            Key::from_name("tsoracle.test.forwarded"),
        );
        let (_, _, _, value) = snap
            .iter()
            .find(|(k, ..)| *k == key)
            .expect("counter not recorded by DebuggingRecorder");
        // Compare the typed value rather than its `Debug` rendering, which is
        // not a stable interface.
        assert_eq!(
            *value,
            DebugValue::Counter(7),
            "expected the recorder to observe Counter(7), got {value:?}"
        );
    }

    /// With the `metrics` feature on, every recorded sample reaches the
    /// installed recorder under the histogram's name.
    #[test]
    #[cfg(feature = "metrics")]
    fn histogram_record_forwards_to_metrics_recorder() {
        use metrics::Key;
        use metrics_util::CompositeKey;
        use metrics_util::MetricKind;
        use metrics_util::debugging::{DebugValue, DebuggingRecorder};

        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, || {
            let h = ReporterHistogram::new("tsoracle.test.hist");
            h.record(1.0);
            h.record(2.0);
            h.record(3.0);
        });

        let snap = snapshotter.snapshot().into_vec();
        let key = CompositeKey::new(MetricKind::Histogram, Key::from_name("tsoracle.test.hist"));
        let (_, _, _, value) = snap
            .iter()
            .find(|(k, ..)| *k == key)
            .expect("histogram key not recorded");
        assert!(
            matches!(value, DebugValue::Histogram(samples) if samples.len() == 3),
            "expected three histogram samples, got {value:?}"
        );
    }

    #[test]
    fn timestamp_zero_is_none() {
        let t = ReporterTimestamp::new();
        assert_eq!(t.snapshot(), None);
    }

    #[test]
    fn timestamp_touch_records_now() {
        let before = now_unix_ms();
        let t = ReporterTimestamp::new();
        t.touch_now();
        let after = now_unix_ms();
        let observed = t.snapshot().expect("touch_now should produce Some");
        assert!(
            observed >= before && observed <= after,
            "timestamp {observed} not within [{before}, {after}]"
        );
    }

    #[test]
    #[cfg(not(feature = "metrics"))]
    fn counter_works_without_metrics_feature() {
        // With the `metrics` feature off, the increment body skips the
        // recorder forward but the local atomic still bumps — the heartbeat
        // path stays functional in default tsoracle-bin builds (which do not
        // enable the `metrics` feature on tsoracle-server).
        let c = ReporterCounter::new("tsoracle.test.no_metrics");
        c.increment(3);
        assert_eq!(c.snapshot(), 3);
    }

    /// Each `IgnoreReason` routes to its own counter: every per-reason metric
    /// is recorded, and each observed exactly one increment.
    #[test]
    #[cfg(feature = "metrics")]
    fn reporter_new_resolves_distinct_metric_names() {
        use metrics::Key;
        use metrics_util::CompositeKey;
        use metrics_util::MetricKind;
        use metrics_util::debugging::{DebugValue, DebuggingRecorder};
        use tsoracle_core::{Epoch, IgnoreReason};

        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, || {
            let r = Reporter::new();
            r.ignored_commits
                .for_reason(IgnoreReason::NotLeader)
                .increment(1);
            r.ignored_commits
                .for_reason(IgnoreReason::EpochMismatch {
                    expected: Epoch(1),
                    current: Epoch(2),
                })
                .increment(1);
            r.ignored_commits
                .for_reason(IgnoreReason::NotAdvanced {
                    persisted: 1,
                    committed: 2,
                })
                .increment(1);
        });

        let snap = snapshotter.snapshot().into_vec();
        for name in [
            "tsoracle.window.extensions.ignored.not_leader.total",
            "tsoracle.window.extensions.ignored.epoch_mismatch.total",
            "tsoracle.window.extensions.ignored.not_advanced.total",
        ] {
            let key = CompositeKey::new(MetricKind::Counter, Key::from_name(name));
            let (_, _, _, value) = snap
                .iter()
                .find(|(k, ..)| *k == key)
                .unwrap_or_else(|| panic!("expected metric {name} to be recorded distinctly"));
            assert_eq!(
                *value,
                DebugValue::Counter(1),
                "metric {name} should have been bumped exactly once, got {value:?}"
            );
        }
    }
}