Skip to main content

icydb_core/obs/
sink.rs

1//! Metrics sink boundary.
2//!
3//! Core DB logic MUST NOT depend on obs::metrics directly.
4//! All instrumentation flows through MetricsEvent and MetricsSink.
5//!
6//! This module is the only allowed bridge between execution logic
7//! and the global metrics state.
8use crate::{obs::metrics, traits::EntityKind};
9use std::{cell::RefCell, marker::PhantomData};
10
11thread_local! {
12    static SINK_OVERRIDE: RefCell<Option<*const dyn MetricsSink>> = RefCell::new(None);
13}
14
15///
16/// ExecKind
17///
18
19#[derive(Clone, Copy, Debug)]
20pub enum ExecKind {
21    Load,
22    Save,
23    Delete,
24}
25
26///
27/// PlanKind
28///
29
30#[derive(Clone, Copy, Debug)]
31pub enum PlanKind {
32    Keys,
33    Index,
34    Range,
35    FullScan,
36}
37
38///
39/// MetricsEvent
40///
41
42#[derive(Clone, Copy, Debug)]
43pub enum MetricsEvent {
44    ExecStart {
45        kind: ExecKind,
46        entity_path: &'static str,
47    },
48    ExecFinish {
49        kind: ExecKind,
50        entity_path: &'static str,
51        rows_touched: u64,
52        inst_delta: u64,
53    },
54    RowsScanned {
55        entity_path: &'static str,
56        rows_scanned: u64,
57    },
58    UniqueViolation {
59        entity_path: &'static str,
60    },
61    IndexDelta {
62        entity_path: &'static str,
63        inserts: u64,
64        removes: u64,
65    },
66    ReverseIndexDelta {
67        entity_path: &'static str,
68        inserts: u64,
69        removes: u64,
70    },
71    RelationValidation {
72        entity_path: &'static str,
73        reverse_lookups: u64,
74        blocked_deletes: u64,
75    },
76    Plan {
77        kind: PlanKind,
78    },
79}
80
81///
82/// MetricsSink
83///
84
85pub trait MetricsSink {
86    fn record(&self, event: MetricsEvent);
87}
88
89///
90/// NoopMetricsSink
91///
92
93pub struct NoopMetricsSink;
94
95impl MetricsSink for NoopMetricsSink {
96    fn record(&self, _: MetricsEvent) {}
97}
98
99///
100/// GlobalMetricsSink
101///
102
103pub struct GlobalMetricsSink;
104
105impl MetricsSink for GlobalMetricsSink {
106    #[expect(clippy::too_many_lines)]
107    fn record(&self, event: MetricsEvent) {
108        match event {
109            MetricsEvent::ExecStart { kind, entity_path } => {
110                metrics::with_state_mut(|m| {
111                    match kind {
112                        ExecKind::Load => m.ops.load_calls = m.ops.load_calls.saturating_add(1),
113                        ExecKind::Save => m.ops.save_calls = m.ops.save_calls.saturating_add(1),
114                        ExecKind::Delete => {
115                            m.ops.delete_calls = m.ops.delete_calls.saturating_add(1);
116                        }
117                    }
118
119                    let entry = m.entities.entry(entity_path.to_string()).or_default();
120                    match kind {
121                        ExecKind::Load => {
122                            entry.load_calls = entry.load_calls.saturating_add(1);
123                        }
124                        ExecKind::Save => {
125                            entry.save_calls = entry.save_calls.saturating_add(1);
126                        }
127                        ExecKind::Delete => {
128                            entry.delete_calls = entry.delete_calls.saturating_add(1);
129                        }
130                    }
131                });
132            }
133
134            MetricsEvent::ExecFinish {
135                kind,
136                entity_path,
137                rows_touched,
138                inst_delta,
139            } => {
140                metrics::with_state_mut(|m| {
141                    match kind {
142                        ExecKind::Load => {
143                            m.ops.rows_loaded = m.ops.rows_loaded.saturating_add(rows_touched);
144                            metrics::add_instructions(
145                                &mut m.perf.load_inst_total,
146                                &mut m.perf.load_inst_max,
147                                inst_delta,
148                            );
149                        }
150                        ExecKind::Save => {
151                            metrics::add_instructions(
152                                &mut m.perf.save_inst_total,
153                                &mut m.perf.save_inst_max,
154                                inst_delta,
155                            );
156                        }
157                        ExecKind::Delete => {
158                            m.ops.rows_deleted = m.ops.rows_deleted.saturating_add(rows_touched);
159                            metrics::add_instructions(
160                                &mut m.perf.delete_inst_total,
161                                &mut m.perf.delete_inst_max,
162                                inst_delta,
163                            );
164                        }
165                    }
166
167                    let entry = m.entities.entry(entity_path.to_string()).or_default();
168                    match kind {
169                        ExecKind::Load => {
170                            entry.rows_loaded = entry.rows_loaded.saturating_add(rows_touched);
171                        }
172                        ExecKind::Delete => {
173                            entry.rows_deleted = entry.rows_deleted.saturating_add(rows_touched);
174                        }
175                        ExecKind::Save => {}
176                    }
177                });
178            }
179
180            MetricsEvent::RowsScanned {
181                entity_path,
182                rows_scanned,
183            } => {
184                metrics::with_state_mut(|m| {
185                    m.ops.rows_scanned = m.ops.rows_scanned.saturating_add(rows_scanned);
186                    let entry = m.entities.entry(entity_path.to_string()).or_default();
187                    entry.rows_scanned = entry.rows_scanned.saturating_add(rows_scanned);
188                });
189            }
190
191            MetricsEvent::UniqueViolation { entity_path } => {
192                metrics::with_state_mut(|m| {
193                    m.ops.unique_violations = m.ops.unique_violations.saturating_add(1);
194                    let entry = m.entities.entry(entity_path.to_string()).or_default();
195                    entry.unique_violations = entry.unique_violations.saturating_add(1);
196                });
197            }
198
199            MetricsEvent::IndexDelta {
200                entity_path,
201                inserts,
202                removes,
203            } => {
204                metrics::with_state_mut(|m| {
205                    m.ops.index_inserts = m.ops.index_inserts.saturating_add(inserts);
206                    m.ops.index_removes = m.ops.index_removes.saturating_add(removes);
207                    let entry = m.entities.entry(entity_path.to_string()).or_default();
208                    entry.index_inserts = entry.index_inserts.saturating_add(inserts);
209                    entry.index_removes = entry.index_removes.saturating_add(removes);
210                });
211            }
212
213            MetricsEvent::ReverseIndexDelta {
214                entity_path,
215                inserts,
216                removes,
217            } => {
218                metrics::with_state_mut(|m| {
219                    m.ops.reverse_index_inserts =
220                        m.ops.reverse_index_inserts.saturating_add(inserts);
221                    m.ops.reverse_index_removes =
222                        m.ops.reverse_index_removes.saturating_add(removes);
223                    let entry = m.entities.entry(entity_path.to_string()).or_default();
224                    entry.reverse_index_inserts =
225                        entry.reverse_index_inserts.saturating_add(inserts);
226                    entry.reverse_index_removes =
227                        entry.reverse_index_removes.saturating_add(removes);
228                });
229            }
230
231            MetricsEvent::RelationValidation {
232                entity_path,
233                reverse_lookups,
234                blocked_deletes,
235            } => {
236                metrics::with_state_mut(|m| {
237                    m.ops.relation_reverse_lookups = m
238                        .ops
239                        .relation_reverse_lookups
240                        .saturating_add(reverse_lookups);
241                    m.ops.relation_delete_blocks =
242                        m.ops.relation_delete_blocks.saturating_add(blocked_deletes);
243
244                    let entry = m.entities.entry(entity_path.to_string()).or_default();
245                    entry.relation_reverse_lookups = entry
246                        .relation_reverse_lookups
247                        .saturating_add(reverse_lookups);
248                    entry.relation_delete_blocks =
249                        entry.relation_delete_blocks.saturating_add(blocked_deletes);
250                });
251            }
252
253            MetricsEvent::Plan { kind } => {
254                metrics::with_state_mut(|m| match kind {
255                    PlanKind::Keys => m.ops.plan_keys = m.ops.plan_keys.saturating_add(1),
256                    PlanKind::Index => m.ops.plan_index = m.ops.plan_index.saturating_add(1),
257                    PlanKind::Range => m.ops.plan_range = m.ops.plan_range.saturating_add(1),
258                    PlanKind::FullScan => {
259                        m.ops.plan_full_scan = m.ops.plan_full_scan.saturating_add(1);
260                    }
261                });
262            }
263        }
264    }
265}
266
267pub const GLOBAL_METRICS_SINK: GlobalMetricsSink = GlobalMetricsSink;
268
269pub fn record(event: MetricsEvent) {
270    let override_ptr = SINK_OVERRIDE.with(|cell| *cell.borrow());
271    if let Some(ptr) = override_ptr {
272        // SAFETY:
273        // Preconditions:
274        // - `ptr` was produced from a valid `&dyn MetricsSink` in `with_metrics_sink`.
275        // - `with_metrics_sink` always restores the previous pointer before returning,
276        //   including unwind paths via `Guard::drop`.
277        // - `record` is synchronous and never stores `ptr` beyond this call.
278        //
279        // Aliasing:
280        // - We materialize only a shared reference (`&dyn MetricsSink`), matching the
281        //   original shared borrow used to install the override.
282        // - No mutable alias to the same sink is created here.
283        //
284        // What would break this:
285        // - If `with_metrics_sink` failed to restore on all exits (normal + panic),
286        //   `ptr` could outlive the borrowed sink and become dangling.
287        // - If `record` were changed to store or dispatch asynchronously using `ptr`,
288        //   lifetime assumptions would no longer hold.
289        unsafe { (&*ptr).record(event) };
290    } else {
291        GLOBAL_METRICS_SINK.record(event);
292    }
293}
294
295/// Snapshot the current metrics state for endpoint/test plumbing.
296///
297/// `since_ms` filters by window start (`EventState::since_ms`), not by per-event timestamps.
298#[must_use]
299pub fn metrics_report(since_ms: Option<u64>) -> metrics::EventReport {
300    metrics::report_since(since_ms)
301}
302
303/// Reset ephemeral metrics counters.
304pub fn metrics_reset() {
305    metrics::reset();
306}
307
308/// Reset all metrics state (counters + perf).
309pub fn metrics_reset_all() {
310    metrics::reset_all();
311}
312
313/// Run a closure with a temporary metrics sink override.
314pub fn with_metrics_sink<T>(sink: &dyn MetricsSink, f: impl FnOnce() -> T) -> T {
315    struct Guard(Option<*const dyn MetricsSink>);
316
317    impl Drop for Guard {
318        fn drop(&mut self) {
319            SINK_OVERRIDE.with(|cell| {
320                *cell.borrow_mut() = self.0;
321            });
322        }
323    }
324
325    // SAFETY:
326    // Preconditions:
327    // - `sink_ptr` is installed only for this dynamic scope.
328    // - `Guard` always restores the previous slot on all exits, including panic.
329    // - `record` only dereferences synchronously and never persists `sink_ptr`.
330    //
331    // Aliasing:
332    // - We erase lifetime to a raw pointer, but still only expose shared access.
333    // - No mutable alias to the same sink is introduced by this conversion.
334    //
335    // What would break this:
336    // - Any async/deferred use of `sink_ptr` beyond this scope.
337    // - Any path that bypasses Guard restoration.
338    let sink_ptr = unsafe { std::mem::transmute::<&dyn MetricsSink, *const dyn MetricsSink>(sink) };
339    let prev = SINK_OVERRIDE.with(|cell| {
340        let mut slot = cell.borrow_mut();
341        slot.replace(sink_ptr)
342    });
343    let _guard = Guard(prev);
344
345    f()
346}
347
348///
349/// Span
350/// RAII guard to simplify metrics instrumentation
351///
352
353pub(crate) struct Span<E: EntityKind> {
354    kind: ExecKind,
355    start: u64,
356    rows: u64,
357    finished: bool,
358    _marker: PhantomData<E>,
359}
360
361#[allow(clippy::missing_const_for_fn)]
362fn read_perf_counter() -> u64 {
363    #[cfg(target_arch = "wasm32")]
364    {
365        canic_cdk::api::performance_counter(1)
366    }
367    #[cfg(not(target_arch = "wasm32"))]
368    {
369        0
370    }
371}
372
373impl<E: EntityKind> Span<E> {
374    #[must_use]
375    /// Start a metrics span for a specific entity and executor kind.
376    pub(crate) fn new(kind: ExecKind) -> Self {
377        record(MetricsEvent::ExecStart {
378            kind,
379            entity_path: E::PATH,
380        });
381
382        Self {
383            kind,
384            start: read_perf_counter(),
385            rows: 0,
386            finished: false,
387            _marker: PhantomData,
388        }
389    }
390
391    pub(crate) const fn set_rows(&mut self, rows: u64) {
392        self.rows = rows;
393    }
394
395    fn finish_inner(&self) {
396        let now = read_perf_counter();
397        let delta = now.saturating_sub(self.start);
398
399        record(MetricsEvent::ExecFinish {
400            kind: self.kind,
401            entity_path: E::PATH,
402            rows_touched: self.rows,
403            inst_delta: delta,
404        });
405    }
406}
407
408impl<E: EntityKind> Drop for Span<E> {
409    fn drop(&mut self) {
410        if !self.finished {
411            self.finish_inner();
412            self.finished = true;
413        }
414    }
415}
416
417#[cfg(test)]
418mod tests {
419    use super::*;
420    use std::panic::{AssertUnwindSafe, catch_unwind};
421    use std::sync::atomic::{AtomicUsize, Ordering};
422
423    struct CountingSink<'a> {
424        calls: &'a AtomicUsize,
425    }
426
427    impl MetricsSink for CountingSink<'_> {
428        fn record(&self, _: MetricsEvent) {
429            self.calls.fetch_add(1, Ordering::SeqCst);
430        }
431    }
432
433    #[test]
434    fn with_metrics_sink_routes_and_restores_nested_overrides() {
435        SINK_OVERRIDE.with(|cell| {
436            *cell.borrow_mut() = None;
437        });
438
439        let outer_calls = AtomicUsize::new(0);
440        let inner_calls = AtomicUsize::new(0);
441        let outer = CountingSink {
442            calls: &outer_calls,
443        };
444        let inner = CountingSink {
445            calls: &inner_calls,
446        };
447
448        // No override installed yet.
449        record(MetricsEvent::Plan {
450            kind: PlanKind::Keys,
451        });
452        assert_eq!(outer_calls.load(Ordering::SeqCst), 0);
453        assert_eq!(inner_calls.load(Ordering::SeqCst), 0);
454
455        with_metrics_sink(&outer, || {
456            record(MetricsEvent::Plan {
457                kind: PlanKind::Index,
458            });
459            assert_eq!(outer_calls.load(Ordering::SeqCst), 1);
460            assert_eq!(inner_calls.load(Ordering::SeqCst), 0);
461
462            with_metrics_sink(&inner, || {
463                record(MetricsEvent::Plan {
464                    kind: PlanKind::Range,
465                });
466            });
467
468            // Inner override was restored to outer override.
469            record(MetricsEvent::Plan {
470                kind: PlanKind::FullScan,
471            });
472        });
473
474        assert_eq!(outer_calls.load(Ordering::SeqCst), 2);
475        assert_eq!(inner_calls.load(Ordering::SeqCst), 1);
476
477        // Outer override was restored to previous (none).
478        SINK_OVERRIDE.with(|cell| {
479            assert!(cell.borrow().is_none());
480        });
481
482        record(MetricsEvent::Plan {
483            kind: PlanKind::Keys,
484        });
485        assert_eq!(outer_calls.load(Ordering::SeqCst), 2);
486        assert_eq!(inner_calls.load(Ordering::SeqCst), 1);
487    }
488
489    #[test]
490    fn with_metrics_sink_restores_override_on_panic() {
491        SINK_OVERRIDE.with(|cell| {
492            *cell.borrow_mut() = None;
493        });
494
495        let calls = AtomicUsize::new(0);
496        let sink = CountingSink { calls: &calls };
497
498        let panicked = catch_unwind(AssertUnwindSafe(|| {
499            with_metrics_sink(&sink, || {
500                record(MetricsEvent::Plan {
501                    kind: PlanKind::Index,
502                });
503                panic!("intentional panic for guard test");
504            });
505        }))
506        .is_err();
507        assert!(panicked);
508        assert_eq!(calls.load(Ordering::SeqCst), 1);
509
510        // Guard restored TLS slot after unwind.
511        SINK_OVERRIDE.with(|cell| {
512            assert!(cell.borrow().is_none());
513        });
514
515        record(MetricsEvent::Plan {
516            kind: PlanKind::Range,
517        });
518        assert_eq!(calls.load(Ordering::SeqCst), 1);
519    }
520
521    #[test]
522    fn metrics_report_without_since_returns_counters() {
523        metrics_reset_all();
524        record(MetricsEvent::Plan {
525            kind: PlanKind::Index,
526        });
527
528        let report = metrics_report(None);
529        let counters = report
530            .counters
531            .expect("metrics report should include counters without since filter");
532        assert_eq!(counters.ops.plan_index, 1);
533    }
534
535    #[test]
536    fn metrics_report_since_before_window_returns_counters() {
537        metrics_reset_all();
538        let window_start = metrics::with_state(|m| m.since_ms);
539        record(MetricsEvent::Plan {
540            kind: PlanKind::Keys,
541        });
542
543        let report = metrics_report(Some(window_start.saturating_sub(1)));
544        let counters = report
545            .counters
546            .expect("metrics report should include counters when since_ms is before window");
547        assert_eq!(counters.ops.plan_keys, 1);
548    }
549
550    #[test]
551    fn metrics_report_since_after_window_returns_empty() {
552        metrics_reset_all();
553        let window_start = metrics::with_state(|m| m.since_ms);
554        record(MetricsEvent::Plan {
555            kind: PlanKind::FullScan,
556        });
557
558        let report = metrics_report(Some(window_start.saturating_add(1)));
559        assert!(report.counters.is_none());
560        assert!(report.entity_counters.is_empty());
561    }
562
563    #[test]
564    fn reverse_and_relation_metrics_events_accumulate() {
565        metrics_reset_all();
566
567        record(MetricsEvent::ReverseIndexDelta {
568            entity_path: "obs::tests::Entity",
569            inserts: 3,
570            removes: 2,
571        });
572        record(MetricsEvent::RelationValidation {
573            entity_path: "obs::tests::Entity",
574            reverse_lookups: 5,
575            blocked_deletes: 1,
576        });
577
578        let counters = metrics_report(None)
579            .counters
580            .expect("metrics report should include counters");
581        assert_eq!(counters.ops.reverse_index_inserts, 3);
582        assert_eq!(counters.ops.reverse_index_removes, 2);
583        assert_eq!(counters.ops.relation_reverse_lookups, 5);
584        assert_eq!(counters.ops.relation_delete_blocks, 1);
585
586        let entity = counters
587            .entities
588            .get("obs::tests::Entity")
589            .expect("entity counters should be present");
590        assert_eq!(entity.reverse_index_inserts, 3);
591        assert_eq!(entity.reverse_index_removes, 2);
592        assert_eq!(entity.relation_reverse_lookups, 5);
593        assert_eq!(entity.relation_delete_blocks, 1);
594    }
595}