Skip to main content

icydb_core/obs/
sink.rs

1//! Metrics sink boundary.
2//!
3//! Core DB logic MUST NOT depend on obs::metrics directly.
4//! All instrumentation flows through MetricsEvent and MetricsSink.
5//!
6//! This module is the only allowed bridge between execution logic
7//! and the global metrics state.
8use crate::{obs::metrics, traits::EntityKind};
9use std::{cell::RefCell, marker::PhantomData};
10
11thread_local! {
12    static SINK_OVERRIDE: RefCell<Option<*const dyn MetricsSink>> = RefCell::new(None);
13}
14
///
/// ExecKind
/// Executor operation that an exec event or span refers to.
///

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ExecKind {
    /// Load executor; the global sink tallies it in `load_calls` / `rows_loaded`.
    Load,
    /// Save executor; the global sink tallies it in `save_calls`.
    Save,
    /// Delete executor; the global sink tallies it in `delete_calls` / `rows_deleted`.
    Delete,
}

///
/// PlanKind
/// Plan category reported through [`MetricsEvent::Plan`]; the global sink
/// keeps one counter per variant.
///

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum PlanKind {
    /// Counted in `plan_keys`.
    Keys,
    /// Counted in `plan_index`.
    Index,
    /// Counted in `plan_range`.
    Range,
    /// Counted in `plan_full_scan`.
    FullScan,
}

///
/// MetricsEvent
/// Instrumentation event handed to a [`MetricsSink`].
///
/// Events are small `Copy` values; `entity_path` fields are `'static` strings
/// identifying the entity the event belongs to. The type is comparable and
/// hashable so capturing sinks can assert on what they received.
///

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum MetricsEvent {
    /// An executor of the given kind started working on an entity.
    ExecStart {
        kind: ExecKind,
        entity_path: &'static str,
    },
    /// An executor finished.
    ///
    /// `rows_touched` is the row count reported by the executor and
    /// `inst_delta` the performance-counter difference measured across the run.
    ExecFinish {
        kind: ExecKind,
        entity_path: &'static str,
        rows_touched: u64,
        inst_delta: u64,
    },
    /// `rows_scanned` rows were scanned for an entity.
    RowsScanned {
        entity_path: &'static str,
        rows_scanned: u64,
    },
    /// An existence check was performed for an entity.
    ExistsCall {
        entity_path: &'static str,
    },
    /// A unique-constraint violation was detected for an entity.
    UniqueViolation {
        entity_path: &'static str,
    },
    /// An index entry was inserted for an entity.
    IndexInsert {
        entity_path: &'static str,
    },
    /// An index entry was removed for an entity.
    IndexRemove {
        entity_path: &'static str,
    },
    /// A plan of the given kind was recorded; carries no entity path.
    Plan {
        kind: PlanKind,
    },
}
74
///
/// MetricsSink
/// Receiver for [`MetricsEvent`]s emitted by execution logic.
///

