Skip to main content

icydb_core/obs/
sink.rs

1//! Metrics sink boundary.
2//!
3//! Core DB logic MUST NOT depend on obs::metrics directly.
4//! All instrumentation flows through MetricsEvent and MetricsSink.
5//!
6//! This module is the only allowed bridge between execution logic
7//! and the global metrics state.
8use crate::{obs::metrics, traits::EntityKind};
9use std::{cell::RefCell, marker::PhantomData};
10
11thread_local! {
12    static SINK_OVERRIDE: RefCell<Option<*const dyn MetricsSink>> = RefCell::new(None);
13}
14
///
/// ExecKind
///
/// Executor operation being instrumented. The global sink keeps separate
/// call, row, and instruction counters per kind.
///

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ExecKind {
    /// Load execution; counted under `load_calls` / `rows_loaded`.
    Load,
    /// Save execution; counted under `save_calls` (no row counter is kept for saves).
    Save,
    /// Delete execution; counted under `delete_calls` / `rows_deleted`.
    Delete,
}
25
///
/// PlanKind
///
/// Kind of plan reported through `MetricsEvent::Plan`. The global sink keeps
/// one counter per kind.
///

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum PlanKind {
    /// Counted under `plan_keys`.
    Keys,
    /// Counted under `plan_index`.
    Index,
    /// Counted under `plan_range`.
    Range,
    /// Counted under `plan_full_scan`.
    FullScan,
}
37
38///
39/// MetricsEvent
40///
41
42#[derive(Clone, Copy, Debug)]
43pub enum MetricsEvent {
44    ExecStart {
45        kind: ExecKind,
46        entity_path: &'static str,
47    },
48    ExecFinish {
49        kind: ExecKind,
50        entity_path: &'static str,
51        rows_touched: u64,
52        inst_delta: u64,
53    },
54    RowsScanned {
55        entity_path: &'static str,
56        rows_scanned: u64,
57    },
58    UniqueViolation {
59        entity_path: &'static str,
60    },
61    IndexInsert {
62        entity_path: &'static str,
63    },
64    IndexRemove {
65        entity_path: &'static str,
66    },
67    Plan {
68        kind: PlanKind,
69    },
70}
71
72///
73/// MetricsSink
74///
75
76pub trait MetricsSink {
77    fn record(&self, event: MetricsEvent);
78}
79
80///
81/// NoopMetricsSink
82///
83
84pub struct NoopMetricsSink;
85
86impl MetricsSink for NoopMetricsSink {
87    fn record(&self, _: MetricsEvent) {}
88}
89
90///
91/// GlobalMetricsSink
92///
93
94pub struct GlobalMetricsSink;
95
96impl MetricsSink for GlobalMetricsSink {
97    #[expect(clippy::too_many_lines)]
98    fn record(&self, event: MetricsEvent) {
99        match event {
100            MetricsEvent::ExecStart { kind, entity_path } => {
101                metrics::with_state_mut(|m| {
102                    match kind {
103                        ExecKind::Load => m.ops.load_calls = m.ops.load_calls.saturating_add(1),
104                        ExecKind::Save => m.ops.save_calls = m.ops.save_calls.saturating_add(1),
105                        ExecKind::Delete => {
106                            m.ops.delete_calls = m.ops.delete_calls.saturating_add(1);
107                        }
108                    }
109
110                    let entry = m.entities.entry(entity_path.to_string()).or_default();
111                    match kind {
112                        ExecKind::Load => {
113                            entry.load_calls = entry.load_calls.saturating_add(1);
114                        }
115                        ExecKind::Save => {
116                            entry.save_calls = entry.save_calls.saturating_add(1);
117                        }
118                        ExecKind::Delete => {
119                            entry.delete_calls = entry.delete_calls.saturating_add(1);
120                        }
121                    }
122                });
123            }
124
125            MetricsEvent::ExecFinish {
126                kind,
127                entity_path,
128                rows_touched,
129                inst_delta,
130            } => {
131                metrics::with_state_mut(|m| {
132                    match kind {
133                        ExecKind::Load => {
134                            m.ops.rows_loaded = m.ops.rows_loaded.saturating_add(rows_touched);
135                            metrics::add_instructions(
136                                &mut m.perf.load_inst_total,
137                                &mut m.perf.load_inst_max,
138                                inst_delta,
139                            );
140                        }
141                        ExecKind::Save => {
142                            metrics::add_instructions(
143                                &mut m.perf.save_inst_total,
144                                &mut m.perf.save_inst_max,
145                                inst_delta,
146                            );
147                        }
148                        ExecKind::Delete => {
149                            m.ops.rows_deleted = m.ops.rows_deleted.saturating_add(rows_touched);
150                            metrics::add_instructions(
151                                &mut m.perf.delete_inst_total,
152                                &mut m.perf.delete_inst_max,
153                                inst_delta,
154                            );
155                        }
156                    }
157
158                    let entry = m.entities.entry(entity_path.to_string()).or_default();
159                    match kind {
160                        ExecKind::Load => {
161                            entry.rows_loaded = entry.rows_loaded.saturating_add(rows_touched);
162                        }
163                        ExecKind::Delete => {
164                            entry.rows_deleted = entry.rows_deleted.saturating_add(rows_touched);
165                        }
166                        ExecKind::Save => {}
167                    }
168                });
169            }
170
171            MetricsEvent::RowsScanned {
172                entity_path,
173                rows_scanned,
174            } => {
175                metrics::with_state_mut(|m| {
176                    m.ops.rows_scanned = m.ops.rows_scanned.saturating_add(rows_scanned);
177                    let entry = m.entities.entry(entity_path.to_string()).or_default();
178                    entry.rows_scanned = entry.rows_scanned.saturating_add(rows_scanned);
179                });
180            }
181
182            MetricsEvent::UniqueViolation { entity_path } => {
183                metrics::with_state_mut(|m| {
184                    m.ops.unique_violations = m.ops.unique_violations.saturating_add(1);
185                    let entry = m.entities.entry(entity_path.to_string()).or_default();
186                    entry.unique_violations = entry.unique_violations.saturating_add(1);
187                });
188            }
189
190            MetricsEvent::IndexInsert { entity_path } => {
191                metrics::with_state_mut(|m| {
192                    m.ops.index_inserts = m.ops.index_inserts.saturating_add(1);
193                    let entry = m.entities.entry(entity_path.to_string()).or_default();
194                    entry.index_inserts = entry.index_inserts.saturating_add(1);
195                });
196            }
197
198            MetricsEvent::IndexRemove { entity_path } => {
199                metrics::with_state_mut(|m| {
200                    m.ops.index_removes = m.ops.index_removes.saturating_add(1);
201                    let entry = m.entities.entry(entity_path.to_string()).or_default();
202                    entry.index_removes = entry.index_removes.saturating_add(1);
203                });
204            }
205
206            MetricsEvent::Plan { kind } => {
207                metrics::with_state_mut(|m| match kind {
208                    PlanKind::Keys => m.ops.plan_keys = m.ops.plan_keys.saturating_add(1),
209                    PlanKind::Index => m.ops.plan_index = m.ops.plan_index.saturating_add(1),
210                    PlanKind::Range => m.ops.plan_range = m.ops.plan_range.saturating_add(1),
211                    PlanKind::FullScan => {
212                        m.ops.plan_full_scan = m.ops.plan_full_scan.saturating_add(1);
213                    }
214                });
215            }
216        }
217    }
218}
219
220pub const GLOBAL_METRICS_SINK: GlobalMetricsSink = GlobalMetricsSink;
221
222pub fn record(event: MetricsEvent) {
223    let override_ptr = SINK_OVERRIDE.with(|cell| *cell.borrow());
224    if let Some(ptr) = override_ptr {
225        // SAFETY:
226        // Preconditions:
227        // - `ptr` was produced from a valid `&dyn MetricsSink` in `with_metrics_sink`.
228        // - `with_metrics_sink` always restores the previous pointer before returning,
229        //   including unwind paths via `Guard::drop`.
230        // - `record` is synchronous and never stores `ptr` beyond this call.
231        //
232        // Aliasing:
233        // - We materialize only a shared reference (`&dyn MetricsSink`), matching the
234        //   original shared borrow used to install the override.
235        // - No mutable alias to the same sink is created here.
236        //
237        // What would break this:
238        // - If `with_metrics_sink` failed to restore on all exits (normal + panic),
239        //   `ptr` could outlive the borrowed sink and become dangling.
240        // - If `record` were changed to store or dispatch asynchronously using `ptr`,
241        //   lifetime assumptions would no longer hold.
242        unsafe { (&*ptr).record(event) };
243    } else {
244        GLOBAL_METRICS_SINK.record(event);
245    }
246}
247
248/// Snapshot the current metrics state for endpoint/test plumbing.
249///
250/// `since_ms` filters by window start (`EventState::since_ms`), not by per-event timestamps.
251#[must_use]
252pub fn metrics_report(since_ms: Option<u64>) -> metrics::EventReport {
253    metrics::report_since(since_ms)
254}
255
256/// Reset ephemeral metrics counters.
257pub fn metrics_reset() {
258    metrics::reset();
259}
260
261/// Reset all metrics state (counters + perf).
262pub fn metrics_reset_all() {
263    metrics::reset_all();
264}
265
266/// Run a closure with a temporary metrics sink override.
267pub fn with_metrics_sink<T>(sink: &dyn MetricsSink, f: impl FnOnce() -> T) -> T {
268    struct Guard(Option<*const dyn MetricsSink>);
269
270    impl Drop for Guard {
271        fn drop(&mut self) {
272            SINK_OVERRIDE.with(|cell| {
273                *cell.borrow_mut() = self.0;
274            });
275        }
276    }
277
278    // SAFETY:
279    // Preconditions:
280    // - `sink_ptr` is installed only for this dynamic scope.
281    // - `Guard` always restores the previous slot on all exits, including panic.
282    // - `record` only dereferences synchronously and never persists `sink_ptr`.
283    //
284    // Aliasing:
285    // - We erase lifetime to a raw pointer, but still only expose shared access.
286    // - No mutable alias to the same sink is introduced by this conversion.
287    //
288    // What would break this:
289    // - Any async/deferred use of `sink_ptr` beyond this scope.
290    // - Any path that bypasses Guard restoration.
291    let sink_ptr = unsafe { std::mem::transmute::<&dyn MetricsSink, *const dyn MetricsSink>(sink) };
292    let prev = SINK_OVERRIDE.with(|cell| {
293        let mut slot = cell.borrow_mut();
294        slot.replace(sink_ptr)
295    });
296    let _guard = Guard(prev);
297
298    f()
299}
300
301///
302/// Span
303/// RAII guard to simplify metrics instrumentation
304///
305
306pub(crate) struct Span<E: EntityKind> {
307    kind: ExecKind,
308    start: u64,
309    rows: u64,
310    finished: bool,
311    _marker: PhantomData<E>,
312}
313
/// Read the counter used to measure how much work a span covered.
///
/// On `wasm32` this is `canic_cdk::api::performance_counter(1)`; on every other
/// target it is the constant `0`, so measured deltas are always zero there.
// NOTE(review): counter id 1 is presumably the call-context instruction counter — confirm
// against the `canic_cdk` documentation.
// The lint is allowed presumably because the non-wasm body is a plain constant (which
// makes clippy suggest `const fn`) while the wasm body calls into the runtime.
#[allow(clippy::missing_const_for_fn)]
fn read_perf_counter() -> u64 {
    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }
}
325
326impl<E: EntityKind> Span<E> {
327    #[must_use]
328    /// Start a metrics span for a specific entity and executor kind.
329    pub(crate) fn new(kind: ExecKind) -> Self {
330        record(MetricsEvent::ExecStart {
331            kind,
332            entity_path: E::PATH,
333        });
334
335        Self {
336            kind,
337            start: read_perf_counter(),
338            rows: 0,
339            finished: false,
340            _marker: PhantomData,
341        }
342    }
343
344    pub(crate) const fn set_rows(&mut self, rows: u64) {
345        self.rows = rows;
346    }
347
348    fn finish_inner(&self) {
349        let now = read_perf_counter();
350        let delta = now.saturating_sub(self.start);
351
352        record(MetricsEvent::ExecFinish {
353            kind: self.kind,
354            entity_path: E::PATH,
355            rows_touched: self.rows,
356            inst_delta: delta,
357        });
358    }
359}
360
361impl<E: EntityKind> Drop for Span<E> {
362    fn drop(&mut self) {
363        if !self.finished {
364            self.finish_inner();
365            self.finished = true;
366        }
367    }
368}
369
#[cfg(test)]
mod tests {
    use super::*;
    use std::panic::{AssertUnwindSafe, catch_unwind};
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Sink that only counts how many events reached it.
    struct CountingSink<'a> {
        calls: &'a AtomicUsize,
    }

    impl MetricsSink for CountingSink<'_> {
        fn record(&self, _: MetricsEvent) {
            self.calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    // Force this thread's override slot back to "no override".
    fn clear_override() {
        SINK_OVERRIDE.with(|cell| {
            *cell.borrow_mut() = None;
        });
    }

    // Assert that no override is installed on this thread.
    fn assert_no_override() {
        SINK_OVERRIDE.with(|cell| {
            assert!(cell.borrow().is_none());
        });
    }

    // Record a `Plan` event of the given kind through the public entry point.
    fn record_plan(kind: PlanKind) {
        record(MetricsEvent::Plan { kind });
    }

    // Number of events a counting sink has seen so far.
    fn seen(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    #[test]
    fn with_metrics_sink_routes_and_restores_nested_overrides() {
        clear_override();

        let outer_calls = AtomicUsize::new(0);
        let inner_calls = AtomicUsize::new(0);
        let outer = CountingSink {
            calls: &outer_calls,
        };
        let inner = CountingSink {
            calls: &inner_calls,
        };

        // Without an override, neither counting sink sees the event.
        record_plan(PlanKind::Keys);
        assert_eq!(seen(&outer_calls), 0);
        assert_eq!(seen(&inner_calls), 0);

        with_metrics_sink(&outer, || {
            record_plan(PlanKind::Index);
            assert_eq!(seen(&outer_calls), 1);
            assert_eq!(seen(&inner_calls), 0);

            with_metrics_sink(&inner, || {
                record_plan(PlanKind::Range);
            });

            // Leaving the inner scope puts the outer override back.
            record_plan(PlanKind::FullScan);
        });

        assert_eq!(seen(&outer_calls), 2);
        assert_eq!(seen(&inner_calls), 1);

        // Leaving the outer scope puts back the original state (no override).
        assert_no_override();

        record_plan(PlanKind::Keys);
        assert_eq!(seen(&outer_calls), 2);
        assert_eq!(seen(&inner_calls), 1);
    }

    #[test]
    fn with_metrics_sink_restores_override_on_panic() {
        clear_override();

        let calls = AtomicUsize::new(0);
        let sink = CountingSink { calls: &calls };

        let outcome = catch_unwind(AssertUnwindSafe(|| {
            with_metrics_sink(&sink, || {
                record_plan(PlanKind::Index);
                panic!("intentional panic for guard test");
            });
        }));
        assert!(outcome.is_err());
        assert_eq!(seen(&calls), 1);

        // The drop guard restored the slot while unwinding.
        assert_no_override();

        record_plan(PlanKind::Range);
        assert_eq!(seen(&calls), 1);
    }

    #[test]
    fn metrics_report_without_since_returns_counters() {
        metrics_reset_all();
        record_plan(PlanKind::Index);

        let counters = metrics_report(None)
            .counters
            .expect("metrics report should include counters without since filter");
        assert_eq!(counters.ops.plan_index, 1);
    }

    #[test]
    fn metrics_report_since_before_window_returns_counters() {
        metrics_reset_all();
        let window_start = metrics::with_state(|m| m.since_ms);
        record_plan(PlanKind::Keys);

        let just_before = window_start.saturating_sub(1);
        let counters = metrics_report(Some(just_before))
            .counters
            .expect("metrics report should include counters when since_ms is before window");
        assert_eq!(counters.ops.plan_keys, 1);
    }

    #[test]
    fn metrics_report_since_after_window_returns_empty() {
        metrics_reset_all();
        let window_start = metrics::with_state(|m| m.since_ms);
        record_plan(PlanKind::FullScan);

        let just_after = window_start.saturating_add(1);
        let report = metrics_report(Some(just_after));
        assert!(report.counters.is_none());
        assert!(report.entity_counters.is_empty());
    }
}
515}