Skip to main content

icydb_core/obs/
sink.rs

1//! Metrics sink boundary.
2//!
3//! Core DB logic MUST NOT depend on obs::metrics directly.
4//! All instrumentation flows through MetricsEvent and MetricsSink.
5//!
6//! This module is the only allowed bridge between execution logic
7//! and the global metrics state.
8use crate::{obs::metrics, traits::EntityKind};
9use std::{cell::RefCell, marker::PhantomData};
10
11thread_local! {
12    static SINK_OVERRIDE: RefCell<Option<*const dyn MetricsSink>> = RefCell::new(None);
13}
14
///
/// ExecKind
/// Executor category carried by `ExecStart` / `ExecFinish` events.
///

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ExecKind {
    /// Load execution; the global sink tracks `load_calls` and `rows_loaded`.
    Load,
    /// Save execution; the global sink tracks `save_calls`.
    Save,
    /// Delete execution; the global sink tracks `delete_calls` and `rows_deleted`.
    Delete,
}
25
///
/// PlanKind
/// Query plan category carried by `MetricsEvent::Plan`.
///

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum PlanKind {
    /// Counted as `plan_keys` by the global sink.
    Keys,
    /// Counted as `plan_index` by the global sink.
    Index,
    /// Counted as `plan_range` by the global sink.
    Range,
    /// Counted as `plan_full_scan` by the global sink.
    FullScan,
}
37
///
/// MetricsEvent
/// Instrumentation event handed to a `MetricsSink`.
///
/// Events are small `Copy` values; `entity_path` is always a `'static` string.
///

#[derive(Clone, Copy, Debug)]
pub enum MetricsEvent {
    /// An executor started; emitted by `Span::new`.
    ExecStart {
        kind: ExecKind,
        entity_path: &'static str,
    },
    /// An executor finished; emitted when a `Span` is dropped.
    ExecFinish {
        kind: ExecKind,
        entity_path: &'static str,
        /// Row count set on the span via `Span::set_rows` (0 if never set).
        rows_touched: u64,
        /// Perf-counter difference between span start and finish.
        inst_delta: u64,
    },
    /// Rows were scanned for an entity.
    RowsScanned {
        entity_path: &'static str,
        rows_scanned: u64,
    },
    /// A unique violation was observed for an entity.
    UniqueViolation {
        entity_path: &'static str,
    },
    /// Index entries were inserted and/or removed for an entity.
    IndexDelta {
        entity_path: &'static str,
        inserts: u64,
        removes: u64,
    },
    /// A plan of the given kind was recorded; carries no entity path.
    Plan {
        kind: PlanKind,
    },
}
70
///
/// MetricsSink
/// Receiver for `MetricsEvent`s.
///
/// `record` takes `&self`, so an implementation that accumulates state has to
/// use interior mutability.
///