pub trait MetricsSink {
    /// Consume one event.
    ///
    /// Takes `&self`, so an implementation that accumulates state has to use
    /// interior mutability or an external store.
    fn record(&self, event: MetricsEvent);
}
82
83///
84/// NoopMetricsSink
85///
86
87pub struct NoopMetricsSink;
88
89impl MetricsSink for NoopMetricsSink {
90    fn record(&self, _: MetricsEvent) {}
91}
92
93///
94/// GlobalMetricsSink
95///
96
97pub struct GlobalMetricsSink;
98
99impl MetricsSink for GlobalMetricsSink {
100    #[expect(clippy::too_many_lines)]
101    fn record(&self, event: MetricsEvent) {
102        match event {
103            MetricsEvent::ExecStart { kind, entity_path } => {
104                metrics::with_state_mut(|m| {
105                    match kind {
106                        ExecKind::Load => m.ops.load_calls = m.ops.load_calls.saturating_add(1),
107                        ExecKind::Save => m.ops.save_calls = m.ops.save_calls.saturating_add(1),
108                        ExecKind::Delete => {
109                            m.ops.delete_calls = m.ops.delete_calls.saturating_add(1);
110                        }
111                    }
112
113                    let entry = m.entities.entry(entity_path.to_string()).or_default();
114                    match kind {
115                        ExecKind::Load => {
116                            entry.load_calls = entry.load_calls.saturating_add(1);
117                        }
118                        ExecKind::Save => {
119                            entry.save_calls = entry.save_calls.saturating_add(1);
120                        }
121                        ExecKind::Delete => {
122                            entry.delete_calls = entry.delete_calls.saturating_add(1);
123                        }
124                    }
125                });
126            }
127
128            MetricsEvent::ExecFinish {
129                kind,
130                entity_path,
131                rows_touched,
132                inst_delta,
133            } => {
134                metrics::with_state_mut(|m| {
135                    match kind {
136                        ExecKind::Load => {
137                            m.ops.rows_loaded = m.ops.rows_loaded.saturating_add(rows_touched);
138                            metrics::add_instructions(
139                                &mut m.perf.load_inst_total,
140                                &mut m.perf.load_inst_max,
141                                inst_delta,
142                            );
143                        }
144                        ExecKind::Save => {
145                            metrics::add_instructions(
146                                &mut m.perf.save_inst_total,
147                                &mut m.perf.save_inst_max,
148                                inst_delta,
149                            );
150                        }
151                        ExecKind::Delete => {
152                            m.ops.rows_deleted = m.ops.rows_deleted.saturating_add(rows_touched);
153                            metrics::add_instructions(
154                                &mut m.perf.delete_inst_total,
155                                &mut m.perf.delete_inst_max,
156                                inst_delta,
157                            );
158                        }
159                    }
160
161                    let entry = m.entities.entry(entity_path.to_string()).or_default();
162                    match kind {
163                        ExecKind::Load => {
164                            entry.rows_loaded = entry.rows_loaded.saturating_add(rows_touched);
165                        }
166                        ExecKind::Delete => {
167                            entry.rows_deleted = entry.rows_deleted.saturating_add(rows_touched);
168                        }
169                        ExecKind::Save => {}
170                    }
171                });
172            }
173
174            MetricsEvent::RowsScanned {
175                entity_path,
176                rows_scanned,
177            } => {
178                metrics::with_state_mut(|m| {
179                    m.ops.rows_scanned = m.ops.rows_scanned.saturating_add(rows_scanned);
180                    let entry = m.entities.entry(entity_path.to_string()).or_default();
181                    entry.rows_scanned = entry.rows_scanned.saturating_add(rows_scanned);
182                });
183            }
184
185            MetricsEvent::ExistsCall { entity_path } => {
186                metrics::with_state_mut(|m| {
187                    m.ops.exists_calls = m.ops.exists_calls.saturating_add(1);
188                    let entry = m.entities.entry(entity_path.to_string()).or_default();
189                    entry.exists_calls = entry.exists_calls.saturating_add(1);
190                });
191            }
192
193            MetricsEvent::UniqueViolation { entity_path } => {
194                metrics::with_state_mut(|m| {
195                    m.ops.unique_violations = m.ops.unique_violations.saturating_add(1);
196                    let entry = m.entities.entry(entity_path.to_string()).or_default();
197                    entry.unique_violations = entry.unique_violations.saturating_add(1);
198                });
199            }
200
201            MetricsEvent::IndexInsert { entity_path } => {
202                metrics::with_state_mut(|m| {
203                    m.ops.index_inserts = m.ops.index_inserts.saturating_add(1);
204                    let entry = m.entities.entry(entity_path.to_string()).or_default();
205                    entry.index_inserts = entry.index_inserts.saturating_add(1);
206                });
207            }
208
209            MetricsEvent::IndexRemove { entity_path } => {
210                metrics::with_state_mut(|m| {
211                    m.ops.index_removes = m.ops.index_removes.saturating_add(1);
212                    let entry = m.entities.entry(entity_path.to_string()).or_default();
213                    entry.index_removes = entry.index_removes.saturating_add(1);
214                });
215            }
216
217            MetricsEvent::Plan { kind } => {
218                metrics::with_state_mut(|m| match kind {
219                    PlanKind::Keys => m.ops.plan_keys = m.ops.plan_keys.saturating_add(1),
220                    PlanKind::Index => m.ops.plan_index = m.ops.plan_index.saturating_add(1),
221                    PlanKind::Range => m.ops.plan_range = m.ops.plan_range.saturating_add(1),
222                    PlanKind::FullScan => {
223                        m.ops.plan_full_scan = m.ops.plan_full_scan.saturating_add(1);
224                    }
225                });
226            }
227        }
228    }
229}
230
/// Shared [`GlobalMetricsSink`] value; [`record`] falls back to it when no
/// per-thread override is installed.
pub const GLOBAL_METRICS_SINK: GlobalMetricsSink = GlobalMetricsSink;
232
233pub fn record(event: MetricsEvent) {
234    let override_ptr = SINK_OVERRIDE.with(|cell| *cell.borrow());
235    if let Some(ptr) = override_ptr {
236        // SAFETY: override is scoped by with_metrics_sink and only used synchronously.
237        unsafe { (&*ptr).record(event) };
238    } else {
239        GLOBAL_METRICS_SINK.record(event);
240    }
241}
242
/// Snapshot the current metrics state for endpoint/test plumbing.
///
/// Thin pass-through to `metrics::report()`, exposed from this module so
/// callers do not have to reach into `obs::metrics` themselves.
#[must_use]
pub fn metrics_report() -> metrics::EventReport {
    metrics::report()
}
248
/// Reset ephemeral metrics counters.
///
/// Thin pass-through to `metrics::reset()`. Which counters count as
/// "ephemeral" is decided there; going by [`metrics_reset_all`], perf data
/// presumably survives this call — confirm against `obs::metrics`.
pub fn metrics_reset() {
    metrics::reset();
}
253
/// Reset all metrics state (counters + perf).
///
/// Thin pass-through to `metrics::reset_all()`.
pub fn metrics_reset_all() {
    metrics::reset_all();
}
258
259/// Run a closure with a temporary metrics sink override.
260pub fn with_metrics_sink<T>(sink: &dyn MetricsSink, f: impl FnOnce() -> T) -> T {
261    struct Guard(Option<*const dyn MetricsSink>);
262
263    impl Drop for Guard {
264        fn drop(&mut self) {
265            SINK_OVERRIDE.with(|cell| {
266                *cell.borrow_mut() = self.0;
267            });
268        }
269    }
270
271    // SAFETY: we erase the reference lifetime for scoped storage in TLS and
272    // restore the previous value on scope exit via Guard.
273    let sink_ptr = unsafe { std::mem::transmute::<&dyn MetricsSink, *const dyn MetricsSink>(sink) };
274    let prev = SINK_OVERRIDE.with(|cell| {
275        let mut slot = cell.borrow_mut();
276        slot.replace(sink_ptr)
277    });
278    let _guard = Guard(prev);
279
280    f()
281}
282
///
/// Span
/// RAII guard to simplify metrics instrumentation
///
/// Creating a span records the start event for entity `E`; dropping it
/// records the matching finish event with the row count set so far.
///

