1#![allow(dead_code)]
36
37use std::sync::Arc;
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::time::{Instant, SystemTime, UNIX_EPOCH};
40
41use tsoracle_core::IgnoreReason;
42
43pub struct ReporterCounter {
44 name: &'static str,
45 local: Arc<AtomicU64>,
46}
47
48impl ReporterCounter {
49 pub(crate) fn new(name: &'static str) -> Self {
50 Self {
51 name,
52 local: Arc::new(AtomicU64::new(0)),
53 }
54 }
55
56 pub(crate) fn increment(&self, n: u64) {
57 #[cfg(feature = "metrics")]
58 metrics::counter!(self.name).increment(n);
59 self.local.fetch_add(n, Ordering::Relaxed);
60 }
61
62 pub(crate) fn snapshot(&self) -> u64 {
63 self.local.load(Ordering::Relaxed)
64 }
65}
66
67pub struct ReporterHistogram {
68 name: &'static str,
69}
70
71impl ReporterHistogram {
72 pub(crate) fn new(name: &'static str) -> Self {
73 Self { name }
74 }
75
76 pub(crate) fn record(&self, _v: f64) {
77 #[cfg(feature = "metrics")]
78 metrics::histogram!(self.name).record(_v);
79 }
80}
81
82pub struct ReporterTimestamp {
83 local: Arc<AtomicU64>,
84}
85
86impl ReporterTimestamp {
87 pub(crate) fn new() -> Self {
88 Self {
89 local: Arc::new(AtomicU64::new(0)),
90 }
91 }
92
93 pub(crate) fn touch_now(&self) {
97 self.local.store(now_unix_ms(), Ordering::Relaxed);
98 }
99
100 pub(crate) fn snapshot(&self) -> Option<u64> {
102 let v = self.local.load(Ordering::Relaxed);
103 (v != 0).then_some(v)
104 }
105}
106
107pub struct IgnoredCommitsByReason {
108 pub not_leader: ReporterCounter,
109 pub epoch_mismatch: ReporterCounter,
110 pub not_advanced: ReporterCounter,
111}
112
113impl IgnoredCommitsByReason {
114 fn new() -> Self {
115 Self {
116 not_leader: ReporterCounter::new("tsoracle.window.extensions.ignored.not_leader.total"),
117 epoch_mismatch: ReporterCounter::new(
118 "tsoracle.window.extensions.ignored.epoch_mismatch.total",
119 ),
120 not_advanced: ReporterCounter::new(
121 "tsoracle.window.extensions.ignored.not_advanced.total",
122 ),
123 }
124 }
125
126 pub(crate) fn for_reason(&self, reason: IgnoreReason) -> &ReporterCounter {
127 match reason {
128 IgnoreReason::NotLeader => &self.not_leader,
129 IgnoreReason::EpochMismatch { .. } => &self.epoch_mismatch,
130 IgnoreReason::NotAdvanced { .. } => &self.not_advanced,
131 }
132 }
133}
134
135pub struct Reporter {
136 pub get_ts_requests: ReporterCounter,
138 pub get_ts_success: ReporterCounter,
139 pub timestamps_issued: ReporterCounter,
140
141 pub get_seq_requests: ReporterCounter,
143 pub get_seq_success: ReporterCounter,
144 pub seq_values_issued: ReporterCounter,
145 pub seq_cardinality_rejected: ReporterCounter,
146
147 pub not_leader: ReporterCounter,
149 pub leader_transitions: ReporterCounter,
150 pub fence_transient_retries: ReporterCounter,
151 pub fence_latency: ReporterHistogram,
152
153 pub window_extensions: ReporterCounter,
155 pub window_extension_latency: ReporterHistogram,
156 pub ignored_commits: IgnoredCommitsByReason,
157
158 pub shutdown_watch_aborted: ReporterCounter,
160 pub heartbeat_task_panicked: ReporterCounter,
161 pub last_leader_transition: ReporterTimestamp,
162 pub started_at: Instant,
163}
164
165impl Reporter {
166 pub fn new() -> Self {
167 Self {
168 get_ts_requests: ReporterCounter::new("tsoracle.get_ts.requests.total"),
169 get_ts_success: ReporterCounter::new("tsoracle.get_ts.success.total"),
170 timestamps_issued: ReporterCounter::new("tsoracle.get_ts.timestamps_issued"),
171 get_seq_requests: ReporterCounter::new("tsoracle.get_seq.requests.total"),
172 get_seq_success: ReporterCounter::new("tsoracle.get_seq.success.total"),
173 seq_values_issued: ReporterCounter::new("tsoracle.get_seq.values_issued"),
174 seq_cardinality_rejected: ReporterCounter::new(
175 "tsoracle.get_seq.cardinality_rejected.total",
176 ),
177 not_leader: ReporterCounter::new("tsoracle.not_leader.total"),
178 leader_transitions: ReporterCounter::new("tsoracle.leader_transition.total"),
179 fence_transient_retries: ReporterCounter::new(
180 "tsoracle.leader_transition.fence_transient_retries.total",
181 ),
182 fence_latency: ReporterHistogram::new("tsoracle.leader_transition.fence_latency"),
183 window_extensions: ReporterCounter::new("tsoracle.window.extensions.total"),
184 window_extension_latency: ReporterHistogram::new("tsoracle.window.extension_latency"),
185 ignored_commits: IgnoredCommitsByReason::new(),
186 shutdown_watch_aborted: ReporterCounter::new("tsoracle.shutdown.watch_aborted.total"),
187 heartbeat_task_panicked: ReporterCounter::new("tsoracle.heartbeat.task_panicked.total"),
188 last_leader_transition: ReporterTimestamp::new(),
189 started_at: Instant::now(),
190 }
191 }
192
193 #[cfg(any(test, feature = "test-support"))]
197 pub fn for_tests() -> Self {
198 Self::new()
199 }
200}
201
202impl Default for Reporter {
203 fn default() -> Self {
204 Self::new()
205 }
206}
207
208pub(crate) fn now_unix_ms() -> u64 {
209 SystemTime::now()
210 .duration_since(UNIX_EPOCH)
211 .map(|d| d.as_millis() as u64)
212 .unwrap_or(0)
213}
214
215#[cfg(test)]
216mod tests {
217 use super::*;
218
219 #[test]
220 fn counter_increment_bumps_local_snapshot() {
221 let c = ReporterCounter::new("tsoracle.test.local");
222 assert_eq!(c.snapshot(), 0);
223 c.increment(1);
224 c.increment(4);
225 assert_eq!(c.snapshot(), 5);
226 }
227
228 #[test]
229 #[cfg(feature = "metrics")]
230 fn counter_increment_forwards_to_metrics_recorder() {
231 use metrics::Key;
232 use metrics_util::CompositeKey;
233 use metrics_util::MetricKind;
234 use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
235
236 let recorder = DebuggingRecorder::new();
237 let snapshotter: Snapshotter = recorder.snapshotter();
238
239 metrics::with_local_recorder(&recorder, || {
240 let c = ReporterCounter::new("tsoracle.test.forwarded");
241 c.increment(7);
242 });
243
244 let snap = snapshotter.snapshot().into_vec();
245 let key = CompositeKey::new(
246 MetricKind::Counter,
247 Key::from_name("tsoracle.test.forwarded"),
248 );
249 let entry = snap
250 .iter()
251 .find(|(k, ..)| *k == key)
252 .expect("counter not recorded by DebuggingRecorder");
253 let (_, _, _, value) = entry;
254 assert_eq!(
255 format!("{value:?}"),
256 "Counter(7)",
257 "expected the recorder to observe Counter(7), got {value:?}"
258 );
259 }
260
261 #[test]
262 #[cfg(feature = "metrics")]
263 fn histogram_record_forwards_to_metrics_recorder() {
264 use metrics::Key;
265 use metrics_util::CompositeKey;
266 use metrics_util::MetricKind;
267 use metrics_util::debugging::DebuggingRecorder;
268
269 let recorder = DebuggingRecorder::new();
270 let snapshotter = recorder.snapshotter();
271
272 metrics::with_local_recorder(&recorder, || {
273 let h = ReporterHistogram::new("tsoracle.test.hist");
274 h.record(1.0);
275 h.record(2.0);
276 h.record(3.0);
277 });
278
279 let snap = snapshotter.snapshot().into_vec();
280 let key = CompositeKey::new(MetricKind::Histogram, Key::from_name("tsoracle.test.hist"));
281 assert!(
282 snap.iter().any(|(k, ..)| *k == key),
283 "histogram key not recorded"
284 );
285 }
286
287 #[test]
288 fn timestamp_zero_is_none() {
289 let t = ReporterTimestamp::new();
290 assert_eq!(t.snapshot(), None);
291 }
292
293 #[test]
294 fn timestamp_touch_records_now() {
295 let before = now_unix_ms();
296 let t = ReporterTimestamp::new();
297 t.touch_now();
298 let after = now_unix_ms();
299 let observed = t.snapshot().expect("touch_now should produce Some");
300 assert!(
301 observed >= before && observed <= after,
302 "timestamp {observed} not within [{before}, {after}]"
303 );
304 }
305
306 #[test]
307 #[cfg(not(feature = "metrics"))]
308 fn counter_works_without_metrics_feature() {
309 let c = ReporterCounter::new("tsoracle.test.no_metrics");
314 c.increment(3);
315 assert_eq!(c.snapshot(), 3);
316 }
317
318 #[test]
319 #[cfg(feature = "metrics")]
320 fn reporter_new_resolves_distinct_metric_names() {
321 use metrics::Key;
322 use metrics_util::CompositeKey;
323 use metrics_util::MetricKind;
324 use metrics_util::debugging::DebuggingRecorder;
325 use tsoracle_core::{Epoch, IgnoreReason};
326
327 let recorder = DebuggingRecorder::new();
328 let snapshotter = recorder.snapshotter();
329
330 metrics::with_local_recorder(&recorder, || {
331 let r = Reporter::new();
332 r.ignored_commits
333 .for_reason(IgnoreReason::NotLeader)
334 .increment(1);
335 r.ignored_commits
336 .for_reason(IgnoreReason::EpochMismatch {
337 expected: Epoch(1),
338 current: Epoch(2),
339 })
340 .increment(1);
341 r.ignored_commits
342 .for_reason(IgnoreReason::NotAdvanced {
343 persisted: 1,
344 committed: 2,
345 })
346 .increment(1);
347 });
348
349 let snap = snapshotter.snapshot().into_vec();
350 for name in [
351 "tsoracle.window.extensions.ignored.not_leader.total",
352 "tsoracle.window.extensions.ignored.epoch_mismatch.total",
353 "tsoracle.window.extensions.ignored.not_advanced.total",
354 ] {
355 let key = CompositeKey::new(MetricKind::Counter, Key::from_name(name));
356 assert!(
357 snap.iter().any(|(k, ..)| *k == key),
358 "expected metric {name} to be recorded distinctly"
359 );
360 }
361 }
362}