Skip to main content

tsoracle_server/
reporter.rs

1//
2//  ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
3//  ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
4//  ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
5//
6//  tsoracle — Distributed Timestamp Oracle
7//  https://www.tsoracle.rs
8//
9//  Copyright (c) 2026 Prisma Risk
10//
11//  Licensed under the Apache License, Version 2.0 (the "License");
12//  you may not use this file except in compliance with the License.
13//  You may obtain a copy of the License at
14//
15//      https://www.apache.org/licenses/LICENSE-2.0
16//
17//  Unless required by applicable law or agreed to in writing, software
18//  distributed under the License is distributed on an "AS IS" BASIS,
19//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20//  See the License for the specific language governing permissions and
21//  limitations under the License.
22//
23
//! Typed metric facade for `tsoracle-server`.
//!
//! `Reporter` is the single place metric-name string literals live in this
//! crate. Each typed counter field forwards an increment to the global
//! `metrics::` recorder (Prometheus path, unchanged; compiled in only with the
//! `metrics` feature) and always adds it to a local `Arc<AtomicU64>` that the
//! heartbeat task can read without depending on any installed recorder.
//! Histogram fields forward to the recorder only and keep no local state.
//!
//! **Scaffold note**: items are introduced here and wired up in later plan
//! tasks (T3–T11). `#![allow(dead_code)]` is present for that reason and
//! will become unnecessary once the call sites land in Task 7.
35#![allow(dead_code)]
36
37use std::sync::Arc;
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::time::{Instant, SystemTime, UNIX_EPOCH};
40
41use tsoracle_core::IgnoreReason;
42
/// Counter that reports every increment to two sinks.
///
/// Increments are forwarded to the global `metrics::` recorder under `name`
/// (only when the `metrics` feature is enabled). They are always added to a
/// local atomic total that [`Self::snapshot`] reads back without needing any
/// recorder.
pub struct ReporterCounter {
    /// Metric name used when forwarding to the `metrics::` recorder.
    name: &'static str,
    /// Local running total, independent of any installed recorder.
    local: Arc<AtomicU64>,
}

impl ReporterCounter {
    /// Create a counter called `name` whose local total starts at zero.
    pub(crate) fn new(name: &'static str) -> Self {
        let local = Arc::new(AtomicU64::new(0));
        ReporterCounter { name, local }
    }

    /// Add `n`: forward to the recorder (feature-gated), then add to the
    /// local total with `Relaxed` ordering.
    pub(crate) fn increment(&self, n: u64) {
        #[cfg(feature = "metrics")]
        {
            metrics::counter!(self.name).increment(n);
        }
        AtomicU64::fetch_add(&self.local, n, Ordering::Relaxed);
    }

    /// Read the local total (`Relaxed` load).
    pub(crate) fn snapshot(&self) -> u64 {
        AtomicU64::load(&self.local, Ordering::Relaxed)
    }
}
66
/// Histogram handle that forwards samples to the global `metrics::` recorder.
///
/// There is no local copy of the samples. With the `metrics` feature
/// disabled, [`Self::record`] compiles to nothing.
pub struct ReporterHistogram {
    /// Metric name used when forwarding to the `metrics::` recorder.
    name: &'static str,
}

impl ReporterHistogram {
    /// Create a histogram handle called `name`.
    pub(crate) fn new(name: &'static str) -> Self {
        ReporterHistogram { name }
    }