pub(crate) struct Span<E: EntityKind> {
    // Executor kind reported in both the start and the finish event.
    kind: ExecKind,
    // Performance-counter reading taken when the span was created.
    start: u64,
    // Row count reported as `rows_touched` on finish; starts at 0.
    rows: u64,
    // Set once the finish event has been emitted; checked in `Drop`.
    finished: bool,
    // Ties the span to entity type `E` without storing a value of it.
    _marker: PhantomData<E>,
}
295
/// Read the performance counter used to measure span cost.
///
/// On `wasm32` this queries `canic_cdk::api::performance_counter(1)`.
/// NOTE(review): counter type `1` is presumably the call-context instruction
/// counter — confirm against the `canic_cdk` documentation.
#[cfg(target_arch = "wasm32")]
fn read_perf_counter() -> u64 {
    canic_cdk::api::performance_counter(1)
}

/// Read the performance counter used to measure span cost.
///
/// Off `wasm32` no counter is available, so this always returns `0` and every
/// measured delta is `0`. Declared `const` because the stub is trivially
/// const-evaluable, which removes the need for a lint suppression.
#[cfg(not(target_arch = "wasm32"))]
const fn read_perf_counter() -> u64 {
    0
}
307
308impl<E: EntityKind> Span<E> {
309    #[must_use]
310    /// Start a metrics span for a specific entity and executor kind.
311    pub(crate) fn new(kind: ExecKind) -> Self {
312        record(MetricsEvent::ExecStart {
313            kind,
314            entity_path: E::PATH,
315        });
316
317        Self {
318            kind,
319            start: read_perf_counter(),
320            rows: 0,
321            finished: false,
322            _marker: PhantomData,
323        }
324    }
325
326    pub(crate) const fn set_rows(&mut self, rows: u64) {
327        self.rows = rows;
328    }
329
330    fn finish_inner(&self) {
331        let now = read_perf_counter();
332        let delta = now.saturating_sub(self.start);
333
334        record(MetricsEvent::ExecFinish {
335            kind: self.kind,
336            entity_path: E::PATH,
337            rows_touched: self.rows,
338            inst_delta: delta,
339        });
340    }
341}
342
343impl<E: EntityKind> Drop for Span<E> {
344    fn drop(&mut self) {
345        if !self.finished {
346            self.finish_inner();
347            self.finished = true;
348        }
349    }
350}