1#![allow(dead_code)]
36
37use std::sync::Arc;
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::time::{Instant, SystemTime, UNIX_EPOCH};
40
41use tsoracle_core::IgnoreReason;
42
/// A named counter that also keeps an in-process running total.
///
/// Each increment is added to a local atomic, readable through
/// [`ReporterCounter::snapshot`]. When the crate is compiled with the
/// `metrics` feature the same increment is additionally forwarded to the
/// `metrics` facade under `name`.
pub struct ReporterCounter {
    /// Metric key used when forwarding to the `metrics` facade.
    name: &'static str,
    /// Running total of every value passed to `increment`.
    local: Arc<AtomicU64>,
}

impl ReporterCounter {
    /// Creates a counter keyed by `name` whose local total starts at zero.
    pub(crate) fn new(name: &'static str) -> Self {
        let local = Arc::new(AtomicU64::new(0));
        ReporterCounter { name, local }
    }

    /// Adds `n` to the counter.
    ///
    /// The facade (when the `metrics` feature is on) is updated first, then
    /// the local total. The local add uses relaxed ordering: it is a plain
    /// statistic and does not synchronize any other memory.
    pub(crate) fn increment(&self, n: u64) {
        #[cfg(feature = "metrics")]
        metrics::counter!(self.name).increment(n);
        let total: &AtomicU64 = &self.local;
        total.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the local running total as currently observed.
    pub(crate) fn snapshot(&self) -> u64 {
        let total: &AtomicU64 = &self.local;
        total.load(Ordering::Relaxed)
    }
}
66
/// A named histogram handle.
///
/// Only the metric key is stored; no local copy of recorded values is kept.
pub struct ReporterHistogram {
    /// Metric key used when forwarding to the `metrics` facade.
    name: &'static str,
}

impl ReporterHistogram {
    /// Creates a histogram handle keyed by `name`.
    pub(crate) fn new(name: &'static str) -> Self {
        ReporterHistogram { name }
    }

