1use crate::{obs::metrics, traits::EntityKind};
9use std::{cell::RefCell, marker::PhantomData};
10
11thread_local! {
12 static SINK_OVERRIDE: RefCell<Option<*const dyn MetricsSink>> = RefCell::new(None);
13}
14
/// Category of executor operation that an `ExecStart` / `ExecFinish` event belongs to.
///
/// [`GlobalMetricsSink`] uses the variant to pick which call, row and instruction
/// counters to update.
///
/// `PartialEq`, `Eq` and `Hash` are derived so kinds can be compared directly and used
/// as map or set keys.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ExecKind {
    /// Counted in the `load_*` counters.
    Load,
    /// Counted in the `save_*` counters.
    Save,
    /// Counted in the `delete_*` counters.
    Delete,
}
25
/// Kind of plan reported through [`MetricsEvent::Plan`].
///
/// [`GlobalMetricsSink`] increments one `plan_*` counter per event, selected by the
/// variant. NOTE(review): the variants presumably name the access strategy chosen by the
/// query planner — confirm against the emitting code.
///
/// `PartialEq`, `Eq` and `Hash` are derived so kinds can be compared directly and used
/// as map or set keys.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum PlanKind {
    /// Counted in `plan_keys`.
    Keys,
    /// Counted in `plan_index`.
    Index,
    /// Counted in `plan_range`.
    Range,
    /// Counted in `plan_full_scan`.
    FullScan,
}
37
/// Event delivered to a [`MetricsSink`] through [`record`].
///
/// Every variant except [`MetricsEvent::Plan`] carries the `entity_path` it concerns;
/// [`GlobalMetricsSink`] uses that path as the key of its per-entity counters.
#[derive(Clone, Copy, Debug)]
pub enum MetricsEvent {
    /// An operation of `kind` started on `entity_path` (emitted by `Span::new`).
    ExecStart {
        kind: ExecKind,
        entity_path: &'static str,
    },
    /// An operation of `kind` finished on `entity_path` (emitted when a `Span` drops).
    ExecFinish {
        kind: ExecKind,
        entity_path: &'static str,
        /// Row count stored on the span via `Span::set_rows` (0 if never set).
        rows_touched: u64,
        /// Performance-counter delta between span creation and span drop.
        inst_delta: u64,
    },
    /// `rows_scanned` rows were scanned for `entity_path`.
    RowsScanned {
        entity_path: &'static str,
        rows_scanned: u64,
    },
    /// One unique violation was observed for `entity_path`; each event counts as one.
    UniqueViolation {
        entity_path: &'static str,
    },
    /// Index changes for `entity_path`, added to the `index_inserts` / `index_removes`
    /// counters.
    IndexDelta {
        entity_path: &'static str,
        inserts: u64,
        removes: u64,
    },
    /// Reverse-index changes for `entity_path`, added to the `reverse_index_inserts` /
    /// `reverse_index_removes` counters.
    ReverseIndexDelta {
        entity_path: &'static str,
        inserts: u64,
        removes: u64,
    },
    /// Relation-validation work for `entity_path`, added to the
    /// `relation_reverse_lookups` / `relation_delete_blocks` counters.
    RelationValidation {
        entity_path: &'static str,
        reverse_lookups: u64,
        blocked_deletes: u64,
    },
    /// A plan of the given kind was reported; not attributed to any entity.
    Plan {
        kind: PlanKind,
    },
}
80
/// Receiver for [`MetricsEvent`]s.
///
/// [`record`] forwards each event to exactly one sink: the override installed with
/// [`with_metrics_sink`] on the current thread, or [`GLOBAL_METRICS_SINK`] otherwise.
pub trait MetricsSink {
    /// Consumes one event. Takes `&self`, so a sink that keeps state needs interior
    /// mutability.
    fn record(&self, event: MetricsEvent);
}
88
/// Metrics sink whose `MetricsSink` implementation discards every event.
///
/// The common traits are derived so the sink can be copied, printed in diagnostics and
/// built through `Default` like other unit types.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopMetricsSink;
94
95impl MetricsSink for NoopMetricsSink {
96 fn record(&self, _: MetricsEvent) {}
97}
98
/// Metrics sink that folds events into the shared counters kept by the `metrics` module.
///
/// The common traits are derived so the sink can be copied, printed in diagnostics and
/// built through `Default` like other unit types.
#[derive(Clone, Copy, Debug, Default)]
pub struct GlobalMetricsSink;
104
105impl MetricsSink for GlobalMetricsSink {
106 #[expect(clippy::too_many_lines)]
107 fn record(&self, event: MetricsEvent) {
108 match event {
109 MetricsEvent::ExecStart { kind, entity_path } => {
110 metrics::with_state_mut(|m| {
111 match kind {
112 ExecKind::Load => m.ops.load_calls = m.ops.load_calls.saturating_add(1),
113 ExecKind::Save => m.ops.save_calls = m.ops.save_calls.saturating_add(1),
114 ExecKind::Delete => {
115 m.ops.delete_calls = m.ops.delete_calls.saturating_add(1);
116 }
117 }
118
119 let entry = m.entities.entry(entity_path.to_string()).or_default();
120 match kind {
121 ExecKind::Load => {
122 entry.load_calls = entry.load_calls.saturating_add(1);
123 }
124 ExecKind::Save => {
125 entry.save_calls = entry.save_calls.saturating_add(1);
126 }
127 ExecKind::Delete => {
128 entry.delete_calls = entry.delete_calls.saturating_add(1);
129 }
130 }
131 });
132 }
133
134 MetricsEvent::ExecFinish {
135 kind,
136 entity_path,
137 rows_touched,
138 inst_delta,
139 } => {
140 metrics::with_state_mut(|m| {
141 match kind {
142 ExecKind::Load => {
143 m.ops.rows_loaded = m.ops.rows_loaded.saturating_add(rows_touched);
144 metrics::add_instructions(
145 &mut m.perf.load_inst_total,
146 &mut m.perf.load_inst_max,
147 inst_delta,
148 );
149 }
150 ExecKind::Save => {
151 metrics::add_instructions(
152 &mut m.perf.save_inst_total,
153 &mut m.perf.save_inst_max,
154 inst_delta,
155 );
156 }
157 ExecKind::Delete => {
158 m.ops.rows_deleted = m.ops.rows_deleted.saturating_add(rows_touched);
159 metrics::add_instructions(
160 &mut m.perf.delete_inst_total,
161 &mut m.perf.delete_inst_max,
162 inst_delta,
163 );
164 }
165 }
166
167 let entry = m.entities.entry(entity_path.to_string()).or_default();
168 match kind {
169 ExecKind::Load => {
170 entry.rows_loaded = entry.rows_loaded.saturating_add(rows_touched);
171 }
172 ExecKind::Delete => {
173 entry.rows_deleted = entry.rows_deleted.saturating_add(rows_touched);
174 }
175 ExecKind::Save => {}
176 }
177 });
178 }
179
180 MetricsEvent::RowsScanned {
181 entity_path,
182 rows_scanned,
183 } => {
184 metrics::with_state_mut(|m| {
185 m.ops.rows_scanned = m.ops.rows_scanned.saturating_add(rows_scanned);
186 let entry = m.entities.entry(entity_path.to_string()).or_default();
187 entry.rows_scanned = entry.rows_scanned.saturating_add(rows_scanned);
188 });
189 }
190
191 MetricsEvent::UniqueViolation { entity_path } => {
192 metrics::with_state_mut(|m| {
193 m.ops.unique_violations = m.ops.unique_violations.saturating_add(1);
194 let entry = m.entities.entry(entity_path.to_string()).or_default();
195 entry.unique_violations = entry.unique_violations.saturating_add(1);
196 });
197 }
198
199 MetricsEvent::IndexDelta {
200 entity_path,
201 inserts,
202 removes,
203 } => {
204 metrics::with_state_mut(|m| {
205 m.ops.index_inserts = m.ops.index_inserts.saturating_add(inserts);
206 m.ops.index_removes = m.ops.index_removes.saturating_add(removes);
207 let entry = m.entities.entry(entity_path.to_string()).or_default();
208 entry.index_inserts = entry.index_inserts.saturating_add(inserts);
209 entry.index_removes = entry.index_removes.saturating_add(removes);
210 });
211 }
212
213 MetricsEvent::ReverseIndexDelta {
214 entity_path,
215 inserts,
216 removes,
217 } => {
218 metrics::with_state_mut(|m| {
219 m.ops.reverse_index_inserts =
220 m.ops.reverse_index_inserts.saturating_add(inserts);
221 m.ops.reverse_index_removes =
222 m.ops.reverse_index_removes.saturating_add(removes);
223 let entry = m.entities.entry(entity_path.to_string()).or_default();
224 entry.reverse_index_inserts =
225 entry.reverse_index_inserts.saturating_add(inserts);
226 entry.reverse_index_removes =
227 entry.reverse_index_removes.saturating_add(removes);
228 });
229 }
230
231 MetricsEvent::RelationValidation {
232 entity_path,
233 reverse_lookups,
234 blocked_deletes,
235 } => {
236 metrics::with_state_mut(|m| {
237 m.ops.relation_reverse_lookups = m
238 .ops
239 .relation_reverse_lookups
240 .saturating_add(reverse_lookups);
241 m.ops.relation_delete_blocks =
242 m.ops.relation_delete_blocks.saturating_add(blocked_deletes);
243
244 let entry = m.entities.entry(entity_path.to_string()).or_default();
245 entry.relation_reverse_lookups = entry
246 .relation_reverse_lookups
247 .saturating_add(reverse_lookups);
248 entry.relation_delete_blocks =
249 entry.relation_delete_blocks.saturating_add(blocked_deletes);
250 });
251 }
252
253 MetricsEvent::Plan { kind } => {
254 metrics::with_state_mut(|m| match kind {
255 PlanKind::Keys => m.ops.plan_keys = m.ops.plan_keys.saturating_add(1),
256 PlanKind::Index => m.ops.plan_index = m.ops.plan_index.saturating_add(1),
257 PlanKind::Range => m.ops.plan_range = m.ops.plan_range.saturating_add(1),
258 PlanKind::FullScan => {
259 m.ops.plan_full_scan = m.ops.plan_full_scan.saturating_add(1);
260 }
261 });
262 }
263 }
264 }
265}
266
/// Sink used by [`record`] when no override is installed on the current thread.
pub const GLOBAL_METRICS_SINK: GlobalMetricsSink = GlobalMetricsSink;
268
269pub fn record(event: MetricsEvent) {
270 let override_ptr = SINK_OVERRIDE.with(|cell| *cell.borrow());
271 if let Some(ptr) = override_ptr {
272 unsafe { (&*ptr).record(event) };
290 } else {
291 GLOBAL_METRICS_SINK.record(event);
292 }
293}
294
/// Builds a metrics report, optionally filtered by a window-start timestamp.
///
/// Thin wrapper around `metrics::report_window_start`; `window_start_ms` is passed
/// through unchanged. The tests in this module cover three cases: `None`, or a value
/// before the current window start, yields a report with counters; a value after the
/// current window start yields a report with no counters and no entity counters.
#[must_use]
pub fn metrics_report(window_start_ms: Option<u64>) -> metrics::EventReport {
    metrics::report_window_start(window_start_ms)
}
303
/// Resets metrics by delegating to `metrics::reset`.
///
/// What exactly is cleared is defined by that function, not here.
pub fn metrics_reset() {
    metrics::reset();
}
308
/// Resets metrics by delegating to `metrics::reset_all`.
///
/// The tests in this module call it first so that each test starts from fresh counters.
pub fn metrics_reset_all() {
    metrics::reset_all();
}
313
314pub fn with_metrics_sink<T>(sink: &dyn MetricsSink, f: impl FnOnce() -> T) -> T {
316 struct Guard(Option<*const dyn MetricsSink>);
317
318 impl Drop for Guard {
319 fn drop(&mut self) {
320 SINK_OVERRIDE.with(|cell| {
321 *cell.borrow_mut() = self.0;
322 });
323 }
324 }
325
326 let sink_ptr = unsafe { std::mem::transmute::<&dyn MetricsSink, *const dyn MetricsSink>(sink) };
340 let prev = SINK_OVERRIDE.with(|cell| {
341 let mut slot = cell.borrow_mut();
342 slot.replace(sink_ptr)
343 });
344 let _guard = Guard(prev);
345
346 f()
347}
348
/// Measurement scope for one executor operation on entity type `E`.
///
/// Creating a span records `MetricsEvent::ExecStart`; dropping it records
/// `MetricsEvent::ExecFinish` with the row count and performance-counter delta.
pub(crate) struct Span<E: EntityKind> {
    /// Operation kind reported in both the start and the finish event.
    kind: ExecKind,
    /// Performance-counter reading taken when the span was created.
    start: u64,
    /// Row count reported as `rows_touched`; set through `set_rows`, 0 by default.
    rows: u64,
    /// Set once the finish event has been emitted, so it is emitted at most once.
    finished: bool,
    /// Ties the span to `E` (source of `E::PATH`) without storing a value of it.
    _marker: PhantomData<E>,
}
361
/// Reads the performance counter used to measure spans (wasm32 build).
///
/// Returns `canic_cdk::api::performance_counter(1)`. NOTE(review): counter type `1` is
/// presumably the call-context instruction counter — confirm against the CDK docs.
#[cfg(target_arch = "wasm32")]
fn read_perf_counter() -> u64 {
    canic_cdk::api::performance_counter(1)
}

