1#![allow(dead_code)]
36
37use std::sync::Arc;
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::time::{Instant, SystemTime, UNIX_EPOCH};
40
41use tsoracle_core::IgnoreReason;
42
/// A named counter that keeps an in-process tally.
///
/// Every increment is added to a local atomic so the total can be read back
/// with [`ReporterCounter::snapshot`]. When the `metrics` feature is enabled
/// the same increment is also forwarded to `metrics::counter!` under `name`.
pub struct ReporterCounter {
    /// Metric name passed to the `metrics` facade when that feature is on.
    name: &'static str,
    /// Locally accumulated total.
    local: Arc<AtomicU64>,
}

impl ReporterCounter {
    /// Creates a counter called `name` whose local tally starts at zero.
    pub(crate) fn new(name: &'static str) -> Self {
        let local = Arc::new(AtomicU64::default());
        ReporterCounter { name, local }
    }

    /// Adds `n` to the counter.
    ///
    /// The `metrics` facade (if compiled in) is updated first, then the local
    /// tally is bumped with a relaxed atomic add.
    pub(crate) fn increment(&self, n: u64) {
        #[cfg(feature = "metrics")]
        metrics::counter!(self.name).increment(n);
        let _previous = self.local.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the local tally as of this call (relaxed atomic load).
    pub(crate) fn snapshot(&self) -> u64 {
        let total = self.local.load(Ordering::Relaxed);
        total
    }
}
66
/// A named histogram handle.
///
/// It holds only the metric name; samples are forwarded to
/// `metrics::histogram!` when the `metrics` feature is enabled and are
/// otherwise discarded.
pub struct ReporterHistogram {
    /// Metric name passed to the `metrics` facade when that feature is on.
    name: &'static str,
}

impl ReporterHistogram {
    /// Creates a histogram handle called `name`.
    pub(crate) fn new(name: &'static str) -> Self {
        ReporterHistogram { name }
    }

    /// Records one sample.
    ///
    /// Without the `metrics` feature this is a no-op, which is why the
    /// parameter carries a leading underscore.
    pub(crate) fn record(&self, _v: f64) {
        #[cfg(feature = "metrics")]
        metrics::histogram!(self.name).record(_v);
    }
}
81
82pub struct ReporterTimestamp {
83 local: Arc<AtomicU64>,
84}
85
86impl ReporterTimestamp {
87 pub(crate) fn new() -> Self {
88 Self {
89 local: Arc::new(AtomicU64::new(0)),
90 }
91 }
92
93 pub(crate) fn touch_now(&self) {
97 self.local.store(now_unix_ms(), Ordering::Relaxed);
98 }
99
100 pub(crate) fn snapshot(&self) -> Option<u64> {
102 let v = self.local.load(Ordering::Relaxed);
103 (v != 0).then_some(v)
104 }
105}
106
107pub struct IgnoredCommitsByReason {
108 pub not_leader: ReporterCounter,
109 pub epoch_mismatch: ReporterCounter,
110 pub not_advanced: ReporterCounter,
111}
112
113impl IgnoredCommitsByReason {
114 fn new() -> Self {
115 Self {
116 not_leader: ReporterCounter::new("tsoracle.window.extensions.ignored.not_leader.total"),
117 epoch_mismatch: ReporterCounter::new(
118 "tsoracle.window.extensions.ignored.epoch_mismatch.total",
119 ),
120 not_advanced: ReporterCounter::new(
121 "tsoracle.window.extensions.ignored.not_advanced.total",
122 ),
123 }
124 }
125
126 pub(crate) fn for_reason(&self, reason: IgnoreReason) -> &ReporterCounter {
127 match reason {
128 IgnoreReason::NotLeader => &self.not_leader,
129 IgnoreReason::EpochMismatch { .. } => &self.epoch_mismatch,
130 IgnoreReason::NotAdvanced { .. } => &self.not_advanced,
131 }
132 }
133}
134
135pub struct Reporter {
136 pub get_ts_requests: ReporterCounter,
138 pub get_ts_success: ReporterCounter,
139 pub timestamps_issued: ReporterCounter,
140
141 pub not_leader: ReporterCounter,
143 pub leader_transitions: ReporterCounter,
144 pub fence_transient_retries: ReporterCounter,
145 pub fence_latency: ReporterHistogram,
146
147 pub window_extensions: ReporterCounter,
149 pub window_extension_latency: ReporterHistogram,
150 pub ignored_commits: IgnoredCommitsByReason,
151
152 pub shutdown_watch_aborted: ReporterCounter,
154 pub heartbeat_task_panicked: ReporterCounter,
155 pub last_leader_transition: ReporterTimestamp,
156 pub started_at: Instant,
157}
158
159impl Reporter {
160 pub fn new() -> Self {
161 Self {
162 get_ts_requests: ReporterCounter::new("tsoracle.get_ts.requests.total"),
163 get_ts_success: ReporterCounter::new("tsoracle.get_ts.success.total"),
164 timestamps_issued: ReporterCounter::new("tsoracle.get_ts.timestamps_issued"),
165 not_leader: ReporterCounter::new("tsoracle.not_leader.total"),
166 leader_transitions: ReporterCounter::new("tsoracle.leader_transition.total"),
167 fence_transient_retries: ReporterCounter::new(
168 "tsoracle.leader_transition.fence_transient_retries.total",
169 ),
170 fence_latency: ReporterHistogram::new("tsoracle.leader_transition.fence_latency"),
171 window_extensions: ReporterCounter::new("tsoracle.window.extensions.total"),
172 window_extension_latency: ReporterHistogram::new("tsoracle.window.extension_latency"),
173 ignored_commits: IgnoredCommitsByReason::new(),
174 shutdown_watch_aborted: ReporterCounter::new("tsoracle.shutdown.watch_aborted.total"),
175 heartbeat_task_panicked: ReporterCounter::new("tsoracle.heartbeat.task_panicked.total"),
176 last_leader_transition: ReporterTimestamp::new(),
177 started_at: Instant::now(),
178 }
179 }
180
181 #[cfg(any(test, feature = "test-support"))]
185 pub fn for_tests() -> Self {
186 Self::new()
187 }
188}
189
190impl Default for Reporter {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
/// Returns the current wall-clock time as whole milliseconds since the Unix
/// epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch, so this
/// function never panics.
pub(crate) fn now_unix_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `as_millis()` yields a u128. Convert with a check and saturate at
        // `u64::MAX` rather than letting an `as` cast silently wrap.
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// The local tally is the sum of all increments.
    #[test]
    fn counter_increment_bumps_local_snapshot() {
        let counter = ReporterCounter::new("tsoracle.test.local");
        assert_eq!(c_snapshot(&counter), 0);
        for step in [1, 4] {
            counter.increment(step);
        }
        assert_eq!(c_snapshot(&counter), 5);
    }

    /// Small shorthand used by the local-tally test above.
    fn c_snapshot(counter: &ReporterCounter) -> u64 {
        counter.snapshot()
    }

    /// With the `metrics` feature on, an increment reaches the installed
    /// recorder under the counter's name and with the incremented value.
    #[test]
    #[cfg(feature = "metrics")]
    fn counter_increment_forwards_to_metrics_recorder() {
        use metrics::Key;
        use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
        use metrics_util::{CompositeKey, MetricKind};

        let recorder = DebuggingRecorder::new();
        let snapshotter: Snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, || {
            ReporterCounter::new("tsoracle.test.forwarded").increment(7);
        });

        let wanted = CompositeKey::new(
            MetricKind::Counter,
            Key::from_name("tsoracle.test.forwarded"),
        );
        let recorded = snapshotter.snapshot().into_vec();
        let (_, _, _, value) = recorded
            .iter()
            .find(|(k, ..)| *k == wanted)
            .expect("counter not recorded by DebuggingRecorder");
        // Compare through the Debug rendering, as the value type is opaque here.
        assert_eq!(
            format!("{value:?}"),
            "Counter(7)",
            "expected the recorder to observe Counter(7), got {value:?}"
        );
    }