    /// Records one observation.
    ///
    /// With the `metrics` feature enabled the value is forwarded to
    /// `metrics::histogram!`; without it this is a no-op, which is why the
    /// parameter carries a leading underscore.
    pub(crate) fn record(&self, _v: f64) {
        #[cfg(feature = "metrics")]
        metrics::histogram!(self.name).record(_v);
    }
}
81
82pub struct ReporterTimestamp {
83 local: Arc<AtomicU64>,
84}
85
86impl ReporterTimestamp {
87 pub(crate) fn new() -> Self {
88 Self {
89 local: Arc::new(AtomicU64::new(0)),
90 }
91 }
92
93 pub(crate) fn touch_now(&self) {
97 self.local.store(now_unix_ms(), Ordering::Relaxed);
98 }
99
100 pub(crate) fn snapshot(&self) -> Option<u64> {
102 let v = self.local.load(Ordering::Relaxed);
103 (v != 0).then_some(v)
104 }
105}
106
107pub struct IgnoredCommitsByReason {
108 pub not_leader: ReporterCounter,
109 pub epoch_mismatch: ReporterCounter,
110 pub not_advanced: ReporterCounter,
111}
112
113impl IgnoredCommitsByReason {
114 fn new() -> Self {
115 Self {
116 not_leader: ReporterCounter::new("tsoracle.window.extensions.ignored.not_leader.total"),
117 epoch_mismatch: ReporterCounter::new(
118 "tsoracle.window.extensions.ignored.epoch_mismatch.total",
119 ),
120 not_advanced: ReporterCounter::new(
121 "tsoracle.window.extensions.ignored.not_advanced.total",
122 ),
123 }
124 }
125
126 pub(crate) fn for_reason(&self, reason: IgnoreReason) -> &ReporterCounter {
127 match reason {
128 IgnoreReason::NotLeader => &self.not_leader,
129 IgnoreReason::EpochMismatch { .. } => &self.epoch_mismatch,
130 IgnoreReason::NotAdvanced { .. } => &self.not_advanced,
131 }
132 }
133}
134
135pub struct Reporter {
136 pub get_ts_requests: ReporterCounter,
138 pub get_ts_success: ReporterCounter,
139 pub timestamps_issued: ReporterCounter,
140
141 pub get_seq_requests: ReporterCounter,
143 pub get_seq_success: ReporterCounter,
144 pub seq_values_issued: ReporterCounter,
145 pub seq_cardinality_rejected: ReporterCounter,
146
147 pub get_seq_batch_requests: ReporterCounter,
152 pub get_seq_batch_success: ReporterCounter,
153 pub seq_batch_keys: ReporterHistogram,
154 pub seq_batch_values_issued: ReporterCounter,
155 pub seq_batch_cardinality_rejected: ReporterCounter,
156
157 pub not_leader: ReporterCounter,
159 pub leader_transitions: ReporterCounter,
160 pub fence_transient_retries: ReporterCounter,
161 pub fence_latency: ReporterHistogram,
162
163 pub window_extensions: ReporterCounter,
165 pub window_extension_latency: ReporterHistogram,
166 pub ignored_commits: IgnoredCommitsByReason,
167
168 pub shutdown_watch_aborted: ReporterCounter,
170 pub heartbeat_task_panicked: ReporterCounter,
171 pub last_leader_transition: ReporterTimestamp,
172 pub started_at: Instant,
173}
174
175impl Reporter {
176 pub fn new() -> Self {
177 Self {
178 get_ts_requests: ReporterCounter::new("tsoracle.get_ts.requests.total"),
179 get_ts_success: ReporterCounter::new("tsoracle.get_ts.success.total"),
180 timestamps_issued: ReporterCounter::new("tsoracle.get_ts.timestamps_issued"),
181 get_seq_requests: ReporterCounter::new("tsoracle.get_seq.requests.total"),
182 get_seq_success: ReporterCounter::new("tsoracle.get_seq.success.total"),
183 seq_values_issued: ReporterCounter::new("tsoracle.get_seq.values_issued"),
184 seq_cardinality_rejected: ReporterCounter::new(
185 "tsoracle.get_seq.cardinality_rejected.total",
186 ),
187 get_seq_batch_requests: ReporterCounter::new("tsoracle.get_seq.batch.requests.total"),
188 get_seq_batch_success: ReporterCounter::new("tsoracle.get_seq.batch.success.total"),
189 seq_batch_keys: ReporterHistogram::new("tsoracle.get_seq.batch.keys"),
190 seq_batch_values_issued: ReporterCounter::new("tsoracle.get_seq.batch.values_issued"),
191 seq_batch_cardinality_rejected: ReporterCounter::new(
192 "tsoracle.get_seq.batch.cardinality_rejected.total",
193 ),
194 not_leader: ReporterCounter::new("tsoracle.not_leader.total"),
195 leader_transitions: ReporterCounter::new("tsoracle.leader_transition.total"),
196 fence_transient_retries: ReporterCounter::new(
197 "tsoracle.leader_transition.fence_transient_retries.total",
198 ),
199 fence_latency: ReporterHistogram::new("tsoracle.leader_transition.fence_latency"),
200 window_extensions: ReporterCounter::new("tsoracle.window.extensions.total"),
201 window_extension_latency: ReporterHistogram::new("tsoracle.window.extension_latency"),
202 ignored_commits: IgnoredCommitsByReason::new(),
203 shutdown_watch_aborted: ReporterCounter::new("tsoracle.shutdown.watch_aborted.total"),
204 heartbeat_task_panicked: ReporterCounter::new("tsoracle.heartbeat.task_panicked.total"),
205 last_leader_transition: ReporterTimestamp::new(),
206 started_at: Instant::now(),
207 }
208 }
209
210 #[cfg(any(test, feature = "test-support"))]
214 pub fn for_tests() -> Self {
215 Self::new()
216 }
217}
218
219impl Default for Reporter {
220 fn default() -> Self {
221 Self::new()
222 }
223}
224
/// Returns the current wall-clock time as whole milliseconds since the Unix
/// epoch.
///
/// Returns `0` when the system clock reports a time earlier than the epoch.
/// `Duration::as_millis` yields a `u128`; the value is narrowed with a checked
/// conversion and saturates at `u64::MAX` rather than silently wrapping the
/// way a bare `as u64` cast would.
pub(crate) fn now_unix_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // Fully qualified so the call resolves on every edition, including
        // those where `TryFrom` is not in the prelude.
        Ok(elapsed) => {
            <u64 as std::convert::TryFrom<u128>>::try_from(elapsed.as_millis())
                .unwrap_or(u64::MAX)
        }
        // Clock is set before 1970-01-01: keep the original fallback.
        Err(_) => 0,
    }
}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Increments accumulate in the local total regardless of features.
    #[test]
    fn counter_increment_bumps_local_snapshot() {
        let counter = ReporterCounter::new("tsoracle.test.local");
        assert_eq!(counter.snapshot(), 0);
        for step in [1, 4] {
            counter.increment(step);
        }
        assert_eq!(counter.snapshot(), 5);
    }

    /// With `metrics` enabled, an increment reaches the installed recorder.
    #[test]
    #[cfg(feature = "metrics")]
    fn counter_increment_forwards_to_metrics_recorder() {
        use metrics::Key;
        use metrics_util::CompositeKey;
        use metrics_util::MetricKind;
        use metrics_util::debugging::{DebuggingRecorder, Snapshotter};

        let recorder = DebuggingRecorder::new();
        let snapshotter: Snapshotter = recorder.snapshotter();

        // The recorder is scoped to this closure so parallel tests do not
        // observe each other's metrics.
        metrics::with_local_recorder(&recorder, || {
            ReporterCounter::new("tsoracle.test.forwarded").increment(7);
        });

        let recorded = snapshotter.snapshot().into_vec();
        let wanted = CompositeKey::new(
            MetricKind::Counter,
            Key::from_name("tsoracle.test.forwarded"),
        );
        let (_, _, _, value) = recorded
            .iter()
            .find(|(k, ..)| *k == wanted)
            .expect("counter not recorded by DebuggingRecorder");
        assert_eq!(
            format!("{value:?}"),
            "Counter(7)",
            "expected the recorder to observe Counter(7), got {value:?}"
        );
    }

    /// With `metrics` enabled, recorded samples register the histogram key.
    #[test]
    #[cfg(feature = "metrics")]
    fn histogram_record_forwards_to_metrics_recorder() {
        use metrics::Key;
        use metrics_util::CompositeKey;
        use metrics_util::MetricKind;
        use metrics_util::debugging::DebuggingRecorder;

        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, || {
            let histogram = ReporterHistogram::new("tsoracle.test.hist");
            for sample in [1.0, 2.0, 3.0] {
                histogram.record(sample);
            }
        });

        let recorded = snapshotter.snapshot().into_vec();
        let wanted = CompositeKey::new(MetricKind::Histogram, Key::from_name("tsoracle.test.hist"));
        let seen = recorded.iter().any(|(k, ..)| *k == wanted);
        assert!(seen, "histogram key not recorded");
    }

    /// A fresh timestamp reports `None`.
    #[test]
    fn timestamp_zero_is_none() {
        let stamp = ReporterTimestamp::new();
        assert_eq!(stamp.snapshot(), None);
    }

    /// `touch_now` stores a value bracketed by clock readings taken around it.
    #[test]
    fn timestamp_touch_records_now() {
        let before = now_unix_ms();
        let stamp = ReporterTimestamp::new();
        stamp.touch_now();
        let after = now_unix_ms();

        let observed = stamp.snapshot().expect("touch_now should produce Some");
        assert!(
            observed >= before && observed <= after,
            "timestamp {observed} not within [{before}, {after}]"
        );
    }

    /// Without `metrics`, the local total still works.
    #[test]
    #[cfg(not(feature = "metrics"))]
    fn counter_works_without_metrics_feature() {
        let counter = ReporterCounter::new("tsoracle.test.no_metrics");
        counter.increment(3);
        assert_eq!(counter.snapshot(), 3);
    }

    /// Each ignore reason is recorded under its own metric key.
    #[test]
    #[cfg(feature = "metrics")]
    fn reporter_new_resolves_distinct_metric_names() {
        use metrics::Key;
        use metrics_util::CompositeKey;
        use metrics_util::MetricKind;
        use metrics_util::debugging::DebuggingRecorder;
        use tsoracle_core::{Epoch, IgnoreReason};

        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, || {
            let reporter = Reporter::new();
            let reasons = [
                IgnoreReason::NotLeader,
                IgnoreReason::EpochMismatch {
                    expected: Epoch(1),
                    current: Epoch(2),
                },
                IgnoreReason::NotAdvanced {
                    persisted: 1,
                    committed: 2,
                },
            ];
            for reason in reasons {
                reporter.ignored_commits.for_reason(reason).increment(1);
            }
        });

        let recorded = snapshotter.snapshot().into_vec();
        let expected_names = [
            "tsoracle.window.extensions.ignored.not_leader.total",
            "tsoracle.window.extensions.ignored.epoch_mismatch.total",
            "tsoracle.window.extensions.ignored.not_advanced.total",
        ];
        for name in expected_names {
            let wanted = CompositeKey::new(MetricKind::Counter, Key::from_name(name));
            let seen = recorded.iter().any(|(k, ..)| *k == wanted);
            assert!(seen, "expected metric {name} to be recorded distinctly");
        }
    }
}