/// Reads the performance counter used to measure spans (non-wasm32 build).
///
/// No counter is available off-wasm, so this always returns 0 and every span's
/// instruction delta is 0.
// Kept non-`const` so the signature matches the wasm32 variant.
#[cfg(not(target_arch = "wasm32"))]
#[allow(clippy::missing_const_for_fn)]
fn read_perf_counter() -> u64 {
    0
}
373
374impl<E: EntityKind> Span<E> {
375 #[must_use]
376 pub(crate) fn new(kind: ExecKind) -> Self {
378 record(MetricsEvent::ExecStart {
379 kind,
380 entity_path: E::PATH,
381 });
382
383 Self {
384 kind,
385 start: read_perf_counter(),
386 rows: 0,
387 finished: false,
388 _marker: PhantomData,
389 }
390 }
391
392 pub(crate) const fn set_rows(&mut self, rows: u64) {
393 self.rows = rows;
394 }
395
396 fn finish_inner(&self) {
397 let now = read_perf_counter();
398 let delta = now.saturating_sub(self.start);
399
400 record(MetricsEvent::ExecFinish {
401 kind: self.kind,
402 entity_path: E::PATH,
403 rows_touched: self.rows,
404 inst_delta: delta,
405 });
406 }
407}
408
409impl<E: EntityKind> Drop for Span<E> {
410 fn drop(&mut self) {
411 if !self.finished {
412 self.finish_inner();
413 self.finished = true;
414 }
415 }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use std::panic::{AssertUnwindSafe, catch_unwind};
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Sink that only counts how many events it has received.
    struct CountingSink<'a> {
        calls: &'a AtomicUsize,
    }

    impl MetricsSink for CountingSink<'_> {
        fn record(&self, _: MetricsEvent) {
            self.calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Removes any override left on this test thread.
    fn clear_override() {
        SINK_OVERRIDE.with(|slot| {
            *slot.borrow_mut() = None;
        });
    }

    /// Asserts that no override is installed on this test thread.
    fn assert_override_cleared() {
        SINK_OVERRIDE.with(|slot| {
            assert!(slot.borrow().is_none());
        });
    }

    /// Records one `Plan` event of the given kind through the public entry point.
    fn emit_plan(kind: PlanKind) {
        record(MetricsEvent::Plan { kind });
    }

    /// Current value of a call counter.
    fn seen(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    #[test]
    fn with_metrics_sink_routes_and_restores_nested_overrides() {
        clear_override();

        let outer_calls = AtomicUsize::new(0);
        let inner_calls = AtomicUsize::new(0);
        let outer = CountingSink {
            calls: &outer_calls,
        };
        let inner = CountingSink {
            calls: &inner_calls,
        };

        // Without an override, neither counting sink may see the event.
        emit_plan(PlanKind::Keys);
        assert_eq!(seen(&outer_calls), 0);
        assert_eq!(seen(&inner_calls), 0);

        with_metrics_sink(&outer, || {
            emit_plan(PlanKind::Index);
            assert_eq!(seen(&outer_calls), 1);
            assert_eq!(seen(&inner_calls), 0);

            with_metrics_sink(&inner, || {
                emit_plan(PlanKind::Range);
            });

            // Leaving the inner scope must route events to `outer` again.
            emit_plan(PlanKind::FullScan);
        });

        assert_eq!(seen(&outer_calls), 2);
        assert_eq!(seen(&inner_calls), 1);

        assert_override_cleared();

        // After the outer scope ends, neither counting sink is reachable any more.
        emit_plan(PlanKind::Keys);
        assert_eq!(seen(&outer_calls), 2);
        assert_eq!(seen(&inner_calls), 1);
    }

    #[test]
    fn with_metrics_sink_restores_override_on_panic() {
        clear_override();

        let calls = AtomicUsize::new(0);
        let sink = CountingSink { calls: &calls };

        let outcome = catch_unwind(AssertUnwindSafe(|| {
            with_metrics_sink(&sink, || {
                emit_plan(PlanKind::Index);
                panic!("intentional panic for guard test");
            });
        }));
        let panicked = outcome.is_err();
        assert!(panicked);
        assert_eq!(seen(&calls), 1);

        // The drop guard must have run during unwinding.
        assert_override_cleared();

        emit_plan(PlanKind::Range);
        assert_eq!(seen(&calls), 1);
    }

    #[test]
    fn metrics_report_without_window_start_returns_counters() {
        metrics_reset_all();
        emit_plan(PlanKind::Index);

        let counters = metrics_report(None)
            .counters
            .expect("metrics report should include counters without since filter");
        assert_eq!(counters.ops.plan_index, 1);
    }

    #[test]
    fn metrics_report_window_start_before_window_returns_counters() {
        metrics_reset_all();
        let window_start = metrics::with_state(|state| state.window_start_ms);
        emit_plan(PlanKind::Keys);

        let just_before = window_start.saturating_sub(1);
        let counters = metrics_report(Some(just_before))
            .counters
            .expect("metrics report should include counters when window_start_ms is before window");
        assert_eq!(counters.ops.plan_keys, 1);
    }

    #[test]
    fn metrics_report_window_start_after_window_returns_empty() {
        metrics_reset_all();
        let window_start = metrics::with_state(|state| state.window_start_ms);
        emit_plan(PlanKind::FullScan);

        let just_after = window_start.saturating_add(1);
        let report = metrics_report(Some(just_after));
        assert!(report.counters.is_none());
        assert!(report.entity_counters.is_empty());
    }

    #[test]
    fn reverse_and_relation_metrics_events_accumulate() {
        metrics_reset_all();

        let reverse_delta = MetricsEvent::ReverseIndexDelta {
            entity_path: "obs::tests::Entity",
            inserts: 3,
            removes: 2,
        };
        let relation_check = MetricsEvent::RelationValidation {
            entity_path: "obs::tests::Entity",
            reverse_lookups: 5,
            blocked_deletes: 1,
        };
        record(reverse_delta);
        record(relation_check);

        let counters = metrics_report(None)
            .counters
            .expect("metrics report should include counters");
        assert_eq!(counters.ops.reverse_index_inserts, 3);
        assert_eq!(counters.ops.reverse_index_removes, 2);
        assert_eq!(counters.ops.relation_reverse_lookups, 5);
        assert_eq!(counters.ops.relation_delete_blocks, 1);

        let entity = counters
            .entities
            .get("obs::tests::Entity")
            .expect("entity counters should be present");
        assert_eq!(entity.reverse_index_inserts, 3);
        assert_eq!(entity.reverse_index_removes, 2);
        assert_eq!(entity.relation_reverse_lookups, 5);
        assert_eq!(entity.relation_delete_blocks, 1);
    }
}