icydb_core/obs/
sink.rs

1//! Metrics sink boundary.
2//!
3//! Core DB logic MUST NOT depend on obs::metrics directly.
4//! All instrumentation flows through MetricsEvent and MetricsSink.
5//!
6//! This module is the only allowed bridge between execution logic
7//! and the global metrics state.
8use crate::{obs::metrics, traits::EntityKind};
9use canic_cdk::api::performance_counter;
10use std::{cell::RefCell, marker::PhantomData};
11
12thread_local! {
13    static SINK_OVERRIDE: RefCell<Option<*const dyn MetricsSink>> = RefCell::new(None);
14}
15
16///
17/// ExecKind
18///
19
20#[derive(Clone, Copy, Debug)]
21pub enum ExecKind {
22    Load,
23    Save,
24    Delete,
25}
26
27///
28/// PlanKind
29///
30
31#[derive(Clone, Copy, Debug)]
32pub enum PlanKind {
33    Keys,
34    Index,
35    Range,
36    FullScan,
37}
38
39///
40/// MetricsEvent
41///
42
43#[derive(Clone, Copy, Debug)]
44pub enum MetricsEvent {
45    ExecStart {
46        kind: ExecKind,
47        entity_path: &'static str,
48    },
49    ExecFinish {
50        kind: ExecKind,
51        entity_path: &'static str,
52        rows_touched: u64,
53        inst_delta: u64,
54    },
55    RowsScanned {
56        entity_path: &'static str,
57        rows_scanned: u64,
58    },
59    ExistsCall {
60        entity_path: &'static str,
61    },
62    UniqueViolation {
63        entity_path: &'static str,
64    },
65    IndexInsert {
66        entity_path: &'static str,
67    },
68    IndexRemove {
69        entity_path: &'static str,
70    },
71    Plan {
72        kind: PlanKind,
73    },
74}
75
76///
77/// MetricsSink
78///
79
80pub trait MetricsSink {
81    fn record(&self, event: MetricsEvent);
82}
83
84///
85/// NoopMetricsSink
86///
87
88pub struct NoopMetricsSink;
89
90impl MetricsSink for NoopMetricsSink {
91    fn record(&self, _: MetricsEvent) {}
92}
93
94///
95/// GlobalMetricsSink
96///
97
98pub struct GlobalMetricsSink;
99
100impl MetricsSink for GlobalMetricsSink {
101    #[expect(clippy::too_many_lines)]
102    fn record(&self, event: MetricsEvent) {
103        match event {
104            MetricsEvent::ExecStart { kind, entity_path } => {
105                metrics::with_state_mut(|m| {
106                    match kind {
107                        ExecKind::Load => m.ops.load_calls = m.ops.load_calls.saturating_add(1),
108                        ExecKind::Save => m.ops.save_calls = m.ops.save_calls.saturating_add(1),
109                        ExecKind::Delete => {
110                            m.ops.delete_calls = m.ops.delete_calls.saturating_add(1);
111                        }
112                    }
113
114                    let entry = m.entities.entry(entity_path.to_string()).or_default();
115                    match kind {
116                        ExecKind::Load => {
117                            entry.load_calls = entry.load_calls.saturating_add(1);
118                        }
119                        ExecKind::Save => {
120                            entry.save_calls = entry.save_calls.saturating_add(1);
121                        }
122                        ExecKind::Delete => {
123                            entry.delete_calls = entry.delete_calls.saturating_add(1);
124                        }
125                    }
126                });
127            }
128
129            MetricsEvent::ExecFinish {
130                kind,
131                entity_path,
132                rows_touched,
133                inst_delta,
134            } => {
135                metrics::with_state_mut(|m| {
136                    match kind {
137                        ExecKind::Load => {
138                            m.ops.rows_loaded = m.ops.rows_loaded.saturating_add(rows_touched);
139                            metrics::add_instructions(
140                                &mut m.perf.load_inst_total,
141                                &mut m.perf.load_inst_max,
142                                inst_delta,
143                            );
144                        }
145                        ExecKind::Save => {
146                            metrics::add_instructions(
147                                &mut m.perf.save_inst_total,
148                                &mut m.perf.save_inst_max,
149                                inst_delta,
150                            );
151                        }
152                        ExecKind::Delete => {
153                            m.ops.rows_deleted = m.ops.rows_deleted.saturating_add(rows_touched);
154                            metrics::add_instructions(
155                                &mut m.perf.delete_inst_total,
156                                &mut m.perf.delete_inst_max,
157                                inst_delta,
158                            );
159                        }
160                    }
161
162                    let entry = m.entities.entry(entity_path.to_string()).or_default();
163                    match kind {
164                        ExecKind::Load => {
165                            entry.rows_loaded = entry.rows_loaded.saturating_add(rows_touched);
166                        }
167                        ExecKind::Delete => {
168                            entry.rows_deleted = entry.rows_deleted.saturating_add(rows_touched);
169                        }
170                        ExecKind::Save => {}
171                    }
172                });
173            }
174
175            MetricsEvent::RowsScanned {
176                entity_path,
177                rows_scanned,
178            } => {
179                metrics::with_state_mut(|m| {
180                    m.ops.rows_scanned = m.ops.rows_scanned.saturating_add(rows_scanned);
181                    let entry = m.entities.entry(entity_path.to_string()).or_default();
182                    entry.rows_scanned = entry.rows_scanned.saturating_add(rows_scanned);
183                });
184            }
185
186            MetricsEvent::ExistsCall { entity_path } => {
187                metrics::with_state_mut(|m| {
188                    m.ops.exists_calls = m.ops.exists_calls.saturating_add(1);
189                    let entry = m.entities.entry(entity_path.to_string()).or_default();
190                    entry.exists_calls = entry.exists_calls.saturating_add(1);
191                });
192            }
193
194            MetricsEvent::UniqueViolation { entity_path } => {
195                metrics::with_state_mut(|m| {
196                    m.ops.unique_violations = m.ops.unique_violations.saturating_add(1);
197                    let entry = m.entities.entry(entity_path.to_string()).or_default();
198                    entry.unique_violations = entry.unique_violations.saturating_add(1);
199                });
200            }
201
202            MetricsEvent::IndexInsert { entity_path } => {
203                metrics::with_state_mut(|m| {
204                    m.ops.index_inserts = m.ops.index_inserts.saturating_add(1);
205                    let entry = m.entities.entry(entity_path.to_string()).or_default();
206                    entry.index_inserts = entry.index_inserts.saturating_add(1);
207                });
208            }
209
210            MetricsEvent::IndexRemove { entity_path } => {
211                metrics::with_state_mut(|m| {
212                    m.ops.index_removes = m.ops.index_removes.saturating_add(1);
213                    let entry = m.entities.entry(entity_path.to_string()).or_default();
214                    entry.index_removes = entry.index_removes.saturating_add(1);
215                });
216            }
217
218            MetricsEvent::Plan { kind } => {
219                metrics::with_state_mut(|m| match kind {
220                    PlanKind::Keys => m.ops.plan_keys = m.ops.plan_keys.saturating_add(1),
221                    PlanKind::Index => m.ops.plan_index = m.ops.plan_index.saturating_add(1),
222                    PlanKind::Range => m.ops.plan_range = m.ops.plan_range.saturating_add(1),
223                    PlanKind::FullScan => {
224                        m.ops.plan_full_scan = m.ops.plan_full_scan.saturating_add(1);
225                    }
226                });
227            }
228        }
229    }
230}
231
232pub const GLOBAL_METRICS_SINK: GlobalMetricsSink = GlobalMetricsSink;
233
234pub fn record(event: MetricsEvent) {
235    let override_ptr = SINK_OVERRIDE.with(|cell| *cell.borrow());
236    if let Some(ptr) = override_ptr {
237        // SAFETY: override is scoped by with_metrics_sink and only used synchronously.
238        unsafe { (&*ptr).record(event) };
239    } else {
240        GLOBAL_METRICS_SINK.record(event);
241    }
242}
243
244/// Snapshot the current metrics state for endpoint/test plumbing.
245#[must_use]
246pub fn metrics_report() -> metrics::EventReport {
247    metrics::report()
248}
249
250/// Reset ephemeral metrics counters.
251pub fn metrics_reset() {
252    metrics::reset();
253}
254
255/// Reset all metrics state (counters + perf).
256pub fn metrics_reset_all() {
257    metrics::reset_all();
258}
259
260/// Run a closure with a temporary metrics sink override.
261pub fn with_metrics_sink<T>(sink: &dyn MetricsSink, f: impl FnOnce() -> T) -> T {
262    struct Guard(Option<*const dyn MetricsSink>);
263
264    impl Drop for Guard {
265        fn drop(&mut self) {
266            SINK_OVERRIDE.with(|cell| {
267                *cell.borrow_mut() = self.0;
268            });
269        }
270    }
271
272    // SAFETY: we erase the reference lifetime for scoped storage in TLS and
273    // restore the previous value on scope exit via Guard.
274    let sink_ptr = unsafe { std::mem::transmute::<&dyn MetricsSink, *const dyn MetricsSink>(sink) };
275    let prev = SINK_OVERRIDE.with(|cell| {
276        let mut slot = cell.borrow_mut();
277        slot.replace(sink_ptr)
278    });
279    let _guard = Guard(prev);
280
281    f()
282}
283
284///
285/// Span
286/// RAII guard to simplify metrics instrumentation
287///
288
289pub(crate) struct Span<E: EntityKind> {
290    kind: ExecKind,
291    start: u64,
292    rows: u64,
293    finished: bool,
294    _marker: PhantomData<E>,
295}
296
297impl<E: EntityKind> Span<E> {
298    #[must_use]
299    /// Start a metrics span for a specific entity and executor kind.
300    pub(crate) fn new(kind: ExecKind) -> Self {
301        record(MetricsEvent::ExecStart {
302            kind,
303            entity_path: E::PATH,
304        });
305
306        Self {
307            kind,
308            start: performance_counter(1),
309            rows: 0,
310            finished: false,
311            _marker: PhantomData,
312        }
313    }
314
315    pub(crate) const fn set_rows(&mut self, rows: u64) {
316        self.rows = rows;
317    }
318
319    #[expect(dead_code)]
320    /// Increment the recorded row count.
321    pub(crate) const fn add_rows(&mut self, rows: u64) {
322        self.rows = self.rows.saturating_add(rows);
323    }
324
325    #[expect(dead_code)]
326    /// Finish the span early (also happens on Drop).
327    pub(crate) fn finish(mut self) {
328        if !self.finished {
329            self.finish_inner();
330            self.finished = true;
331        }
332    }
333
334    fn finish_inner(&self) {
335        let now = performance_counter(1);
336        let delta = now.saturating_sub(self.start);
337
338        record(MetricsEvent::ExecFinish {
339            kind: self.kind,
340            entity_path: E::PATH,
341            rows_touched: self.rows,
342            inst_delta: delta,
343        });
344    }
345}
346
347impl<E: EntityKind> Drop for Span<E> {
348    fn drop(&mut self) {
349        if !self.finished {
350            self.finish_inner();
351            self.finished = true;
352        }
353    }
354}