Skip to main content

icydb_core/obs/
sink.rs

1//! Metrics sink boundary.
2//!
3//! Core DB logic MUST NOT depend on obs::metrics directly.
4//! All instrumentation flows through MetricsEvent and MetricsSink.
5//!
6//! This module is the only allowed bridge between execution logic
7//! and the global metrics state.
8use crate::{obs::metrics, traits::EntityKind};
9use std::{cell::RefCell, marker::PhantomData};
10
11thread_local! {
12    static SINK_OVERRIDE: RefCell<Option<*const dyn MetricsSink>> = RefCell::new(None);
13}
14
15///
16/// ExecKind
17///
18
19#[derive(Clone, Copy, Debug)]
20pub enum ExecKind {
21    Load,
22    Save,
23    Delete,
24}
25
26///
27/// PlanKind
28///
29
30#[derive(Clone, Copy, Debug)]
31pub enum PlanKind {
32    Keys,
33    Index,
34    Range,
35    FullScan,
36}
37
38///
39/// MetricsEvent
40///
41
42#[derive(Clone, Copy, Debug)]
43pub enum MetricsEvent {
44    ExecStart {
45        kind: ExecKind,
46        entity_path: &'static str,
47    },
48    ExecFinish {
49        kind: ExecKind,
50        entity_path: &'static str,
51        rows_touched: u64,
52        inst_delta: u64,
53    },
54    RowsScanned {
55        entity_path: &'static str,
56        rows_scanned: u64,
57    },
58    UniqueViolation {
59        entity_path: &'static str,
60    },
61    IndexDelta {
62        entity_path: &'static str,
63        inserts: u64,
64        removes: u64,
65    },
66    ReverseIndexDelta {
67        entity_path: &'static str,
68        inserts: u64,
69        removes: u64,
70    },
71    RelationValidation {
72        entity_path: &'static str,
73        reverse_lookups: u64,
74        blocked_deletes: u64,
75    },
76    Plan {
77        kind: PlanKind,
78    },
79}
80
81///
82/// MetricsSink
83///
84
85pub trait MetricsSink {
86    fn record(&self, event: MetricsEvent);
87}
88
89///
90/// NoopMetricsSink
91///
92
93pub struct NoopMetricsSink;
94
95impl MetricsSink for NoopMetricsSink {
96    fn record(&self, _: MetricsEvent) {}
97}
98
99///
100/// GlobalMetricsSink
101///
102
103pub struct GlobalMetricsSink;
104
105impl MetricsSink for GlobalMetricsSink {
106    #[expect(clippy::too_many_lines)]
107    fn record(&self, event: MetricsEvent) {
108        match event {
109            MetricsEvent::ExecStart { kind, entity_path } => {
110                metrics::with_state_mut(|m| {
111                    match kind {
112                        ExecKind::Load => m.ops.load_calls = m.ops.load_calls.saturating_add(1),
113                        ExecKind::Save => m.ops.save_calls = m.ops.save_calls.saturating_add(1),
114                        ExecKind::Delete => {
115                            m.ops.delete_calls = m.ops.delete_calls.saturating_add(1);
116                        }
117                    }
118
119                    let entry = m.entities.entry(entity_path.to_string()).or_default();
120                    match kind {
121                        ExecKind::Load => {
122                            entry.load_calls = entry.load_calls.saturating_add(1);
123                        }
124                        ExecKind::Save => {
125                            entry.save_calls = entry.save_calls.saturating_add(1);
126                        }
127                        ExecKind::Delete => {
128                            entry.delete_calls = entry.delete_calls.saturating_add(1);
129                        }
130                    }
131                });
132            }
133
134            MetricsEvent::ExecFinish {
135                kind,
136                entity_path,
137                rows_touched,
138                inst_delta,
139            } => {
140                metrics::with_state_mut(|m| {
141                    match kind {
142                        ExecKind::Load => {
143                            m.ops.rows_loaded = m.ops.rows_loaded.saturating_add(rows_touched);
144                            metrics::add_instructions(
145                                &mut m.perf.load_inst_total,
146                                &mut m.perf.load_inst_max,
147                                inst_delta,
148                            );
149                        }
150                        ExecKind::Save => {
151                            metrics::add_instructions(
152                                &mut m.perf.save_inst_total,
153                                &mut m.perf.save_inst_max,
154                                inst_delta,
155                            );
156                        }
157                        ExecKind::Delete => {
158                            m.ops.rows_deleted = m.ops.rows_deleted.saturating_add(rows_touched);
159                            metrics::add_instructions(
160                                &mut m.perf.delete_inst_total,
161                                &mut m.perf.delete_inst_max,
162                                inst_delta,
163                            );
164                        }
165                    }
166
167                    let entry = m.entities.entry(entity_path.to_string()).or_default();
168                    match kind {
169                        ExecKind::Load => {
170                            entry.rows_loaded = entry.rows_loaded.saturating_add(rows_touched);
171                        }
172                        ExecKind::Delete => {
173                            entry.rows_deleted = entry.rows_deleted.saturating_add(rows_touched);
174                        }
175                        ExecKind::Save => {}
176                    }
177                });
178            }
179
180            MetricsEvent::RowsScanned {
181                entity_path,
182                rows_scanned,
183            } => {
184                metrics::with_state_mut(|m| {
185                    m.ops.rows_scanned = m.ops.rows_scanned.saturating_add(rows_scanned);
186                    let entry = m.entities.entry(entity_path.to_string()).or_default();
187                    entry.rows_scanned = entry.rows_scanned.saturating_add(rows_scanned);
188                });
189            }
190
191            MetricsEvent::UniqueViolation { entity_path } => {
192                metrics::with_state_mut(|m| {
193                    m.ops.unique_violations = m.ops.unique_violations.saturating_add(1);
194                    let entry = m.entities.entry(entity_path.to_string()).or_default();
195                    entry.unique_violations = entry.unique_violations.saturating_add(1);
196                });
197            }
198
199            MetricsEvent::IndexDelta {
200                entity_path,
201                inserts,
202                removes,
203            } => {
204                metrics::with_state_mut(|m| {
205                    m.ops.index_inserts = m.ops.index_inserts.saturating_add(inserts);
206                    m.ops.index_removes = m.ops.index_removes.saturating_add(removes);
207                    let entry = m.entities.entry(entity_path.to_string()).or_default();
208                    entry.index_inserts = entry.index_inserts.saturating_add(inserts);
209                    entry.index_removes = entry.index_removes.saturating_add(removes);
210                });
211            }
212
213            MetricsEvent::ReverseIndexDelta {
214                entity_path,
215                inserts,
216                removes,
217            } => {
218                metrics::with_state_mut(|m| {
219                    m.ops.reverse_index_inserts =
220                        m.ops.reverse_index_inserts.saturating_add(inserts);
221                    m.ops.reverse_index_removes =
222                        m.ops.reverse_index_removes.saturating_add(removes);
223                    let entry = m.entities.entry(entity_path.to_string()).or_default();
224                    entry.reverse_index_inserts =
225                        entry.reverse_index_inserts.saturating_add(inserts);
226                    entry.reverse_index_removes =
227                        entry.reverse_index_removes.saturating_add(removes);
228                });
229            }
230
231            MetricsEvent::RelationValidation {
232                entity_path,
233                reverse_lookups,
234                blocked_deletes,
235            } => {
236                metrics::with_state_mut(|m| {
237                    m.ops.relation_reverse_lookups = m
238                        .ops
239                        .relation_reverse_lookups
240                        .saturating_add(reverse_lookups);
241                    m.ops.relation_delete_blocks =
242                        m.ops.relation_delete_blocks.saturating_add(blocked_deletes);
243
244                    let entry = m.entities.entry(entity_path.to_string()).or_default();
245                    entry.relation_reverse_lookups = entry
246                        .relation_reverse_lookups
247                        .saturating_add(reverse_lookups);
248                    entry.relation_delete_blocks =
249                        entry.relation_delete_blocks.saturating_add(blocked_deletes);
250                });
251            }
252
253            MetricsEvent::Plan { kind } => {
254                metrics::with_state_mut(|m| match kind {
255                    PlanKind::Keys => m.ops.plan_keys = m.ops.plan_keys.saturating_add(1),
256                    PlanKind::Index => m.ops.plan_index = m.ops.plan_index.saturating_add(1),
257                    PlanKind::Range => m.ops.plan_range = m.ops.plan_range.saturating_add(1),
258                    PlanKind::FullScan => {
259                        m.ops.plan_full_scan = m.ops.plan_full_scan.saturating_add(1);
260                    }
261                });
262            }
263        }
264    }
265}
266
267pub const GLOBAL_METRICS_SINK: GlobalMetricsSink = GlobalMetricsSink;
268
269pub fn record(event: MetricsEvent) {
270    let override_ptr = SINK_OVERRIDE.with(|cell| *cell.borrow());
271    if let Some(ptr) = override_ptr {
272        // SAFETY:
273        // Preconditions:
274        // - `ptr` was produced from a valid `&dyn MetricsSink` in `with_metrics_sink`.
275        // - `with_metrics_sink` always restores the previous pointer before returning,
276        //   including unwind paths via `Guard::drop`.
277        // - `record` is synchronous and never stores `ptr` beyond this call.
278        //
279        // Aliasing:
280        // - We materialize only a shared reference (`&dyn MetricsSink`), matching the
281        //   original shared borrow used to install the override.
282        // - No mutable alias to the same sink is created here.
283        //
284        // What would break this:
285        // - If `with_metrics_sink` failed to restore on all exits (normal + panic),
286        //   `ptr` could outlive the borrowed sink and become dangling.
287        // - If `record` were changed to store or dispatch asynchronously using `ptr`,
288        //   lifetime assumptions would no longer hold.
289        unsafe { (&*ptr).record(event) };
290    } else {
291        GLOBAL_METRICS_SINK.record(event);
292    }
293}
294
295/// Snapshot the current metrics state for endpoint/test plumbing.
296///
297/// `window_start_ms` filters by window start (`EventState::window_start_ms`),
298/// not by per-event timestamps.
299#[must_use]
300pub fn metrics_report(window_start_ms: Option<u64>) -> metrics::EventReport {
301    metrics::report_window_start(window_start_ms)
302}
303
304/// Reset ephemeral metrics counters.
305pub fn metrics_reset() {
306    metrics::reset();
307}
308
309/// Reset all metrics state (counters + perf).
310pub fn metrics_reset_all() {
311    metrics::reset_all();
312}
313
314/// Run a closure with a temporary metrics sink override.
315pub fn with_metrics_sink<T>(sink: &dyn MetricsSink, f: impl FnOnce() -> T) -> T {
316    struct Guard(Option<*const dyn MetricsSink>);
317
318    impl Drop for Guard {
319        fn drop(&mut self) {
320            SINK_OVERRIDE.with(|cell| {
321                *cell.borrow_mut() = self.0;
322            });
323        }
324    }
325
326    // SAFETY:
327    // Preconditions:
328    // - `sink_ptr` is installed only for this dynamic scope.
329    // - `Guard` always restores the previous slot on all exits, including panic.
330    // - `record` only dereferences synchronously and never persists `sink_ptr`.
331    //
332    // Aliasing:
333    // - We erase lifetime to a raw pointer, but still only expose shared access.
334    // - No mutable alias to the same sink is introduced by this conversion.
335    //
336    // What would break this:
337    // - Any async/deferred use of `sink_ptr` beyond this scope.
338    // - Any path that bypasses Guard restoration.
339    let sink_ptr = unsafe { std::mem::transmute::<&dyn MetricsSink, *const dyn MetricsSink>(sink) };
340    let prev = SINK_OVERRIDE.with(|cell| {
341        let mut slot = cell.borrow_mut();
342        slot.replace(sink_ptr)
343    });
344    let _guard = Guard(prev);
345
346    f()
347}
348
349///
350/// Span
351/// RAII guard to simplify metrics instrumentation
352///
353
354pub(crate) struct Span<E: EntityKind> {
355    kind: ExecKind,
356    start: u64,
357    rows: u64,
358    finished: bool,
359    _marker: PhantomData<E>,
360}
361
362#[allow(clippy::missing_const_for_fn)]
363fn read_perf_counter() -> u64 {
364    #[cfg(target_arch = "wasm32")]
365    {
366        canic_cdk::api::performance_counter(1)
367    }
368    #[cfg(not(target_arch = "wasm32"))]
369    {
370        0
371    }
372}
373
374impl<E: EntityKind> Span<E> {
375    #[must_use]
376    /// Start a metrics span for a specific entity and executor kind.
377    pub(crate) fn new(kind: ExecKind) -> Self {
378        record(MetricsEvent::ExecStart {
379            kind,
380            entity_path: E::PATH,
381        });
382
383        Self {
384            kind,
385            start: read_perf_counter(),
386            rows: 0,
387            finished: false,
388            _marker: PhantomData,
389        }
390    }
391
392    pub(crate) const fn set_rows(&mut self, rows: u64) {
393        self.rows = rows;
394    }
395
396    fn finish_inner(&self) {
397        let now = read_perf_counter();
398        let delta = now.saturating_sub(self.start);
399
400        record(MetricsEvent::ExecFinish {
401            kind: self.kind,
402            entity_path: E::PATH,
403            rows_touched: self.rows,
404            inst_delta: delta,
405        });
406    }
407}
408
409impl<E: EntityKind> Drop for Span<E> {
410    fn drop(&mut self) {
411        if !self.finished {
412            self.finish_inner();
413            self.finished = true;
414        }
415    }
416}
417
418#[cfg(test)]
419mod tests {
420    use super::*;
421    use std::panic::{AssertUnwindSafe, catch_unwind};
422    use std::sync::atomic::{AtomicUsize, Ordering};
423
424    struct CountingSink<'a> {
425        calls: &'a AtomicUsize,
426    }
427
428    impl MetricsSink for CountingSink<'_> {
429        fn record(&self, _: MetricsEvent) {
430            self.calls.fetch_add(1, Ordering::SeqCst);
431        }
432    }
433
434    #[test]
435    fn with_metrics_sink_routes_and_restores_nested_overrides() {
436        SINK_OVERRIDE.with(|cell| {
437            *cell.borrow_mut() = None;
438        });
439
440        let outer_calls = AtomicUsize::new(0);
441        let inner_calls = AtomicUsize::new(0);
442        let outer = CountingSink {
443            calls: &outer_calls,
444        };
445        let inner = CountingSink {
446            calls: &inner_calls,
447        };
448
449        // No override installed yet.
450        record(MetricsEvent::Plan {
451            kind: PlanKind::Keys,
452        });
453        assert_eq!(outer_calls.load(Ordering::SeqCst), 0);
454        assert_eq!(inner_calls.load(Ordering::SeqCst), 0);
455
456        with_metrics_sink(&outer, || {
457            record(MetricsEvent::Plan {
458                kind: PlanKind::Index,
459            });
460            assert_eq!(outer_calls.load(Ordering::SeqCst), 1);
461            assert_eq!(inner_calls.load(Ordering::SeqCst), 0);
462
463            with_metrics_sink(&inner, || {
464                record(MetricsEvent::Plan {
465                    kind: PlanKind::Range,
466                });
467            });
468
469            // Inner override was restored to outer override.
470            record(MetricsEvent::Plan {
471                kind: PlanKind::FullScan,
472            });
473        });
474
475        assert_eq!(outer_calls.load(Ordering::SeqCst), 2);
476        assert_eq!(inner_calls.load(Ordering::SeqCst), 1);
477
478        // Outer override was restored to previous (none).
479        SINK_OVERRIDE.with(|cell| {
480            assert!(cell.borrow().is_none());
481        });
482
483        record(MetricsEvent::Plan {
484            kind: PlanKind::Keys,
485        });
486        assert_eq!(outer_calls.load(Ordering::SeqCst), 2);
487        assert_eq!(inner_calls.load(Ordering::SeqCst), 1);
488    }
489
490    #[test]
491    fn with_metrics_sink_restores_override_on_panic() {
492        SINK_OVERRIDE.with(|cell| {
493            *cell.borrow_mut() = None;
494        });
495
496        let calls = AtomicUsize::new(0);
497        let sink = CountingSink { calls: &calls };
498
499        let panicked = catch_unwind(AssertUnwindSafe(|| {
500            with_metrics_sink(&sink, || {
501                record(MetricsEvent::Plan {
502                    kind: PlanKind::Index,
503                });
504                panic!("intentional panic for guard test");
505            });
506        }))
507        .is_err();
508        assert!(panicked);
509        assert_eq!(calls.load(Ordering::SeqCst), 1);
510
511        // Guard restored TLS slot after unwind.
512        SINK_OVERRIDE.with(|cell| {
513            assert!(cell.borrow().is_none());
514        });
515
516        record(MetricsEvent::Plan {
517            kind: PlanKind::Range,
518        });
519        assert_eq!(calls.load(Ordering::SeqCst), 1);
520    }
521
522    #[test]
523    fn metrics_report_without_window_start_returns_counters() {
524        metrics_reset_all();
525        record(MetricsEvent::Plan {
526            kind: PlanKind::Index,
527        });
528
529        let report = metrics_report(None);
530        let counters = report
531            .counters
532            .expect("metrics report should include counters without since filter");
533        assert_eq!(counters.ops.plan_index, 1);
534    }
535
536    #[test]
537    fn metrics_report_window_start_before_window_returns_counters() {
538        metrics_reset_all();
539        let window_start = metrics::with_state(|m| m.window_start_ms);
540        record(MetricsEvent::Plan {
541            kind: PlanKind::Keys,
542        });
543
544        let report = metrics_report(Some(window_start.saturating_sub(1)));
545        let counters = report
546            .counters
547            .expect("metrics report should include counters when window_start_ms is before window");
548        assert_eq!(counters.ops.plan_keys, 1);
549    }
550
551    #[test]
552    fn metrics_report_window_start_after_window_returns_empty() {
553        metrics_reset_all();
554        let window_start = metrics::with_state(|m| m.window_start_ms);
555        record(MetricsEvent::Plan {
556            kind: PlanKind::FullScan,
557        });
558
559        let report = metrics_report(Some(window_start.saturating_add(1)));
560        assert!(report.counters.is_none());
561        assert!(report.entity_counters.is_empty());
562    }
563
564    #[test]
565    fn reverse_and_relation_metrics_events_accumulate() {
566        metrics_reset_all();
567
568        record(MetricsEvent::ReverseIndexDelta {
569            entity_path: "obs::tests::Entity",
570            inserts: 3,
571            removes: 2,
572        });
573        record(MetricsEvent::RelationValidation {
574            entity_path: "obs::tests::Entity",
575            reverse_lookups: 5,
576            blocked_deletes: 1,
577        });
578
579        let counters = metrics_report(None)
580            .counters
581            .expect("metrics report should include counters");
582        assert_eq!(counters.ops.reverse_index_inserts, 3);
583        assert_eq!(counters.ops.reverse_index_removes, 2);
584        assert_eq!(counters.ops.relation_reverse_lookups, 5);
585        assert_eq!(counters.ops.relation_delete_blocks, 1);
586
587        let entity = counters
588            .entities
589            .get("obs::tests::Entity")
590            .expect("entity counters should be present");
591        assert_eq!(entity.reverse_index_inserts, 3);
592        assert_eq!(entity.reverse_index_removes, 2);
593        assert_eq!(entity.relation_reverse_lookups, 5);
594        assert_eq!(entity.relation_delete_blocks, 1);
595    }
596}