pub trait MetricsSink {
    /// Handle a single metrics event.
    fn record(&self, event: MetricsEvent);
}
78
79///
80/// NoopMetricsSink
81///
82
83pub struct NoopMetricsSink;
84
85impl MetricsSink for NoopMetricsSink {
86    fn record(&self, _: MetricsEvent) {}
87}
88
89///
90/// GlobalMetricsSink
91///
92
93pub struct GlobalMetricsSink;
94
95impl MetricsSink for GlobalMetricsSink {
96    #[expect(clippy::too_many_lines)]
97    fn record(&self, event: MetricsEvent) {
98        match event {
99            MetricsEvent::ExecStart { kind, entity_path } => {
100                metrics::with_state_mut(|m| {
101                    match kind {
102                        ExecKind::Load => m.ops.load_calls = m.ops.load_calls.saturating_add(1),
103                        ExecKind::Save => m.ops.save_calls = m.ops.save_calls.saturating_add(1),
104                        ExecKind::Delete => {
105                            m.ops.delete_calls = m.ops.delete_calls.saturating_add(1);
106                        }
107                    }
108
109                    let entry = m.entities.entry(entity_path.to_string()).or_default();
110                    match kind {
111                        ExecKind::Load => {
112                            entry.load_calls = entry.load_calls.saturating_add(1);
113                        }
114                        ExecKind::Save => {
115                            entry.save_calls = entry.save_calls.saturating_add(1);
116                        }
117                        ExecKind::Delete => {
118                            entry.delete_calls = entry.delete_calls.saturating_add(1);
119                        }
120                    }
121                });
122            }
123
124            MetricsEvent::ExecFinish {
125                kind,
126                entity_path,
127                rows_touched,
128                inst_delta,
129            } => {
130                metrics::with_state_mut(|m| {
131                    match kind {
132                        ExecKind::Load => {
133                            m.ops.rows_loaded = m.ops.rows_loaded.saturating_add(rows_touched);
134                            metrics::add_instructions(
135                                &mut m.perf.load_inst_total,
136                                &mut m.perf.load_inst_max,
137                                inst_delta,
138                            );
139                        }
140                        ExecKind::Save => {
141                            metrics::add_instructions(
142                                &mut m.perf.save_inst_total,
143                                &mut m.perf.save_inst_max,
144                                inst_delta,
145                            );
146                        }
147                        ExecKind::Delete => {
148                            m.ops.rows_deleted = m.ops.rows_deleted.saturating_add(rows_touched);
149                            metrics::add_instructions(
150                                &mut m.perf.delete_inst_total,
151                                &mut m.perf.delete_inst_max,
152                                inst_delta,
153                            );
154                        }
155                    }
156
157                    let entry = m.entities.entry(entity_path.to_string()).or_default();
158                    match kind {
159                        ExecKind::Load => {
160                            entry.rows_loaded = entry.rows_loaded.saturating_add(rows_touched);
161                        }
162                        ExecKind::Delete => {
163                            entry.rows_deleted = entry.rows_deleted.saturating_add(rows_touched);
164                        }
165                        ExecKind::Save => {}
166                    }
167                });
168            }
169
170            MetricsEvent::RowsScanned {
171                entity_path,
172                rows_scanned,
173            } => {
174                metrics::with_state_mut(|m| {
175                    m.ops.rows_scanned = m.ops.rows_scanned.saturating_add(rows_scanned);
176                    let entry = m.entities.entry(entity_path.to_string()).or_default();
177                    entry.rows_scanned = entry.rows_scanned.saturating_add(rows_scanned);
178                });
179            }
180
181            MetricsEvent::UniqueViolation { entity_path } => {
182                metrics::with_state_mut(|m| {
183                    m.ops.unique_violations = m.ops.unique_violations.saturating_add(1);
184                    let entry = m.entities.entry(entity_path.to_string()).or_default();
185                    entry.unique_violations = entry.unique_violations.saturating_add(1);
186                });
187            }
188
189            MetricsEvent::IndexDelta {
190                entity_path,
191                inserts,
192                removes,
193            } => {
194                metrics::with_state_mut(|m| {
195                    m.ops.index_inserts = m.ops.index_inserts.saturating_add(inserts);
196                    m.ops.index_removes = m.ops.index_removes.saturating_add(removes);
197                    let entry = m.entities.entry(entity_path.to_string()).or_default();
198                    entry.index_inserts = entry.index_inserts.saturating_add(inserts);
199                    entry.index_removes = entry.index_removes.saturating_add(removes);
200                });
201            }
202
203            MetricsEvent::Plan { kind } => {
204                metrics::with_state_mut(|m| match kind {
205                    PlanKind::Keys => m.ops.plan_keys = m.ops.plan_keys.saturating_add(1),
206                    PlanKind::Index => m.ops.plan_index = m.ops.plan_index.saturating_add(1),
207                    PlanKind::Range => m.ops.plan_range = m.ops.plan_range.saturating_add(1),
208                    PlanKind::FullScan => {
209                        m.ops.plan_full_scan = m.ops.plan_full_scan.saturating_add(1);
210                    }
211                });
212            }
213        }
214    }
215}
216
/// Default sink instance; `record` dispatches here when no thread-local
/// override is installed.
pub const GLOBAL_METRICS_SINK: GlobalMetricsSink = GlobalMetricsSink;
218
219pub fn record(event: MetricsEvent) {
220    let override_ptr = SINK_OVERRIDE.with(|cell| *cell.borrow());
221    if let Some(ptr) = override_ptr {
222        // SAFETY:
223        // Preconditions:
224        // - `ptr` was produced from a valid `&dyn MetricsSink` in `with_metrics_sink`.
225        // - `with_metrics_sink` always restores the previous pointer before returning,
226        //   including unwind paths via `Guard::drop`.
227        // - `record` is synchronous and never stores `ptr` beyond this call.
228        //
229        // Aliasing:
230        // - We materialize only a shared reference (`&dyn MetricsSink`), matching the
231        //   original shared borrow used to install the override.
232        // - No mutable alias to the same sink is created here.
233        //
234        // What would break this:
235        // - If `with_metrics_sink` failed to restore on all exits (normal + panic),
236        //   `ptr` could outlive the borrowed sink and become dangling.
237        // - If `record` were changed to store or dispatch asynchronously using `ptr`,
238        //   lifetime assumptions would no longer hold.
239        unsafe { (&*ptr).record(event) };
240    } else {
241        GLOBAL_METRICS_SINK.record(event);
242    }
243}
244
/// Snapshot the current metrics state for endpoint/test plumbing.
///
/// `since_ms` filters by window start (`EventState::since_ms`), not by per-event timestamps.
///
/// As exercised by this module's tests: with `None`, or with a `since_ms` earlier
/// than the window start, the report carries counters; with a `since_ms` later
/// than the window start, `counters` is `None` and `entity_counters` is empty.
#[must_use]
pub fn metrics_report(since_ms: Option<u64>) -> metrics::EventReport {
    metrics::report_since(since_ms)
}
252
/// Reset ephemeral metrics counters.
///
/// Thin delegate to `metrics::reset`; which counters count as ephemeral is
/// decided there. See `metrics_reset_all` for the variant that also clears perf.
pub fn metrics_reset() {
    metrics::reset();
}
257
/// Reset all metrics state (counters + perf).
///
/// Thin delegate to `metrics::reset_all`.
pub fn metrics_reset_all() {
    metrics::reset_all();
}
262
263/// Run a closure with a temporary metrics sink override.
264pub fn with_metrics_sink<T>(sink: &dyn MetricsSink, f: impl FnOnce() -> T) -> T {
265    struct Guard(Option<*const dyn MetricsSink>);
266
267    impl Drop for Guard {
268        fn drop(&mut self) {
269            SINK_OVERRIDE.with(|cell| {
270                *cell.borrow_mut() = self.0;
271            });
272        }
273    }
274
275    // SAFETY:
276    // Preconditions:
277    // - `sink_ptr` is installed only for this dynamic scope.
278    // - `Guard` always restores the previous slot on all exits, including panic.
279    // - `record` only dereferences synchronously and never persists `sink_ptr`.
280    //
281    // Aliasing:
282    // - We erase lifetime to a raw pointer, but still only expose shared access.
283    // - No mutable alias to the same sink is introduced by this conversion.
284    //
285    // What would break this:
286    // - Any async/deferred use of `sink_ptr` beyond this scope.
287    // - Any path that bypasses Guard restoration.
288    let sink_ptr = unsafe { std::mem::transmute::<&dyn MetricsSink, *const dyn MetricsSink>(sink) };
289    let prev = SINK_OVERRIDE.with(|cell| {
290        let mut slot = cell.borrow_mut();
291        slot.replace(sink_ptr)
292    });
293    let _guard = Guard(prev);
294
295    f()
296}
297
///
/// Span
/// RAII guard to simplify metrics instrumentation
///
/// Created with `Span::new`, which emits `ExecStart`; dropping the span emits
/// the matching `ExecFinish`.
///