    /// With the `metrics` feature on, recorded samples register a histogram
    /// key with the installed recorder.
    #[test]
    #[cfg(feature = "metrics")]
    fn histogram_record_forwards_to_metrics_recorder() {
        use metrics::Key;
        use metrics_util::debugging::DebuggingRecorder;
        use metrics_util::{CompositeKey, MetricKind};

        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, || {
            let histogram = ReporterHistogram::new("tsoracle.test.hist");
            for sample in [1.0, 2.0, 3.0] {
                histogram.record(sample);
            }
        });

        let wanted = CompositeKey::new(MetricKind::Histogram, Key::from_name("tsoracle.test.hist"));
        let recorded = snapshotter.snapshot().into_vec();
        let found = recorded.iter().any(|(k, ..)| *k == wanted);
        assert!(found, "histogram key not recorded");
    }

    /// A never-touched timestamp reads back as `None`.
    #[test]
    fn timestamp_zero_is_none() {
        assert_eq!(ReporterTimestamp::new().snapshot(), None);
    }

    /// `touch_now` stores a value bracketed by clock reads taken around it.
    #[test]
    fn timestamp_touch_records_now() {
        let stamp = ReporterTimestamp::new();
        let before = now_unix_ms();
        stamp.touch_now();
        let after = now_unix_ms();
        let observed = stamp.snapshot().expect("touch_now should produce Some");
        assert!(
            (before..=after).contains(&observed),
            "timestamp {observed} not within [{before}, {after}]"
        );
    }

    /// Without the `metrics` feature the local tally still works.
    #[test]
    #[cfg(not(feature = "metrics"))]
    fn counter_works_without_metrics_feature() {
        let counter = ReporterCounter::new("tsoracle.test.no_metrics");
        counter.increment(3);
        assert_eq!(counter.snapshot(), 3);
    }

    /// Each ignore reason is recorded under its own metric name.
    #[test]
    #[cfg(feature = "metrics")]
    fn reporter_new_resolves_distinct_metric_names() {
        use metrics::Key;
        use metrics_util::debugging::DebuggingRecorder;
        use metrics_util::{CompositeKey, MetricKind};
        use tsoracle_core::{Epoch, IgnoreReason};

        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, || {
            let reporter = Reporter::new();
            let reasons = [
                IgnoreReason::NotLeader,
                IgnoreReason::EpochMismatch {
                    expected: Epoch(1),
                    current: Epoch(2),
                },
                IgnoreReason::NotAdvanced {
                    persisted: 1,
                    committed: 2,
                },
            ];
            for reason in reasons {
                reporter.ignored_commits.for_reason(reason).increment(1);
            }
        });

        let recorded = snapshotter.snapshot().into_vec();
        let expected_names = [
            "tsoracle.window.extensions.ignored.not_leader.total",
            "tsoracle.window.extensions.ignored.epoch_mismatch.total",
            "tsoracle.window.extensions.ignored.not_advanced.total",
        ];
        for name in expected_names {
            let wanted = CompositeKey::new(MetricKind::Counter, Key::from_name(name));
            let found = recorded.iter().any(|(k, ..)| *k == wanted);
            assert!(found, "expected metric {name} to be recorded distinctly");
        }
    }
}