Skip to main content

icydb_core/obs/
sink.rs

1//! Metrics sink boundary.
2//!
3//! Core DB logic MUST NOT depend on obs::metrics directly.
4//! All instrumentation flows through MetricsEvent and MetricsSink.
5//!
6//! This module is the only allowed bridge between execution logic
7//! and the global metrics state.
8use crate::{obs::metrics, traits::EntityKind};
9use std::{cell::RefCell, marker::PhantomData};
10
11thread_local! {
12    static SINK_OVERRIDE: RefCell<Option<*const dyn MetricsSink>> = RefCell::new(None);
13}
14
15///
16/// ExecKind
17///
18
19#[derive(Clone, Copy, Debug)]
20pub enum ExecKind {
21    Load,
22    Save,
23    Delete,
24}
25
26///
27/// PlanKind
28///
29
30#[derive(Clone, Copy, Debug)]
31pub enum PlanKind {
32    Keys,
33    Index,
34    Range,
35    FullScan,
36}
37
38///
39/// MetricsEvent
40///
41
42#[derive(Clone, Copy, Debug)]
43pub enum MetricsEvent {
44    ExecStart {
45        kind: ExecKind,
46        entity_path: &'static str,
47    },
48    ExecFinish {
49        kind: ExecKind,
50        entity_path: &'static str,
51        rows_touched: u64,
52        inst_delta: u64,
53    },
54    RowsScanned {
55        entity_path: &'static str,
56        rows_scanned: u64,
57    },
58    UniqueViolation {
59        entity_path: &'static str,
60    },
61    IndexInsert {
62        entity_path: &'static str,
63    },
64    IndexRemove {
65        entity_path: &'static str,
66    },
67    Plan {
68        kind: PlanKind,
69    },
70}
71
72///
73/// MetricsSink
74///
75
76pub trait MetricsSink {
77    fn record(&self, event: MetricsEvent);
78}
79
80///
81/// NoopMetricsSink
82///
83
84pub struct NoopMetricsSink;
85
86impl MetricsSink for NoopMetricsSink {
87    fn record(&self, _: MetricsEvent) {}
88}
89
90///
91/// GlobalMetricsSink
92///
93
94pub struct GlobalMetricsSink;
95
96impl MetricsSink for GlobalMetricsSink {
97    #[expect(clippy::too_many_lines)]
98    fn record(&self, event: MetricsEvent) {
99        match event {
100            MetricsEvent::ExecStart { kind, entity_path } => {
101                metrics::with_state_mut(|m| {
102                    match kind {
103                        ExecKind::Load => m.ops.load_calls = m.ops.load_calls.saturating_add(1),
104                        ExecKind::Save => m.ops.save_calls = m.ops.save_calls.saturating_add(1),
105                        ExecKind::Delete => {
106                            m.ops.delete_calls = m.ops.delete_calls.saturating_add(1);
107                        }
108                    }
109
110                    let entry = m.entities.entry(entity_path.to_string()).or_default();
111                    match kind {
112                        ExecKind::Load => {
113                            entry.load_calls = entry.load_calls.saturating_add(1);
114                        }
115                        ExecKind::Save => {
116                            entry.save_calls = entry.save_calls.saturating_add(1);
117                        }
118                        ExecKind::Delete => {
119                            entry.delete_calls = entry.delete_calls.saturating_add(1);
120                        }
121                    }
122                });
123            }
124
125            MetricsEvent::ExecFinish {
126                kind,
127                entity_path,
128                rows_touched,
129                inst_delta,
130            } => {
131                metrics::with_state_mut(|m| {
132                    match kind {
133                        ExecKind::Load => {
134                            m.ops.rows_loaded = m.ops.rows_loaded.saturating_add(rows_touched);
135                            metrics::add_instructions(
136                                &mut m.perf.load_inst_total,
137                                &mut m.perf.load_inst_max,
138                                inst_delta,
139                            );
140                        }
141                        ExecKind::Save => {
142                            metrics::add_instructions(
143                                &mut m.perf.save_inst_total,
144                                &mut m.perf.save_inst_max,
145                                inst_delta,
146                            );
147                        }
148                        ExecKind::Delete => {
149                            m.ops.rows_deleted = m.ops.rows_deleted.saturating_add(rows_touched);
150                            metrics::add_instructions(
151                                &mut m.perf.delete_inst_total,
152                                &mut m.perf.delete_inst_max,
153                                inst_delta,
154                            );
155                        }
156                    }
157
158                    let entry = m.entities.entry(entity_path.to_string()).or_default();
159                    match kind {
160                        ExecKind::Load => {
161                            entry.rows_loaded = entry.rows_loaded.saturating_add(rows_touched);
162                        }
163                        ExecKind::Delete => {
164                            entry.rows_deleted = entry.rows_deleted.saturating_add(rows_touched);
165                        }
166                        ExecKind::Save => {}
167                    }
168                });
169            }
170
171            MetricsEvent::RowsScanned {
172                entity_path,
173                rows_scanned,
174            } => {
175                metrics::with_state_mut(|m| {
176                    m.ops.rows_scanned = m.ops.rows_scanned.saturating_add(rows_scanned);
177                    let entry = m.entities.entry(entity_path.to_string()).or_default();
178                    entry.rows_scanned = entry.rows_scanned.saturating_add(rows_scanned);
179                });
180            }
181
182            MetricsEvent::UniqueViolation { entity_path } => {
183                metrics::with_state_mut(|m| {
184                    m.ops.unique_violations = m.ops.unique_violations.saturating_add(1);
185                    let entry = m.entities.entry(entity_path.to_string()).or_default();
186                    entry.unique_violations = entry.unique_violations.saturating_add(1);
187                });
188            }
189
190            MetricsEvent::IndexInsert { entity_path } => {
191                metrics::with_state_mut(|m| {
192                    m.ops.index_inserts = m.ops.index_inserts.saturating_add(1);
193                    let entry = m.entities.entry(entity_path.to_string()).or_default();
194                    entry.index_inserts = entry.index_inserts.saturating_add(1);
195                });
196            }
197
198            MetricsEvent::IndexRemove { entity_path } => {
199                metrics::with_state_mut(|m| {
200                    m.ops.index_removes = m.ops.index_removes.saturating_add(1);
201                    let entry = m.entities.entry(entity_path.to_string()).or_default();
202                    entry.index_removes = entry.index_removes.saturating_add(1);
203                });
204            }
205
206            MetricsEvent::Plan { kind } => {
207                metrics::with_state_mut(|m| match kind {
208                    PlanKind::Keys => m.ops.plan_keys = m.ops.plan_keys.saturating_add(1),
209                    PlanKind::Index => m.ops.plan_index = m.ops.plan_index.saturating_add(1),
210                    PlanKind::Range => m.ops.plan_range = m.ops.plan_range.saturating_add(1),
211                    PlanKind::FullScan => {
212                        m.ops.plan_full_scan = m.ops.plan_full_scan.saturating_add(1);
213                    }
214                });
215            }
216        }
217    }
218}
219
220pub const GLOBAL_METRICS_SINK: GlobalMetricsSink = GlobalMetricsSink;
221
222pub fn record(event: MetricsEvent) {
223    let override_ptr = SINK_OVERRIDE.with(|cell| *cell.borrow());
224    if let Some(ptr) = override_ptr {
225        // SAFETY: override is scoped by with_metrics_sink and only used synchronously.
226        unsafe { (&*ptr).record(event) };
227    } else {
228        GLOBAL_METRICS_SINK.record(event);
229    }
230}
231
232/// Snapshot the current metrics state for endpoint/test plumbing.
233#[must_use]
234pub fn metrics_report() -> metrics::EventReport {
235    metrics::report()
236}
237
238/// Reset ephemeral metrics counters.
239pub fn metrics_reset() {
240    metrics::reset();
241}
242
243/// Reset all metrics state (counters + perf).
244pub fn metrics_reset_all() {
245    metrics::reset_all();
246}
247
248/// Run a closure with a temporary metrics sink override.
249pub fn with_metrics_sink<T>(sink: &dyn MetricsSink, f: impl FnOnce() -> T) -> T {
250    struct Guard(Option<*const dyn MetricsSink>);
251
252    impl Drop for Guard {
253        fn drop(&mut self) {
254            SINK_OVERRIDE.with(|cell| {
255                *cell.borrow_mut() = self.0;
256            });
257        }
258    }
259
260    // SAFETY: we erase the reference lifetime for scoped storage in TLS and
261    // restore the previous value on scope exit via Guard.
262    let sink_ptr = unsafe { std::mem::transmute::<&dyn MetricsSink, *const dyn MetricsSink>(sink) };
263    let prev = SINK_OVERRIDE.with(|cell| {
264        let mut slot = cell.borrow_mut();
265        slot.replace(sink_ptr)
266    });
267    let _guard = Guard(prev);
268
269    f()
270}
271
272///
273/// Span
274/// RAII guard to simplify metrics instrumentation
275///
276
277pub(crate) struct Span<E: EntityKind> {
278    kind: ExecKind,
279    start: u64,
280    rows: u64,
281    finished: bool,
282    _marker: PhantomData<E>,
283}
284
285#[allow(clippy::missing_const_for_fn)]
286fn read_perf_counter() -> u64 {
287    #[cfg(target_arch = "wasm32")]
288    {
289        canic_cdk::api::performance_counter(1)
290    }
291    #[cfg(not(target_arch = "wasm32"))]
292    {
293        0
294    }
295}
296
297impl<E: EntityKind> Span<E> {
298    #[must_use]
299    /// Start a metrics span for a specific entity and executor kind.
300    pub(crate) fn new(kind: ExecKind) -> Self {
301        record(MetricsEvent::ExecStart {
302            kind,
303            entity_path: E::PATH,
304        });
305
306        Self {
307            kind,
308            start: read_perf_counter(),
309            rows: 0,
310            finished: false,
311            _marker: PhantomData,
312        }
313    }
314
315    pub(crate) const fn set_rows(&mut self, rows: u64) {
316        self.rows = rows;
317    }
318
319    fn finish_inner(&self) {
320        let now = read_perf_counter();
321        let delta = now.saturating_sub(self.start);
322
323        record(MetricsEvent::ExecFinish {
324            kind: self.kind,
325            entity_path: E::PATH,
326            rows_touched: self.rows,
327            inst_delta: delta,
328        });
329    }
330}
331
332impl<E: EntityKind> Drop for Span<E> {
333    fn drop(&mut self) {
334        if !self.finished {
335            self.finish_inner();
336            self.finished = true;
337        }
338    }
339}