pub(crate) struct Span<E: EntityKind> {
    // Executor kind reported on both the start and finish events.
    kind: ExecKind,
    // Perf-counter sample taken right after `ExecStart` was recorded.
    start: u64,
    // Row count reported as `rows_touched`; stays 0 unless `set_rows` is called.
    rows: u64,
    // Set once the finish event has been emitted so it is not emitted twice.
    finished: bool,
    // Ties the span to the entity type whose `PATH` is reported.
    _marker: PhantomData<E>,
}
310
/// Sample the performance counter used to compute `inst_delta`.
///
/// wasm32 build: reads counter `1` from `canic_cdk`.
// NOTE(review): counter type 1 is presumably an instruction counter, given the
// `inst_delta` naming — confirm against the canic_cdk documentation.
#[cfg(target_arch = "wasm32")]
fn read_perf_counter() -> u64 {
    canic_cdk::api::performance_counter(1)
}

/// Sample the performance counter used to compute `inst_delta`.
///
/// Non-wasm build: there is no counter to read, so this always returns 0.
// The constant body makes clippy suggest `const fn`; the lint is silenced so the
// signature stays the same as the wasm32 variant.
#[cfg(not(target_arch = "wasm32"))]
#[allow(clippy::missing_const_for_fn)]
fn read_perf_counter() -> u64 {
    0
}
322
323impl<E: EntityKind> Span<E> {
324    #[must_use]
325    /// Start a metrics span for a specific entity and executor kind.
326    pub(crate) fn new(kind: ExecKind) -> Self {
327        record(MetricsEvent::ExecStart {
328            kind,
329            entity_path: E::PATH,
330        });
331
332        Self {
333            kind,
334            start: read_perf_counter(),
335            rows: 0,
336            finished: false,
337            _marker: PhantomData,
338        }
339    }
340
341    pub(crate) const fn set_rows(&mut self, rows: u64) {
342        self.rows = rows;
343    }
344
345    fn finish_inner(&self) {
346        let now = read_perf_counter();
347        let delta = now.saturating_sub(self.start);
348
349        record(MetricsEvent::ExecFinish {
350            kind: self.kind,
351            entity_path: E::PATH,
352            rows_touched: self.rows,
353            inst_delta: delta,
354        });
355    }
356}
357
358impl<E: EntityKind> Drop for Span<E> {
359    fn drop(&mut self) {
360        if !self.finished {
361            self.finish_inner();
362            self.finished = true;
363        }
364    }
365}
366
#[cfg(test)]
mod tests {
    use super::*;
    use std::panic::{AssertUnwindSafe, catch_unwind};
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Test sink that only counts the events it receives.
    struct CountingSink<'a> {
        calls: &'a AtomicUsize,
    }

    impl MetricsSink for CountingSink<'_> {
        fn record(&self, _: MetricsEvent) {
            self.calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Start from an empty override slot on the current thread.
    fn clear_override() {
        SINK_OVERRIDE.with(|cell| {
            *cell.borrow_mut() = None;
        });
    }

    /// Assert that no override is installed on the current thread.
    fn assert_no_override() {
        SINK_OVERRIDE.with(|cell| {
            assert!(cell.borrow().is_none());
        });
    }

    /// Send a `Plan` event through the public `record` entry point.
    fn emit_plan(kind: PlanKind) {
        record(MetricsEvent::Plan { kind });
    }

    /// Number of events a counting sink has seen so far.
    fn seen(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    #[test]
    fn with_metrics_sink_routes_and_restores_nested_overrides() {
        clear_override();

        let outer_calls = AtomicUsize::new(0);
        let inner_calls = AtomicUsize::new(0);
        let outer = CountingSink {
            calls: &outer_calls,
        };
        let inner = CountingSink {
            calls: &inner_calls,
        };

        // No override installed yet.
        emit_plan(PlanKind::Keys);
        assert_eq!(seen(&outer_calls), 0);
        assert_eq!(seen(&inner_calls), 0);

        with_metrics_sink(&outer, || {
            emit_plan(PlanKind::Index);
            assert_eq!(seen(&outer_calls), 1);
            assert_eq!(seen(&inner_calls), 0);

            with_metrics_sink(&inner, || emit_plan(PlanKind::Range));

            // Inner override was restored to outer override.
            emit_plan(PlanKind::FullScan);
        });

        assert_eq!(seen(&outer_calls), 2);
        assert_eq!(seen(&inner_calls), 1);

        // Outer override was restored to previous (none).
        assert_no_override();

        emit_plan(PlanKind::Keys);
        assert_eq!(seen(&outer_calls), 2);
        assert_eq!(seen(&inner_calls), 1);
    }

    #[test]
    fn with_metrics_sink_restores_override_on_panic() {
        clear_override();

        let calls = AtomicUsize::new(0);
        let sink = CountingSink { calls: &calls };

        let outcome = catch_unwind(AssertUnwindSafe(|| {
            with_metrics_sink(&sink, || {
                emit_plan(PlanKind::Index);
                panic!("intentional panic for guard test");
            });
        }));
        assert!(outcome.is_err());
        assert_eq!(seen(&calls), 1);

        // Guard restored TLS slot after unwind.
        assert_no_override();

        emit_plan(PlanKind::Range);
        assert_eq!(seen(&calls), 1);
    }

    #[test]
    fn metrics_report_without_since_returns_counters() {
        metrics_reset_all();
        emit_plan(PlanKind::Index);

        let counters = metrics_report(None)
            .counters
            .expect("metrics report should include counters without since filter");
        assert_eq!(counters.ops.plan_index, 1);
    }

    #[test]
    fn metrics_report_since_before_window_returns_counters() {
        metrics_reset_all();
        let window_start = metrics::with_state(|m| m.since_ms);
        emit_plan(PlanKind::Keys);

        let just_before = window_start.saturating_sub(1);
        let counters = metrics_report(Some(just_before))
            .counters
            .expect("metrics report should include counters when since_ms is before window");
        assert_eq!(counters.ops.plan_keys, 1);
    }

    #[test]
    fn metrics_report_since_after_window_returns_empty() {
        metrics_reset_all();
        let window_start = metrics::with_state(|m| m.since_ms);
        emit_plan(PlanKind::FullScan);

        let just_after = window_start.saturating_add(1);
        let report = metrics_report(Some(just_after));
        assert!(report.counters.is_none());
        assert!(report.entity_counters.is_empty());
    }
}