    /// Record one sample `_v` (forwarded only when `metrics` is enabled).
    ///
    /// The leading underscore keeps the parameter warning-free when the
    /// feature is off and the body is compiled out.
    pub(crate) fn record(&self, _v: f64) {
        #[cfg(feature = "metrics")]
        {
            metrics::histogram!(self.name).record(_v);
        }
    }
}
81
82pub struct ReporterTimestamp {
83    local: Arc<AtomicU64>,
84}
85
86impl ReporterTimestamp {
87    pub(crate) fn new() -> Self {
88        Self {
89            local: Arc::new(AtomicU64::new(0)),
90        }
91    }
92
93    /// Record `SystemTime::now()` as unix-ms. Idempotent under concurrent
94    /// callers — the last writer wins under `Relaxed`, which matches the
95    /// "approximate last transition time" semantics the heartbeat needs.
96    pub(crate) fn touch_now(&self) {
97        self.local.store(now_unix_ms(), Ordering::Relaxed);
98    }
99
100    /// `None` if never touched, else the most recent unix-ms.
101    pub(crate) fn snapshot(&self) -> Option<u64> {
102        let v = self.local.load(Ordering::Relaxed);
103        (v != 0).then_some(v)
104    }
105}
106
107pub struct IgnoredCommitsByReason {
108    pub not_leader: ReporterCounter,
109    pub epoch_mismatch: ReporterCounter,
110    pub not_advanced: ReporterCounter,
111}
112
113impl IgnoredCommitsByReason {
114    fn new() -> Self {
115        Self {
116            not_leader: ReporterCounter::new("tsoracle.window.extensions.ignored.not_leader.total"),
117            epoch_mismatch: ReporterCounter::new(
118                "tsoracle.window.extensions.ignored.epoch_mismatch.total",
119            ),
120            not_advanced: ReporterCounter::new(
121                "tsoracle.window.extensions.ignored.not_advanced.total",
122            ),
123        }
124    }
125
126    pub(crate) fn for_reason(&self, reason: IgnoreReason) -> &ReporterCounter {
127        match reason {
128            IgnoreReason::NotLeader => &self.not_leader,
129            IgnoreReason::EpochMismatch { .. } => &self.epoch_mismatch,
130            IgnoreReason::NotAdvanced { .. } => &self.not_advanced,
131        }
132    }
133}
134
135pub struct Reporter {
136    // get_ts hot path
137    pub get_ts_requests: ReporterCounter,
138    pub get_ts_success: ReporterCounter,
139    pub timestamps_issued: ReporterCounter,
140
141    // get_seq hot path
142    pub get_seq_requests: ReporterCounter,
143    pub get_seq_success: ReporterCounter,
144    pub seq_values_issued: ReporterCounter,
145    pub seq_cardinality_rejected: ReporterCounter,
146
147    // get_seq_batch hot path. Batch keeps its OWN values/cardinality counters
148    // (rather than sharing the single-key ones) so the `tsoracle.get_seq.*`
149    // single-key series stays free of batch traffic and each RPC is separately
150    // observable.
151    pub get_seq_batch_requests: ReporterCounter,
152    pub get_seq_batch_success: ReporterCounter,
153    pub seq_batch_keys: ReporterHistogram,
154    pub seq_batch_values_issued: ReporterCounter,
155    pub seq_batch_cardinality_rejected: ReporterCounter,
156
157    // leader / fence
158    pub not_leader: ReporterCounter,
159    pub leader_transitions: ReporterCounter,
160    pub fence_transient_retries: ReporterCounter,
161    pub fence_latency: ReporterHistogram,
162
163    // window
164    pub window_extensions: ReporterCounter,
165    pub window_extension_latency: ReporterHistogram,
166    pub ignored_commits: IgnoredCommitsByReason,
167
168    // lifecycle
169    pub shutdown_watch_aborted: ReporterCounter,
170    pub heartbeat_task_panicked: ReporterCounter,
171    pub last_leader_transition: ReporterTimestamp,
172    pub started_at: Instant,
173}
174
175impl Reporter {
176    pub fn new() -> Self {
177        Self {
178            get_ts_requests: ReporterCounter::new("tsoracle.get_ts.requests.total"),
179            get_ts_success: ReporterCounter::new("tsoracle.get_ts.success.total"),
180            timestamps_issued: ReporterCounter::new("tsoracle.get_ts.timestamps_issued"),
181            get_seq_requests: ReporterCounter::new("tsoracle.get_seq.requests.total"),
182            get_seq_success: ReporterCounter::new("tsoracle.get_seq.success.total"),
183            seq_values_issued: ReporterCounter::new("tsoracle.get_seq.values_issued"),
184            seq_cardinality_rejected: ReporterCounter::new(
185                "tsoracle.get_seq.cardinality_rejected.total",
186            ),
187            get_seq_batch_requests: ReporterCounter::new("tsoracle.get_seq.batch.requests.total"),
188            get_seq_batch_success: ReporterCounter::new("tsoracle.get_seq.batch.success.total"),
189            seq_batch_keys: ReporterHistogram::new("tsoracle.get_seq.batch.keys"),
190            seq_batch_values_issued: ReporterCounter::new("tsoracle.get_seq.batch.values_issued"),
191            seq_batch_cardinality_rejected: ReporterCounter::new(
192                "tsoracle.get_seq.batch.cardinality_rejected.total",
193            ),
194            not_leader: ReporterCounter::new("tsoracle.not_leader.total"),
195            leader_transitions: ReporterCounter::new("tsoracle.leader_transition.total"),
196            fence_transient_retries: ReporterCounter::new(
197                "tsoracle.leader_transition.fence_transient_retries.total",
198            ),
199            fence_latency: ReporterHistogram::new("tsoracle.leader_transition.fence_latency"),
200            window_extensions: ReporterCounter::new("tsoracle.window.extensions.total"),
201            window_extension_latency: ReporterHistogram::new("tsoracle.window.extension_latency"),
202            ignored_commits: IgnoredCommitsByReason::new(),
203            shutdown_watch_aborted: ReporterCounter::new("tsoracle.shutdown.watch_aborted.total"),
204            heartbeat_task_panicked: ReporterCounter::new("tsoracle.heartbeat.task_panicked.total"),
205            last_leader_transition: ReporterTimestamp::new(),
206            started_at: Instant::now(),
207        }
208    }
209
210    /// Convenience constructor for unit tests. Same as `Reporter::new()`; the
211    /// dedicated name documents intent at call sites (tests that just need a
212    /// Reporter to satisfy a struct field).
213    #[cfg(any(test, feature = "test-support"))]
214    pub fn for_tests() -> Self {
215        Self::new()
216    }
217}
218
219impl Default for Reporter {
220    fn default() -> Self {
221        Self::new()
222    }
223}
224
/// Current wall-clock time as milliseconds since the unix epoch.
///
/// Returns `0` if the system clock reads before the epoch (the error case of
/// `SystemTime::duration_since`). `Duration::as_millis` yields a `u128`.
/// Rather than truncating it with an `as` cast, the value saturates at
/// `u64::MAX`.
pub(crate) fn now_unix_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}
231
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn counter_increment_bumps_local_snapshot() {
        let c = ReporterCounter::new("tsoracle.test.local");
        assert_eq!(c.snapshot(), 0);
        c.increment(1);
        c.increment(4);
        assert_eq!(c.snapshot(), 5);
    }

    #[test]
    #[cfg(feature = "metrics")]
    fn counter_increment_forwards_to_metrics_recorder() {
        use metrics::Key;
        use metrics_util::CompositeKey;
        use metrics_util::MetricKind;
        use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter};

        let recorder = DebuggingRecorder::new();
        let snapshotter: Snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, || {
            let c = ReporterCounter::new("tsoracle.test.forwarded");
            c.increment(7);
        });

        let snap = snapshotter.snapshot().into_vec();
        let key = CompositeKey::new(
            MetricKind::Counter,
            Key::from_name("tsoracle.test.forwarded"),
        );
        let entry = snap
            .iter()
            .find(|(k, ..)| *k == key)
            .expect("counter not recorded by DebuggingRecorder");
        let (_, _, _, value) = entry;
        // Match on the typed `DebugValue` instead of its `Debug` rendering so
        // the assertion does not depend on that formatting.
        assert!(
            matches!(value, DebugValue::Counter(7)),
            "expected the recorder to observe Counter(7), got {value:?}"
        );
    }

    #[test]
    #[cfg(feature = "metrics")]
    fn histogram_record_forwards_to_metrics_recorder() {
        use metrics::Key;
        use metrics_util::CompositeKey;
        use metrics_util::MetricKind;
        use metrics_util::debugging::{DebugValue, DebuggingRecorder};

        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, || {
            let h = ReporterHistogram::new("tsoracle.test.hist");
            h.record(1.0);
            h.record(2.0);
            h.record(3.0);
        });

        let snap = snapshotter.snapshot().into_vec();
        let key = CompositeKey::new(MetricKind::Histogram, Key::from_name("tsoracle.test.hist"));
        let entry = snap
            .iter()
            .find(|(k, ..)| *k == key)
            .expect("histogram key not recorded");
        let (_, _, _, value) = entry;
        // Every sample must reach the recorder, not merely the key.
        assert!(
            matches!(value, DebugValue::Histogram(samples) if samples.len() == 3),
            "expected the recorder to observe 3 histogram samples, got {value:?}"
        );
    }

    #[test]
    fn timestamp_zero_is_none() {
        let t = ReporterTimestamp::new();
        assert_eq!(t.snapshot(), None);
    }

    #[test]
    fn timestamp_touch_records_now() {
        let before = now_unix_ms();
        let t = ReporterTimestamp::new();
        t.touch_now();
        let after = now_unix_ms();
        let observed = t.snapshot().expect("touch_now should produce Some");
        assert!(
            observed >= before && observed <= after,
            "timestamp {observed} not within [{before}, {after}]"
        );
    }

    #[test]
    #[cfg(not(feature = "metrics"))]
    fn counter_works_without_metrics_feature() {
        // With the `metrics` feature off, the increment body skips the
        // recorder forward but the local atomic still bumps — the heartbeat
        // path stays functional in default tsoracle-bin builds (which do not
        // enable the `metrics` feature on tsoracle-server).
        let c = ReporterCounter::new("tsoracle.test.no_metrics");
        c.increment(3);
        assert_eq!(c.snapshot(), 3);
    }

    #[test]
    fn ignored_commits_route_each_reason_to_its_own_counter() {
        // Feature-independent: checks the routing via the local snapshots.
        use tsoracle_core::{Epoch, IgnoreReason};

        let by_reason = IgnoredCommitsByReason::new();
        by_reason.for_reason(IgnoreReason::NotLeader).increment(1);
        by_reason
            .for_reason(IgnoreReason::EpochMismatch {
                expected: Epoch(1),
                current: Epoch(2),
            })
            .increment(2);
        by_reason
            .for_reason(IgnoreReason::NotAdvanced {
                persisted: 1,
                committed: 2,
            })
            .increment(4);

        assert_eq!(by_reason.not_leader.snapshot(), 1);
        assert_eq!(by_reason.epoch_mismatch.snapshot(), 2);
        assert_eq!(by_reason.not_advanced.snapshot(), 4);
    }

    #[test]
    #[cfg(feature = "metrics")]
    fn reporter_new_resolves_distinct_metric_names() {
        use metrics::Key;
        use metrics_util::CompositeKey;
        use metrics_util::MetricKind;
        use metrics_util::debugging::{DebugValue, DebuggingRecorder};
        use tsoracle_core::{Epoch, IgnoreReason};

        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, || {
            let r = Reporter::new();
            r.ignored_commits
                .for_reason(IgnoreReason::NotLeader)
                .increment(1);
            r.ignored_commits
                .for_reason(IgnoreReason::EpochMismatch {
                    expected: Epoch(1),
                    current: Epoch(2),
                })
                .increment(1);
            r.ignored_commits
                .for_reason(IgnoreReason::NotAdvanced {
                    persisted: 1,
                    committed: 2,
                })
                .increment(1);
        });

        let snap = snapshotter.snapshot().into_vec();
        for name in [
            "tsoracle.window.extensions.ignored.not_leader.total",
            "tsoracle.window.extensions.ignored.epoch_mismatch.total",
            "tsoracle.window.extensions.ignored.not_advanced.total",
        ] {
            let key = CompositeKey::new(MetricKind::Counter, Key::from_name(name));
            let entry = snap.iter().find(|(k, ..)| *k == key);
            // Each reason must land on its own series exactly once; a shared
            // name would show up as a single entry with a count of 3.
            assert!(
                matches!(entry, Some((_, _, _, DebugValue::Counter(1)))),
                "expected metric {name} to be recorded distinctly once, got {entry:?}"
            );
        }
    }
}