Skip to main content

fsqlite_observability/
lib.rs

1//! MVCC conflict analytics and observability infrastructure.
2//!
3//! Provides shared types and utilities for conflict tracing, metrics
4//! aggregation, and diagnostic logging across the FrankenSQLite MVCC layer.
5//!
6//! # Design Principles
7//!
8//! - **Zero-cost when unused:** All observation is opt-in via the
9//!   [`ConflictObserver`] trait. When no observer is registered, conflict
10//!   emission compiles to nothing (the default [`NoOpObserver`] is inlined).
11//! - **Non-blocking:** Observers MUST NOT acquire page locks or block writers.
12//!   Conflict tracing is purely diagnostic.
13//! - **Shared foundation:** Types defined here are reused by downstream
14//!   observability beads (bd-t6sv2.2, .3, .5, .6, .8, .12).
15
16use fsqlite_types::sync_primitives::{Duration, Instant, Mutex};
17use fsqlite_types::{CommitSeq, PageNumber, TxnId, TxnToken};
18use serde::Serialize;
19use std::collections::{HashMap, VecDeque};
20use std::sync::LazyLock;
21use std::sync::atomic::{AtomicU64, Ordering};
22
23// ---------------------------------------------------------------------------
24// Structured trace metrics (bd-19u.1)
25// ---------------------------------------------------------------------------
26
// Monotonic tally counters; `Relaxed` suffices because each is independent
// and only ever read as a point-in-time snapshot.
static FSQLITE_TRACE_SPANS_TOTAL: AtomicU64 = AtomicU64::new(0);
static FSQLITE_TRACE_EXPORT_ERRORS_TOTAL: AtomicU64 = AtomicU64::new(0);
static FSQLITE_COMPAT_TRACE_CALLBACKS_TOTAL: AtomicU64 = AtomicU64::new(0);
// ID sequences start at 1 so that 0 can serve as an "unassigned" sentinel.
static TRACE_ID_SEQUENCE: AtomicU64 = AtomicU64::new(1);
static DECISION_ID_SEQUENCE: AtomicU64 = AtomicU64::new(1);

/// Snapshot of structured tracing counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct TraceMetricsSnapshot {
    /// Total tracing spans created in the SQL pipeline.
    pub fsqlite_trace_spans_total: u64,
    /// Total failed span-export attempts.
    pub fsqlite_trace_export_errors_total: u64,
    /// Total sqlite3_trace_v2 compatibility callback invocations.
    pub fsqlite_compat_trace_callbacks_total: u64,
}
40
41/// Allocate the next trace identifier.
42#[must_use]
43pub fn next_trace_id() -> u64 {
44    TRACE_ID_SEQUENCE.fetch_add(1, Ordering::Relaxed)
45}
46
47/// Allocate the next decision identifier.
48#[must_use]
49pub fn next_decision_id() -> u64 {
50    DECISION_ID_SEQUENCE.fetch_add(1, Ordering::Relaxed)
51}
52
53/// Record creation of a tracing span in the SQL pipeline.
54pub fn record_trace_span_created() {
55    FSQLITE_TRACE_SPANS_TOTAL.fetch_add(1, Ordering::Relaxed);
56}
57
/// Record an export batch for tracing spans.
///
/// Emits a DEBUG-level `trace_export` span carrying the batch size and
/// export latency. The `tracing::enabled!` guard skips span construction
/// entirely when no subscriber listens at DEBUG for the
/// `fsqlite.trace_export` target, keeping the disabled path cheap.
///
/// NOTE(review): unlike the other `record_*` functions in this section,
/// this does not bump an atomic counter — successful exports are visible
/// only through the tracing subscriber. Confirm that is intended.
pub fn record_trace_export(spans_exported: u64, export_latency_us: u64) {
    if !tracing::enabled!(target: "fsqlite.trace_export", tracing::Level::DEBUG) {
        return;
    }
    let span = tracing::span!(
        target: "fsqlite.trace_export",
        tracing::Level::DEBUG,
        "trace_export",
        spans_exported,
        export_latency_us
    );
    let _guard = span.enter();
    tracing::debug!("trace export observed");
}
73
74/// Record a failed span-export attempt.
75pub fn record_trace_export_error() {
76    FSQLITE_TRACE_EXPORT_ERRORS_TOTAL.fetch_add(1, Ordering::Relaxed);
77}
78
79/// Record a sqlite3_trace_v2 compatibility callback invocation.
80pub fn record_compat_trace_callback() {
81    FSQLITE_COMPAT_TRACE_CALLBACKS_TOTAL.fetch_add(1, Ordering::Relaxed);
82}
83
84/// Read a point-in-time snapshot of trace counters.
85#[must_use]
86pub fn trace_metrics_snapshot() -> TraceMetricsSnapshot {
87    TraceMetricsSnapshot {
88        fsqlite_trace_spans_total: FSQLITE_TRACE_SPANS_TOTAL.load(Ordering::Relaxed),
89        fsqlite_trace_export_errors_total: FSQLITE_TRACE_EXPORT_ERRORS_TOTAL
90            .load(Ordering::Relaxed),
91        fsqlite_compat_trace_callbacks_total: FSQLITE_COMPAT_TRACE_CALLBACKS_TOTAL
92            .load(Ordering::Relaxed),
93    }
94}
95
96/// Reset trace counters to zero (tests/diagnostics).
97pub fn reset_trace_metrics() {
98    FSQLITE_TRACE_SPANS_TOTAL.store(0, Ordering::Relaxed);
99    FSQLITE_TRACE_EXPORT_ERRORS_TOTAL.store(0, Ordering::Relaxed);
100    FSQLITE_COMPAT_TRACE_CALLBACKS_TOTAL.store(0, Ordering::Relaxed);
101}
102
103// ---------------------------------------------------------------------------
104// io_uring latency telemetry + conformal bound signal (bd-al1)
105// ---------------------------------------------------------------------------
106
/// Number of most-recent samples retained per direction (read/write).
const IO_URING_LATENCY_WINDOW_CAPACITY: usize = 1024;
/// Nearest-rank p99 expressed as a ratio: `P99_NUMERATOR / P99_DENOMINATOR`.
const P99_NUMERATOR: usize = 99;
const P99_DENOMINATOR: usize = 100;

/// Global io_uring latency metrics singleton.
pub static GLOBAL_IO_URING_LATENCY_METRICS: LazyLock<IoUringLatencyMetrics> =
    LazyLock::new(|| IoUringLatencyMetrics::new(IO_URING_LATENCY_WINDOW_CAPACITY));
114
/// Point-in-time view of io_uring latency telemetry.
#[derive(Debug, Clone, Serialize)]
pub struct IoUringLatencySnapshot {
    /// Total read latency samples recorded.
    pub read_samples_total: u64,
    /// Total write latency samples recorded.
    pub write_samples_total: u64,
    /// Total fallbacks from io_uring to the unix VFS path.
    pub unix_fallbacks_total: u64,
    /// Reads that exceeded the prior conformal upper bound.
    pub read_tail_violations_total: u64,
    /// Writes that exceeded the prior conformal upper bound.
    pub write_tail_violations_total: u64,
    /// Configured rolling-window capacity (samples per direction).
    pub window_capacity: usize,
    /// Samples currently retained in the read window.
    pub read_window_len: usize,
    /// Samples currently retained in the write window.
    pub write_window_len: usize,
    /// Nearest-rank p99 of the read window, in microseconds (0 if empty).
    pub read_p99_latency_us: u64,
    /// Nearest-rank p99 of the write window, in microseconds (0 if empty).
    pub write_p99_latency_us: u64,
    /// Conformal upper bound for reads, in microseconds (0 if empty).
    pub read_conformal_upper_bound_us: u64,
    /// Conformal upper bound for writes, in microseconds (0 if empty).
    pub write_conformal_upper_bound_us: u64,
}

/// One direction's rolling latency window plus the nonconformity score of
/// each sample (sample minus the p99 baseline in effect when it arrived).
#[derive(Default)]
struct IoLatencySeries {
    latencies_ns: VecDeque<u64>,
    nonconformity_ns: VecDeque<u64>,
}
136
137impl IoLatencySeries {
138    fn push(&mut self, sample_ns: u64, sample_capacity: usize) {
139        let baseline = self.p99_latency_ns().unwrap_or(sample_ns);
140        let score = sample_ns.saturating_sub(baseline);
141        push_bounded(&mut self.latencies_ns, sample_ns, sample_capacity);
142        push_bounded(&mut self.nonconformity_ns, score, sample_capacity);
143    }
144
145    fn p99_latency_ns(&self) -> Option<u64> {
146        quantile_from_deque(&self.latencies_ns, P99_NUMERATOR, P99_DENOMINATOR)
147    }
148
149    fn conformal_upper_bound_ns(&self) -> Option<u64> {
150        let baseline = self.p99_latency_ns()?;
151        let q = conformal_quantile(&self.nonconformity_ns)?;
152        Some(baseline.saturating_add(q))
153    }
154
155    fn reset(&mut self) {
156        self.latencies_ns.clear();
157        self.nonconformity_ns.clear();
158    }
159}
160
/// Mutex-protected pair of rolling windows, one per I/O direction.
#[derive(Default)]
struct IoUringLatencyWindow {
    read: IoLatencySeries,
    write: IoLatencySeries,
}

/// Lock-free counters plus a mutex-guarded sample window for io_uring
/// latency telemetry and the conformal tail-bound signal.
pub struct IoUringLatencyMetrics {
    /// Total read latency samples recorded.
    pub read_samples_total: AtomicU64,
    /// Total write latency samples recorded.
    pub write_samples_total: AtomicU64,
    /// Total fallbacks to the unix VFS path.
    pub unix_fallbacks_total: AtomicU64,
    /// Reads that exceeded the prior conformal bound.
    pub read_tail_violations_total: AtomicU64,
    /// Writes that exceeded the prior conformal bound.
    pub write_tail_violations_total: AtomicU64,
    // Maximum samples retained per direction; fixed at construction.
    sample_capacity: usize,
    // Rolling windows; only locked on the sampling and snapshot paths.
    window: Mutex<IoUringLatencyWindow>,
}
176
177impl IoUringLatencyMetrics {
178    #[must_use]
179    pub const fn new(sample_capacity: usize) -> Self {
180        Self {
181            read_samples_total: AtomicU64::new(0),
182            write_samples_total: AtomicU64::new(0),
183            unix_fallbacks_total: AtomicU64::new(0),
184            read_tail_violations_total: AtomicU64::new(0),
185            write_tail_violations_total: AtomicU64::new(0),
186            sample_capacity,
187            window: Mutex::new(IoUringLatencyWindow {
188                read: IoLatencySeries {
189                    latencies_ns: VecDeque::new(),
190                    nonconformity_ns: VecDeque::new(),
191                },
192                write: IoLatencySeries {
193                    latencies_ns: VecDeque::new(),
194                    nonconformity_ns: VecDeque::new(),
195                },
196            }),
197        }
198    }
199
200    pub fn record_read_latency(&self, latency: Duration) -> bool {
201        self.read_samples_total.fetch_add(1, Ordering::Relaxed);
202        let sample_ns = duration_to_nanos_saturated(latency);
203        let mut window = self.window.lock();
204        let prior_bound = window.read.conformal_upper_bound_ns();
205        window.read.push(sample_ns, self.sample_capacity);
206        let is_tail_violation = prior_bound.is_some_and(|bound| sample_ns > bound);
207        if is_tail_violation {
208            self.read_tail_violations_total
209                .fetch_add(1, Ordering::Relaxed);
210        }
211        is_tail_violation
212    }
213
214    pub fn record_write_latency(&self, latency: Duration) -> bool {
215        self.write_samples_total.fetch_add(1, Ordering::Relaxed);
216        let sample_ns = duration_to_nanos_saturated(latency);
217        let mut window = self.window.lock();
218        let prior_bound = window.write.conformal_upper_bound_ns();
219        window.write.push(sample_ns, self.sample_capacity);
220        let is_tail_violation = prior_bound.is_some_and(|bound| sample_ns > bound);
221        if is_tail_violation {
222            self.write_tail_violations_total
223                .fetch_add(1, Ordering::Relaxed);
224        }
225        is_tail_violation
226    }
227
228    pub fn record_unix_fallback(&self) {
229        self.unix_fallbacks_total.fetch_add(1, Ordering::Relaxed);
230    }
231
232    #[must_use]
233    pub fn snapshot(&self) -> IoUringLatencySnapshot {
234        let window = self.window.lock();
235
236        IoUringLatencySnapshot {
237            read_samples_total: self.read_samples_total.load(Ordering::Relaxed),
238            write_samples_total: self.write_samples_total.load(Ordering::Relaxed),
239            unix_fallbacks_total: self.unix_fallbacks_total.load(Ordering::Relaxed),
240            read_tail_violations_total: self.read_tail_violations_total.load(Ordering::Relaxed),
241            write_tail_violations_total: self.write_tail_violations_total.load(Ordering::Relaxed),
242            window_capacity: self.sample_capacity,
243            read_window_len: window.read.latencies_ns.len(),
244            write_window_len: window.write.latencies_ns.len(),
245            read_p99_latency_us: nanos_to_micros(window.read.p99_latency_ns().unwrap_or(0)),
246            write_p99_latency_us: nanos_to_micros(window.write.p99_latency_ns().unwrap_or(0)),
247            read_conformal_upper_bound_us: nanos_to_micros(
248                window.read.conformal_upper_bound_ns().unwrap_or(0),
249            ),
250            write_conformal_upper_bound_us: nanos_to_micros(
251                window.write.conformal_upper_bound_ns().unwrap_or(0),
252            ),
253        }
254    }
255
256    pub fn reset(&self) {
257        self.read_samples_total.store(0, Ordering::Relaxed);
258        self.write_samples_total.store(0, Ordering::Relaxed);
259        self.unix_fallbacks_total.store(0, Ordering::Relaxed);
260        self.read_tail_violations_total.store(0, Ordering::Relaxed);
261        self.write_tail_violations_total.store(0, Ordering::Relaxed);
262        let mut window = self.window.lock();
263        window.read.reset();
264        window.write.reset();
265    }
266}
267
impl Default for IoUringLatencyMetrics {
    /// Equivalent to `Self::new(IO_URING_LATENCY_WINDOW_CAPACITY)`.
    fn default() -> Self {
        Self::new(IO_URING_LATENCY_WINDOW_CAPACITY)
    }
}
273
/// Record a read latency sample against the global singleton; returns
/// `true` when the sample exceeded the prior conformal tail bound.
pub fn record_io_uring_read_latency(latency: Duration) -> bool {
    GLOBAL_IO_URING_LATENCY_METRICS.record_read_latency(latency)
}

/// Record a write latency sample against the global singleton; returns
/// `true` when the sample exceeded the prior conformal tail bound.
pub fn record_io_uring_write_latency(latency: Duration) -> bool {
    GLOBAL_IO_URING_LATENCY_METRICS.record_write_latency(latency)
}

/// Record a fallback from io_uring to the unix VFS path on the global
/// singleton.
pub fn record_io_uring_unix_fallback() {
    GLOBAL_IO_URING_LATENCY_METRICS.record_unix_fallback();
}

/// Snapshot the global io_uring latency metrics.
#[must_use]
pub fn io_uring_latency_snapshot() -> IoUringLatencySnapshot {
    GLOBAL_IO_URING_LATENCY_METRICS.snapshot()
}

/// Reset the global io_uring latency metrics (tests/diagnostics).
pub fn reset_io_uring_latency_metrics() {
    GLOBAL_IO_URING_LATENCY_METRICS.reset();
}
294
/// Append `value` to `buffer`, evicting from the front so the length never
/// exceeds `sample_capacity`. A capacity of zero disables buffering.
fn push_bounded(buffer: &mut VecDeque<u64>, value: u64, sample_capacity: usize) {
    if sample_capacity == 0 {
        return;
    }
    // `while len >= cap` (not `if len == cap`): an over-full buffer — e.g.
    // one populated under a larger capacity — shrinks back into bounds
    // instead of growing without limit.
    while buffer.len() >= sample_capacity {
        let _ = buffer.pop_front();
    }
    buffer.push_back(value);
}
304
/// Nearest-rank quantile (`numerator / denominator`) of an unsorted window.
///
/// Sorts a scratch copy ascending and returns the element at 1-based rank
/// `ceil(numerator * n / denominator)`, clamped into bounds. Returns `None`
/// for an empty window or a zero denominator.
fn quantile_from_deque(
    values: &VecDeque<u64>,
    numerator: usize,
    denominator: usize,
) -> Option<u64> {
    if denominator == 0 {
        return None;
    }
    let n = values.len();
    if n == 0 {
        return None;
    }

    let mut sorted = values.iter().copied().collect::<Vec<u64>>();
    sorted.sort_unstable();

    let one_based_rank = numerator.saturating_mul(n).div_ceil(denominator);
    let index = one_based_rank.saturating_sub(1).min(n - 1);
    sorted.get(index).copied()
}
325
326fn conformal_quantile(nonconformity: &VecDeque<u64>) -> Option<u64> {
327    if nonconformity.is_empty() {
328        return None;
329    }
330
331    let mut sorted: Vec<u64> = nonconformity.iter().copied().collect();
332    sorted.sort_unstable();
333
334    let n = sorted.len();
335    let rank = P99_NUMERATOR
336        .saturating_mul(n.saturating_add(1))
337        .div_ceil(P99_DENOMINATOR)
338        .saturating_sub(1)
339        .min(n.saturating_sub(1));
340    sorted.get(rank).copied()
341}
342
/// Convert nanoseconds to whole microseconds, truncating the remainder.
fn nanos_to_micros(nanos: u64) -> u64 {
    const NANOS_PER_MICRO: u64 = 1_000;
    nanos / NANOS_PER_MICRO
}
346
/// Clamp a duration's nanosecond count into `u64` range, saturating at
/// `u64::MAX` (~584 years) instead of truncating.
fn duration_to_nanos_saturated(duration: Duration) -> u64 {
    u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX)
}
350
351// ---------------------------------------------------------------------------
352// Cx propagation telemetry (bd-2g5.6.1)
353// ---------------------------------------------------------------------------
354
/// Global Cx propagation metrics singleton.
///
/// Tracks how well the capability context (`Cx`) is threaded through
/// connection and transaction paths. Incremented on the code paths that
/// detect missing or invalid propagation, as well as on successful
/// propagation checkpoints, cancellation cleanup outcomes, and trace
/// linkage establishments.
///
/// Declared as a plain `static` (no `LazyLock`) because
/// [`CxPropagationMetrics::new`] is a `const fn`.
pub static GLOBAL_CX_PROPAGATION_METRICS: CxPropagationMetrics = CxPropagationMetrics::new();
363
/// Atomic counters for Cx propagation telemetry.
///
/// Counters follow the same lock-free `Relaxed` ordering convention as
/// the rest of the observability crate — callers may observe stale reads
/// but never torn values. All counters only increase, except via
/// [`CxPropagationMetrics::reset`].
pub struct CxPropagationMetrics {
    /// Number of successful Cx propagation checkpoints.
    pub propagation_successes_total: AtomicU64,
    /// Number of detected missing or invalid Cx propagation.
    pub propagation_failures_total: AtomicU64,
    /// Number of cancellation cleanup operations completed.
    pub cancellation_cleanups_total: AtomicU64,
    /// Number of trace-ID linkage establishments (Cx → span).
    pub trace_linkages_total: AtomicU64,
    /// Number of Cx instances created for transaction scopes.
    pub cx_created_total: AtomicU64,
    /// Number of Cx cancel propagations observed.
    pub cancel_propagations_total: AtomicU64,
}

impl Default for CxPropagationMetrics {
    /// Equivalent to [`CxPropagationMetrics::new`]: all counters zeroed.
    fn default() -> Self {
        Self::new()
    }
}
389
390impl CxPropagationMetrics {
391    /// Create a new metrics instance with all counters at zero.
392    #[must_use]
393    pub const fn new() -> Self {
394        Self {
395            propagation_successes_total: AtomicU64::new(0),
396            propagation_failures_total: AtomicU64::new(0),
397            cancellation_cleanups_total: AtomicU64::new(0),
398            trace_linkages_total: AtomicU64::new(0),
399            cx_created_total: AtomicU64::new(0),
400            cancel_propagations_total: AtomicU64::new(0),
401        }
402    }
403
404    /// Record a successful Cx propagation checkpoint.
405    pub fn record_propagation_success(&self) {
406        self.propagation_successes_total
407            .fetch_add(1, Ordering::Relaxed);
408    }
409
410    /// Record a missing or invalid Cx propagation and emit a WARN diagnostic.
411    pub fn record_propagation_failure(&self, site: &str) {
412        self.propagation_failures_total
413            .fetch_add(1, Ordering::Relaxed);
414        tracing::warn!(
415            target: "fsqlite.cx_propagation",
416            site,
417            "missing or invalid Cx propagation detected"
418        );
419    }
420
421    /// Record a cancellation cleanup completion.
422    pub fn record_cancellation_cleanup(&self) {
423        self.cancellation_cleanups_total
424            .fetch_add(1, Ordering::Relaxed);
425    }
426
427    /// Record a trace-ID linkage establishment.
428    pub fn record_trace_linkage(&self) {
429        self.trace_linkages_total.fetch_add(1, Ordering::Relaxed);
430    }
431
432    /// Record creation of a Cx for a transaction scope.
433    pub fn record_cx_created(&self) {
434        self.cx_created_total.fetch_add(1, Ordering::Relaxed);
435    }
436
437    /// Record a cancel propagation event.
438    pub fn record_cancel_propagation(&self) {
439        self.cancel_propagations_total
440            .fetch_add(1, Ordering::Relaxed);
441    }
442
443    /// Read a point-in-time snapshot.
444    #[must_use]
445    pub fn snapshot(&self) -> CxPropagationMetricsSnapshot {
446        CxPropagationMetricsSnapshot {
447            propagation_successes_total: self.propagation_successes_total.load(Ordering::Relaxed),
448            propagation_failures_total: self.propagation_failures_total.load(Ordering::Relaxed),
449            cancellation_cleanups_total: self.cancellation_cleanups_total.load(Ordering::Relaxed),
450            trace_linkages_total: self.trace_linkages_total.load(Ordering::Relaxed),
451            cx_created_total: self.cx_created_total.load(Ordering::Relaxed),
452            cancel_propagations_total: self.cancel_propagations_total.load(Ordering::Relaxed),
453        }
454    }
455
456    /// Reset all counters to zero (tests/diagnostics).
457    pub fn reset(&self) {
458        self.propagation_successes_total.store(0, Ordering::Relaxed);
459        self.propagation_failures_total.store(0, Ordering::Relaxed);
460        self.cancellation_cleanups_total.store(0, Ordering::Relaxed);
461        self.trace_linkages_total.store(0, Ordering::Relaxed);
462        self.cx_created_total.store(0, Ordering::Relaxed);
463        self.cancel_propagations_total.store(0, Ordering::Relaxed);
464    }
465}
466
/// Serializable snapshot of Cx propagation metrics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct CxPropagationMetricsSnapshot {
    /// Successful Cx propagation checkpoints.
    pub propagation_successes_total: u64,
    /// Detected missing or invalid Cx propagations.
    pub propagation_failures_total: u64,
    /// Completed cancellation cleanup operations.
    pub cancellation_cleanups_total: u64,
    /// Trace-ID linkage establishments (Cx → span).
    pub trace_linkages_total: u64,
    /// Cx instances created for transaction scopes.
    pub cx_created_total: u64,
    /// Cx cancel propagations observed.
    pub cancel_propagations_total: u64,
}
477
478impl CxPropagationMetricsSnapshot {
479    /// Propagation failure ratio (failures / total attempts). Returns 0.0
480    /// when no attempts have been recorded.
481    #[must_use]
482    #[allow(clippy::cast_precision_loss)]
483    pub fn failure_ratio(&self) -> f64 {
484        let total = self.propagation_successes_total + self.propagation_failures_total;
485        if total == 0 {
486            return 0.0;
487        }
488        self.propagation_failures_total as f64 / total as f64
489    }
490}
491
impl std::fmt::Display for CxPropagationMetricsSnapshot {
    /// Render a single-line, log-friendly summary of all six counters plus
    /// the derived failure ratio (four decimal places).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "cx_propagation(ok={} fail={} cancel_cleanup={} trace_link={} cx_new={} cancel_prop={} fail_ratio={:.4})",
            self.propagation_successes_total,
            self.propagation_failures_total,
            self.cancellation_cleanups_total,
            self.trace_linkages_total,
            self.cx_created_total,
            self.cancel_propagations_total,
            self.failure_ratio(),
        )
    }
}
507
508// ---------------------------------------------------------------------------
509// TxnSlot crash/occupancy telemetry (bd-2g5.1)
510// ---------------------------------------------------------------------------
511
/// Sentinel slot-id value used when the caller cannot map a concrete index.
const UNKNOWN_SLOT_ID: usize = usize::MAX;

/// Global TxnSlot observability metrics singleton.
///
/// Tracks active TxnSlot occupancy (`fsqlite_txn_slots_active`) and crash
/// detections (`fsqlite_txn_slot_crashes_detected_total`).
///
/// A plain `static` works here because [`TxnSlotMetrics::new`] is `const`.
pub static GLOBAL_TXN_SLOT_METRICS: TxnSlotMetrics = TxnSlotMetrics::new();
520
/// Atomic counters for TxnSlot lifecycle telemetry.
///
/// Counters follow the same lock-free `Relaxed` ordering convention used by the
/// rest of this crate.
pub struct TxnSlotMetrics {
    /// Gauge: number of currently active (published) transaction slots.
    /// Incremented on allocation, decremented (saturating) on release.
    pub fsqlite_txn_slots_active: AtomicU64,
    /// Counter: number of detected orphan/crashed transaction slots.
    pub fsqlite_txn_slot_crashes_detected_total: AtomicU64,
}

impl Default for TxnSlotMetrics {
    /// Equivalent to [`TxnSlotMetrics::new`]: all counters zeroed.
    fn default() -> Self {
        Self::new()
    }
}
537
538impl TxnSlotMetrics {
539    /// Create a new metrics instance with all counters at zero.
540    #[must_use]
541    pub const fn new() -> Self {
542        Self {
543            fsqlite_txn_slots_active: AtomicU64::new(0),
544            fsqlite_txn_slot_crashes_detected_total: AtomicU64::new(0),
545        }
546    }
547
548    #[inline]
549    const fn normalize_slot_id(slot_id: Option<usize>) -> usize {
550        match slot_id {
551            Some(value) => value,
552            None => UNKNOWN_SLOT_ID,
553        }
554    }
555
556    fn log_context_from_env() -> (String, u64, String) {
557        let run_id = std::env::var("RUN_ID").unwrap_or_else(|_| "(none)".to_owned());
558        let trace_id = std::env::var("TRACE_ID")
559            .ok()
560            .and_then(|value| value.parse::<u64>().ok())
561            .unwrap_or(0);
562        let scenario_id = std::env::var("SCENARIO_ID").unwrap_or_else(|_| "(none)".to_owned());
563        (run_id, trace_id, scenario_id)
564    }
565
566    fn decrement_active_slots_saturating(&self) -> u64 {
567        loop {
568            let prev = self.fsqlite_txn_slots_active.load(Ordering::Relaxed);
569            let next = prev.saturating_sub(1);
570            if self
571                .fsqlite_txn_slots_active
572                .compare_exchange_weak(prev, next, Ordering::Relaxed, Ordering::Relaxed)
573                .is_ok()
574            {
575                return next;
576            }
577        }
578    }
579
580    /// Record a successful slot allocation/publish.
581    pub fn record_slot_allocated(&self, slot_id: usize, process_id: u32) {
582        let started_at = Instant::now();
583        let active_after = self
584            .fsqlite_txn_slots_active
585            .fetch_add(1, Ordering::Relaxed)
586            .saturating_add(1);
587        let operation_elapsed_us = started_at.elapsed().as_micros().max(1);
588        let (run_id, trace_id, scenario_id) = Self::log_context_from_env();
589        let span = tracing::span!(
590            target: "fsqlite.txn_slot",
591            tracing::Level::INFO,
592            "txn_slot",
593            slot_id,
594            process_id,
595            run_id = %run_id.as_str(),
596            trace_id,
597            scenario_id = %scenario_id.as_str(),
598            operation = "alloc"
599        );
600        let _guard = span.enter();
601        tracing::info!(
602            fsqlite_txn_slots_active = active_after,
603            operation_elapsed_us,
604            run_id = %run_id.as_str(),
605            trace_id,
606            scenario_id = %scenario_id.as_str(),
607            failure_context = "none",
608            "transaction slot allocated"
609        );
610    }
611
612    /// Record a slot release/free operation.
613    pub fn record_slot_released(&self, slot_id: Option<usize>, process_id: u32) {
614        let started_at = Instant::now();
615        let active_after = self.decrement_active_slots_saturating();
616        let slot_id = Self::normalize_slot_id(slot_id);
617        let operation_elapsed_us = started_at.elapsed().as_micros().max(1);
618        let (run_id, trace_id, scenario_id) = Self::log_context_from_env();
619        let span = tracing::span!(
620            target: "fsqlite.txn_slot",
621            tracing::Level::INFO,
622            "txn_slot",
623            slot_id,
624            process_id,
625            run_id = %run_id.as_str(),
626            trace_id,
627            scenario_id = %scenario_id.as_str(),
628            operation = "release"
629        );
630        let _guard = span.enter();
631        tracing::info!(
632            fsqlite_txn_slots_active = active_after,
633            operation_elapsed_us,
634            run_id = %run_id.as_str(),
635            trace_id,
636            scenario_id = %scenario_id.as_str(),
637            failure_context = "none",
638            "transaction slot released"
639        );
640    }
641
642    /// Record detection/reclamation of a crashed/orphaned slot.
643    pub fn record_crash_detected(
644        &self,
645        slot_id: Option<usize>,
646        process_id: u32,
647        orphan_txn_id: u64,
648    ) {
649        let started_at = Instant::now();
650        let total = self
651            .fsqlite_txn_slot_crashes_detected_total
652            .fetch_add(1, Ordering::Relaxed)
653            .saturating_add(1);
654        let slot_id = Self::normalize_slot_id(slot_id);
655        let operation_elapsed_us = started_at.elapsed().as_micros().max(1);
656        let (run_id, trace_id, scenario_id) = Self::log_context_from_env();
657        let span = tracing::span!(
658            target: "fsqlite.txn_slot",
659            tracing::Level::WARN,
660            "txn_slot",
661            slot_id,
662            process_id,
663            run_id = %run_id.as_str(),
664            trace_id,
665            scenario_id = %scenario_id.as_str(),
666            operation = "crash_detect"
667        );
668        let _guard = span.enter();
669        tracing::warn!(
670            orphan_txn_id,
671            fsqlite_txn_slot_crashes_detected_total = total,
672            operation_elapsed_us,
673            run_id = %run_id.as_str(),
674            trace_id,
675            scenario_id = %scenario_id.as_str(),
676            failure_context = "orphan_slot_reclaimed_during_cleanup",
677            "orphaned transaction slot crash detected"
678        );
679    }
680
681    /// Read a point-in-time snapshot.
682    #[must_use]
683    pub fn snapshot(&self) -> TxnSlotMetricsSnapshot {
684        TxnSlotMetricsSnapshot {
685            fsqlite_txn_slots_active: self.fsqlite_txn_slots_active.load(Ordering::Relaxed),
686            fsqlite_txn_slot_crashes_detected_total: self
687                .fsqlite_txn_slot_crashes_detected_total
688                .load(Ordering::Relaxed),
689        }
690    }
691
692    /// Reset all counters to zero (tests/diagnostics).
693    pub fn reset(&self) {
694        self.fsqlite_txn_slots_active.store(0, Ordering::Relaxed);
695        self.fsqlite_txn_slot_crashes_detected_total
696            .store(0, Ordering::Relaxed);
697    }
698}
699
/// Serializable snapshot of TxnSlot telemetry counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct TxnSlotMetricsSnapshot {
    /// Currently active (published) transaction slots.
    pub fsqlite_txn_slots_active: u64,
    /// Total orphan/crashed slots detected.
    pub fsqlite_txn_slot_crashes_detected_total: u64,
}

impl std::fmt::Display for TxnSlotMetricsSnapshot {
    /// Render a single-line, log-friendly summary of both counters.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "txn_slots(active={} crashes={})",
            self.fsqlite_txn_slots_active, self.fsqlite_txn_slot_crashes_detected_total
        )
    }
}
716
717// ---------------------------------------------------------------------------
718// ConflictEvent — the core event type
719// ---------------------------------------------------------------------------
720
/// A single conflict event emitted by the MVCC layer.
///
/// Each variant carries enough context to reconstruct what happened
/// without access to internal MVCC state. Every variant includes a
/// `timestamp_ns` field, accessible uniformly via
/// [`ConflictEvent::timestamp_ns`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum ConflictEvent {
    /// A page lock acquisition was denied because another txn holds it.
    PageLockContention {
        /// The page that was contended.
        page: PageNumber,
        /// The transaction that tried to acquire the lock.
        requester: TxnId,
        /// The transaction currently holding the lock.
        holder: TxnId,
        /// Monotonic event timestamp (nanoseconds since observer creation).
        timestamp_ns: u64,
    },

    /// First-Committer-Wins (FCW) detected base drift on a page.
    FcwBaseDrift {
        /// The page where drift was detected.
        page: PageNumber,
        /// The transaction that lost the FCW race.
        loser: TxnId,
        /// The transaction that committed first (winner).
        winner_commit_seq: CommitSeq,
        /// Whether merge was attempted.
        merge_attempted: bool,
        /// Whether merge succeeded (if attempted).
        merge_succeeded: bool,
        /// Monotonic event timestamp.
        timestamp_ns: u64,
    },

    /// SSI validation detected a dangerous structure (write skew).
    SsiAbort {
        /// The transaction that was aborted.
        txn: TxnToken,
        /// The reason for the abort.
        reason: SsiAbortCategory,
        /// Number of incoming rw-antidependency edges.
        in_edge_count: usize,
        /// Number of outgoing rw-antidependency edges.
        out_edge_count: usize,
        /// Monotonic event timestamp.
        timestamp_ns: u64,
    },

    /// A transaction committed successfully after resolving conflicts.
    /// The only variant for which [`ConflictEvent::is_conflict`] is false.
    ConflictResolved {
        /// The transaction that committed.
        txn: TxnId,
        /// Number of page conflicts resolved via merge.
        pages_merged: usize,
        /// Commit sequence assigned.
        commit_seq: CommitSeq,
        /// Monotonic event timestamp.
        timestamp_ns: u64,
    },
}

/// Categorized SSI abort reason (serialization-friendly).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize)]
pub enum SsiAbortCategory {
    /// Transaction is the pivot (has both in + out rw edges).
    Pivot,
    /// A committed reader has an incoming rw edge.
    CommittedPivot,
    /// Transaction was eagerly marked for abort.
    MarkedForAbort,
}
792
793impl ConflictEvent {
794    /// Extract the monotonic timestamp from any event variant.
795    #[must_use]
796    pub fn timestamp_ns(&self) -> u64 {
797        match self {
798            Self::PageLockContention { timestamp_ns, .. }
799            | Self::FcwBaseDrift { timestamp_ns, .. }
800            | Self::SsiAbort { timestamp_ns, .. }
801            | Self::ConflictResolved { timestamp_ns, .. } => *timestamp_ns,
802        }
803    }
804
805    /// Whether this event represents a conflict (contention/drift/abort).
806    #[must_use]
807    pub fn is_conflict(&self) -> bool {
808        !matches!(self, Self::ConflictResolved { .. })
809    }
810}
811
812// ---------------------------------------------------------------------------
813// ConflictObserver — trait for zero-cost opt-in observation
814// ---------------------------------------------------------------------------
815
/// Observer trait for conflict events.
///
/// Implementations MUST be non-blocking and MUST NOT acquire page locks.
/// The observer is called on the hot path during lock acquisition and
/// commit validation; expensive work should be deferred.
///
/// The `Send + Sync` supertraits mean one observer instance may be invoked
/// from multiple threads concurrently.
pub trait ConflictObserver: Send + Sync {
    /// Called when a conflict event occurs.
    ///
    /// The event is passed by reference; clone it if it must outlive the call.
    fn on_event(&self, event: &ConflictEvent);
}
825
/// No-op observer that compiles to nothing. Default when observability is
/// not configured.
///
/// Its `on_event` body is empty and `#[inline(always)]`, so registering this
/// observer adds no work to the hot path (see module docs).
#[derive(Debug, Clone, Copy)]
pub struct NoOpObserver;
830
impl ConflictObserver for NoOpObserver {
    // `inline(always)` lets the empty body vanish at call sites, keeping
    // the unobserved emission path zero-cost as promised in the module docs.
    #[inline(always)]
    fn on_event(&self, _event: &ConflictEvent) {}
}
835
836// ---------------------------------------------------------------------------
837// RingBuffer — bounded event storage
838// ---------------------------------------------------------------------------
839
/// Fixed-capacity ring buffer for storing recent conflict events.
///
/// When the buffer is full, the oldest event is overwritten. Thread-safe
/// via internal `Mutex` (not on the hot path — only accessed via PRAGMA).
pub struct ConflictRingBuffer {
    /// Entire ring state (storage, head index, length) behind one mutex.
    events: Mutex<RingBuf>,
}
847
/// Internal ring state; guarded by the `ConflictRingBuffer` mutex.
struct RingBuf {
    /// Backing storage. Grows up to `capacity`, then slots are reused.
    buf: Vec<ConflictEvent>,
    /// Maximum number of retained events (0 disables storage entirely).
    capacity: usize,
    /// Index of the oldest live event within `buf`.
    head: usize,
    /// Number of live events; invariant: `len <= capacity`.
    len: usize,
}
854
855impl RingBuf {
856    fn new(capacity: usize) -> Self {
857        Self {
858            buf: Vec::with_capacity(capacity),
859            capacity,
860            head: 0,
861            len: 0,
862        }
863    }
864
865    fn push(&mut self, event: ConflictEvent) {
866        if self.capacity == 0 {
867            return;
868        }
869        let idx = (self.head + self.len) % self.capacity;
870        if self.buf.len() < self.capacity {
871            self.buf.push(event);
872        } else {
873            self.buf[idx] = event;
874        }
875        if self.len == self.capacity {
876            self.head = (self.head + 1) % self.capacity;
877        } else {
878            self.len += 1;
879        }
880    }
881
882    fn drain_ordered(&self) -> Vec<ConflictEvent> {
883        let mut result = Vec::with_capacity(self.len);
884        for i in 0..self.len {
885            let idx = (self.head + i) % self.capacity;
886            result.push(self.buf[idx].clone());
887        }
888        result
889    }
890
891    fn clear(&mut self) {
892        self.buf.clear();
893        self.head = 0;
894        self.len = 0;
895    }
896}
897
898impl ConflictRingBuffer {
899    /// Create a new ring buffer with the given capacity.
900    #[must_use]
901    pub fn new(capacity: usize) -> Self {
902        Self {
903            events: Mutex::new(RingBuf::new(capacity)),
904        }
905    }
906
907    /// Push an event into the ring buffer.
908    pub fn push(&self, event: ConflictEvent) {
909        self.events.lock().push(event);
910    }
911
912    /// Return all events in chronological order.
913    #[must_use]
914    pub fn snapshot(&self) -> Vec<ConflictEvent> {
915        self.events.lock().drain_ordered()
916    }
917
918    /// Clear all stored events.
919    pub fn clear(&self) {
920        self.events.lock().clear();
921    }
922
923    /// Current number of stored events.
924    #[must_use]
925    pub fn len(&self) -> usize {
926        self.events.lock().len
927    }
928
929    /// Whether the buffer is empty.
930    #[must_use]
931    pub fn is_empty(&self) -> bool {
932        self.len() == 0
933    }
934
935    /// Configured capacity.
936    #[must_use]
937    pub fn capacity(&self) -> usize {
938        self.events.lock().capacity
939    }
940}
941
942// ---------------------------------------------------------------------------
943// ConflictMetrics — aggregated statistics
944// ---------------------------------------------------------------------------
945
/// Aggregated conflict statistics exposed via PRAGMA.
///
/// All counters are atomic for lock-free updates from the hot path.
/// Statistics are per-connection (not global).
///
/// Invariant: `conflicts_total` counts contention + drift + abort events;
/// resolved conflicts are tracked separately in `conflicts_resolved`.
pub struct ConflictMetrics {
    /// Total conflict events (contention + drift + abort).
    pub conflicts_total: AtomicU64,
    /// Page lock contention events.
    pub page_contentions: AtomicU64,
    /// FCW base drift events.
    pub fcw_drifts: AtomicU64,
    /// FCW merge attempts.
    pub fcw_merge_attempts: AtomicU64,
    /// FCW merge successes.
    pub fcw_merge_successes: AtomicU64,
    /// SSI abort events.
    pub ssi_aborts: AtomicU64,
    /// Successful conflict resolutions via merge.
    pub conflicts_resolved: AtomicU64,
    /// Per-page contention counts (behind mutex, not hot path).
    page_hotspots: Mutex<HashMap<PageNumber, u64>>,
    /// Creation time for rate calculations.
    created_at: Instant,
}
970
971impl ConflictMetrics {
972    /// Create a new metrics instance with all counters at zero.
973    #[must_use]
974    pub fn new() -> Self {
975        Self {
976            conflicts_total: AtomicU64::new(0),
977            page_contentions: AtomicU64::new(0),
978            fcw_drifts: AtomicU64::new(0),
979            fcw_merge_attempts: AtomicU64::new(0),
980            fcw_merge_successes: AtomicU64::new(0),
981            ssi_aborts: AtomicU64::new(0),
982            conflicts_resolved: AtomicU64::new(0),
983            page_hotspots: Mutex::new(HashMap::new()),
984            created_at: Instant::now(),
985        }
986    }
987
988    /// Record a conflict event, updating all relevant counters.
989    pub fn record(&self, event: &ConflictEvent) {
990        match event {
991            ConflictEvent::PageLockContention { page, .. } => {
992                self.conflicts_total.fetch_add(1, Ordering::Relaxed);
993                self.page_contentions.fetch_add(1, Ordering::Relaxed);
994                *self.page_hotspots.lock().entry(*page).or_insert(0) += 1;
995            }
996            ConflictEvent::FcwBaseDrift {
997                page,
998                merge_attempted,
999                merge_succeeded,
1000                ..
1001            } => {
1002                self.conflicts_total.fetch_add(1, Ordering::Relaxed);
1003                self.fcw_drifts.fetch_add(1, Ordering::Relaxed);
1004                if *merge_attempted {
1005                    self.fcw_merge_attempts.fetch_add(1, Ordering::Relaxed);
1006                    if *merge_succeeded {
1007                        self.fcw_merge_successes.fetch_add(1, Ordering::Relaxed);
1008                    }
1009                }
1010                *self.page_hotspots.lock().entry(*page).or_insert(0) += 1;
1011            }
1012            ConflictEvent::SsiAbort { .. } => {
1013                self.conflicts_total.fetch_add(1, Ordering::Relaxed);
1014                self.ssi_aborts.fetch_add(1, Ordering::Relaxed);
1015            }
1016            ConflictEvent::ConflictResolved { .. } => {
1017                self.conflicts_resolved.fetch_add(1, Ordering::Relaxed);
1018            }
1019        }
1020    }
1021
1022    /// Reset all counters to zero.
1023    pub fn reset(&self) {
1024        self.conflicts_total.store(0, Ordering::Relaxed);
1025        self.page_contentions.store(0, Ordering::Relaxed);
1026        self.fcw_drifts.store(0, Ordering::Relaxed);
1027        self.fcw_merge_attempts.store(0, Ordering::Relaxed);
1028        self.fcw_merge_successes.store(0, Ordering::Relaxed);
1029        self.ssi_aborts.store(0, Ordering::Relaxed);
1030        self.conflicts_resolved.store(0, Ordering::Relaxed);
1031        self.page_hotspots.lock().clear();
1032    }
1033
1034    /// Elapsed time since metrics creation.
1035    #[must_use]
1036    pub fn elapsed(&self) -> std::time::Duration {
1037        self.created_at.elapsed()
1038    }
1039
1040    /// Conflicts per second since creation.
1041    #[must_use]
1042    #[allow(clippy::cast_precision_loss)]
1043    pub fn conflicts_per_second(&self) -> f64 {
1044        let elapsed_secs = self.created_at.elapsed().as_secs_f64();
1045        if elapsed_secs < f64::EPSILON {
1046            return 0.0;
1047        }
1048        self.conflicts_total.load(Ordering::Relaxed) as f64 / elapsed_secs
1049    }
1050
1051    /// Top N pages by contention count.
1052    #[must_use]
1053    pub fn top_hotspots(&self, n: usize) -> Vec<(PageNumber, u64)> {
1054        let mut entries: Vec<(PageNumber, u64)> = {
1055            let map = self.page_hotspots.lock();
1056            map.iter().map(|(&k, &v)| (k, v)).collect()
1057        };
1058        entries.sort_by_key(|e| std::cmp::Reverse(e.1));
1059        entries.truncate(n);
1060        entries
1061    }
1062
1063    /// Snapshot all metrics as a serializable summary.
1064    #[must_use]
1065    #[allow(clippy::cast_precision_loss)]
1066    pub fn snapshot(&self) -> ConflictMetricsSnapshot {
1067        ConflictMetricsSnapshot {
1068            conflicts_total: self.conflicts_total.load(Ordering::Relaxed),
1069            page_contentions: self.page_contentions.load(Ordering::Relaxed),
1070            fcw_drifts: self.fcw_drifts.load(Ordering::Relaxed),
1071            fcw_merge_attempts: self.fcw_merge_attempts.load(Ordering::Relaxed),
1072            fcw_merge_successes: self.fcw_merge_successes.load(Ordering::Relaxed),
1073            ssi_aborts: self.ssi_aborts.load(Ordering::Relaxed),
1074            conflicts_resolved: self.conflicts_resolved.load(Ordering::Relaxed),
1075            conflicts_per_second: self.conflicts_per_second(),
1076            elapsed_secs: self.created_at.elapsed().as_secs_f64(),
1077            top_hotspots: self.top_hotspots(10),
1078        }
1079    }
1080}
1081
impl Default for ConflictMetrics {
    /// Delegates to [`ConflictMetrics::new`]; all counters start at zero.
    fn default() -> Self {
        Self::new()
    }
}
1087
/// Serializable snapshot of conflict metrics.
#[derive(Debug, Clone, Serialize)]
pub struct ConflictMetricsSnapshot {
    /// Total conflict events (contention + drift + abort).
    pub conflicts_total: u64,
    /// Page lock contention events.
    pub page_contentions: u64,
    /// FCW base drift events.
    pub fcw_drifts: u64,
    /// FCW merge attempts.
    pub fcw_merge_attempts: u64,
    /// FCW merge successes.
    pub fcw_merge_successes: u64,
    /// SSI abort events.
    pub ssi_aborts: u64,
    /// Conflicts resolved via merge (not included in `conflicts_total`).
    pub conflicts_resolved: u64,
    /// Conflict rate since metrics creation.
    pub conflicts_per_second: f64,
    /// Seconds elapsed since metrics creation.
    pub elapsed_secs: f64,
    /// Pages sorted by descending contention count (at most 10 entries).
    pub top_hotspots: Vec<(PageNumber, u64)>,
}
1102
1103// ---------------------------------------------------------------------------
1104// MetricsObserver — observer that records to both metrics and ring buffer
1105// ---------------------------------------------------------------------------
1106
/// Combined observer that records events to both a [`ConflictMetrics`]
/// aggregator and a [`ConflictRingBuffer`] for detailed logging.
pub struct MetricsObserver {
    /// Aggregated atomic counters, updated on every observed event.
    metrics: ConflictMetrics,
    /// Bounded log of recent events (oldest evicted first).
    log: ConflictRingBuffer,
    /// Creation instant; basis for `elapsed_ns` timestamps.
    epoch: Instant,
}
1114
1115impl MetricsObserver {
1116    /// Create a new metrics observer with the given ring buffer capacity.
1117    #[must_use]
1118    pub fn new(log_capacity: usize) -> Self {
1119        Self {
1120            metrics: ConflictMetrics::new(),
1121            log: ConflictRingBuffer::new(log_capacity),
1122            epoch: Instant::now(),
1123        }
1124    }
1125
1126    /// Access the aggregated metrics.
1127    #[must_use]
1128    pub fn metrics(&self) -> &ConflictMetrics {
1129        &self.metrics
1130    }
1131
1132    /// Access the conflict log ring buffer.
1133    #[must_use]
1134    pub fn log(&self) -> &ConflictRingBuffer {
1135        &self.log
1136    }
1137
1138    /// Elapsed nanoseconds since observer creation (for timestamps).
1139    #[must_use]
1140    pub fn elapsed_ns(&self) -> u64 {
1141        #[allow(clippy::cast_possible_truncation)] // clamped to u64::MAX
1142        {
1143            self.epoch.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64
1144        }
1145    }
1146
1147    /// Reset both metrics and log.
1148    pub fn reset(&self) {
1149        self.metrics.reset();
1150        self.log.clear();
1151    }
1152}
1153
impl ConflictObserver for MetricsObserver {
    fn on_event(&self, event: &ConflictEvent) {
        // Update the aggregate counters first, then append a copy to the
        // bounded log; the clone leaves the caller's event untouched.
        self.metrics.record(event);
        self.log.push(event.clone());
    }
}
1160
1161// ---------------------------------------------------------------------------
1162// Tests
1163// ---------------------------------------------------------------------------
1164
1165#[cfg(test)]
1166mod tests {
1167    use super::*;
1168
    /// Build a `PageNumber` for tests; panics if `n` is not a valid page
    /// number (presumably zero is rejected — TODO confirm against the type).
    fn page(n: u32) -> PageNumber {
        PageNumber::new(n).unwrap()
    }

    /// Build a `TxnId` for tests; panics on invalid input.
    fn txn(n: u64) -> TxnId {
        TxnId::new(n).unwrap()
    }

    /// Convenience constructor for a `PageLockContention` event with a
    /// fixed timestamp of 1000 ns.
    fn make_contention_event(pg: u32, req: u64, hold: u64) -> ConflictEvent {
        ConflictEvent::PageLockContention {
            page: page(pg),
            requester: txn(req),
            holder: txn(hold),
            timestamp_ns: 1000,
        }
    }
1185
1186    #[test]
1187    fn noop_observer_compiles_away() {
1188        let obs = NoOpObserver;
1189        let event = make_contention_event(1, 2, 3);
1190        obs.on_event(&event);
1191        // If this compiles and runs, it proves the no-op path works.
1192    }
1193
1194    #[test]
1195    fn ring_buffer_basic_push_and_snapshot() {
1196        let rb = ConflictRingBuffer::new(3);
1197        assert!(rb.is_empty());
1198
1199        rb.push(make_contention_event(1, 10, 20));
1200        rb.push(make_contention_event(2, 11, 21));
1201        assert_eq!(rb.len(), 2);
1202
1203        let snap = rb.snapshot();
1204        assert_eq!(snap.len(), 2);
1205        assert!(
1206            matches!(&snap[0], ConflictEvent::PageLockContention { page, .. } if page.get() == 1)
1207        );
1208        assert!(
1209            matches!(&snap[1], ConflictEvent::PageLockContention { page, .. } if page.get() == 2)
1210        );
1211    }
1212
1213    #[test]
1214    fn ring_buffer_wraps_on_overflow() {
1215        let rb = ConflictRingBuffer::new(2);
1216
1217        rb.push(make_contention_event(1, 10, 20));
1218        rb.push(make_contention_event(2, 11, 21));
1219        rb.push(make_contention_event(3, 12, 22)); // overwrites first
1220
1221        assert_eq!(rb.len(), 2);
1222        let snap = rb.snapshot();
1223        // Should contain events for pages 2 and 3 (oldest evicted)
1224        assert!(
1225            matches!(&snap[0], ConflictEvent::PageLockContention { page, .. } if page.get() == 2)
1226        );
1227        assert!(
1228            matches!(&snap[1], ConflictEvent::PageLockContention { page, .. } if page.get() == 3)
1229        );
1230    }
1231
1232    #[test]
1233    fn ring_buffer_clear() {
1234        let rb = ConflictRingBuffer::new(10);
1235        rb.push(make_contention_event(1, 10, 20));
1236        rb.push(make_contention_event(2, 11, 21));
1237        assert_eq!(rb.len(), 2);
1238
1239        rb.clear();
1240        assert!(rb.is_empty());
1241        assert!(rb.snapshot().is_empty());
1242    }
1243
1244    #[test]
1245    fn ring_buffer_zero_capacity() {
1246        let rb = ConflictRingBuffer::new(0);
1247        rb.push(make_contention_event(1, 10, 20));
1248        assert!(rb.is_empty());
1249    }
1250
1251    #[test]
1252    fn conflict_metrics_basic_recording() {
1253        let m = ConflictMetrics::new();
1254
1255        m.record(&make_contention_event(1, 10, 20));
1256        m.record(&make_contention_event(1, 11, 20)); // same page
1257        m.record(&make_contention_event(2, 12, 20));
1258
1259        assert_eq!(m.conflicts_total.load(Ordering::Relaxed), 3);
1260        assert_eq!(m.page_contentions.load(Ordering::Relaxed), 3);
1261
1262        let hotspots = m.top_hotspots(5);
1263        assert_eq!(hotspots.len(), 2);
1264        assert_eq!(hotspots[0].0, page(1));
1265        assert_eq!(hotspots[0].1, 2);
1266    }
1267
1268    #[test]
1269    fn conflict_metrics_fcw_recording() {
1270        let m = ConflictMetrics::new();
1271
1272        m.record(&ConflictEvent::FcwBaseDrift {
1273            page: page(5),
1274            loser: txn(10),
1275            winner_commit_seq: CommitSeq::new(100),
1276            merge_attempted: true,
1277            merge_succeeded: true,
1278            timestamp_ns: 2000,
1279        });
1280
1281        assert_eq!(m.fcw_drifts.load(Ordering::Relaxed), 1);
1282        assert_eq!(m.fcw_merge_attempts.load(Ordering::Relaxed), 1);
1283        assert_eq!(m.fcw_merge_successes.load(Ordering::Relaxed), 1);
1284    }
1285
1286    #[test]
1287    fn conflict_metrics_ssi_recording() {
1288        let m = ConflictMetrics::new();
1289
1290        m.record(&ConflictEvent::SsiAbort {
1291            txn: TxnToken::new(txn(10), fsqlite_types::TxnEpoch::new(1)),
1292            reason: SsiAbortCategory::Pivot,
1293            in_edge_count: 1,
1294            out_edge_count: 1,
1295            timestamp_ns: 3000,
1296        });
1297
1298        assert_eq!(m.ssi_aborts.load(Ordering::Relaxed), 1);
1299        assert_eq!(m.conflicts_total.load(Ordering::Relaxed), 1);
1300    }
1301
1302    #[test]
1303    fn conflict_metrics_reset() {
1304        let m = ConflictMetrics::new();
1305        m.record(&make_contention_event(1, 10, 20));
1306        assert_eq!(m.conflicts_total.load(Ordering::Relaxed), 1);
1307
1308        m.reset();
1309        assert_eq!(m.conflicts_total.load(Ordering::Relaxed), 0);
1310        assert_eq!(m.page_contentions.load(Ordering::Relaxed), 0);
1311        assert!(m.top_hotspots(5).is_empty());
1312    }
1313
1314    #[test]
1315    fn metrics_observer_records_both() {
1316        let obs = MetricsObserver::new(100);
1317        let event = make_contention_event(1, 10, 20);
1318        obs.on_event(&event);
1319
1320        assert_eq!(obs.metrics().conflicts_total.load(Ordering::Relaxed), 1);
1321        assert_eq!(obs.log().len(), 1);
1322    }
1323
1324    #[test]
1325    fn conflict_event_timestamp() {
1326        let event = make_contention_event(1, 10, 20);
1327        assert_eq!(event.timestamp_ns(), 1000);
1328    }
1329
1330    #[test]
1331    fn conflict_event_is_conflict() {
1332        assert!(make_contention_event(1, 10, 20).is_conflict());
1333        assert!(
1334            !ConflictEvent::ConflictResolved {
1335                txn: txn(1),
1336                pages_merged: 0,
1337                commit_seq: CommitSeq::new(1),
1338                timestamp_ns: 0,
1339            }
1340            .is_conflict()
1341        );
1342    }
1343
1344    #[test]
1345    fn metrics_snapshot_serializable() {
1346        let m = ConflictMetrics::new();
1347        m.record(&make_contention_event(1, 10, 20));
1348        let snap = m.snapshot();
1349        let json = serde_json::to_string(&snap).unwrap();
1350        assert!(json.contains("\"conflicts_total\":1"));
1351    }
1352
1353    // ===================================================================
1354    // bd-t6sv2.1: Additional observability tests
1355    // ===================================================================
1356
1357    #[test]
1358    fn ring_buffer_stress_many_pushes() {
1359        // Push far more events than capacity; verify only the last N survive.
1360        let cap = 10;
1361        let rb = ConflictRingBuffer::new(cap);
1362        for i in 1..=200_u32 {
1363            rb.push(make_contention_event(i, u64::from(i), u64::from(i) + 1));
1364        }
1365        assert_eq!(rb.len(), cap);
1366        let snap = rb.snapshot();
1367        assert_eq!(snap.len(), cap);
1368        // Oldest surviving event should be page 191.
1369        assert!(matches!(
1370            &snap[0],
1371            ConflictEvent::PageLockContention { page, .. } if page.get() == 191
1372        ),);
1373        // Newest should be page 200.
1374        assert!(matches!(
1375            &snap[cap - 1],
1376            ConflictEvent::PageLockContention { page, .. } if page.get() == 200
1377        ),);
1378    }
1379
1380    #[test]
1381    fn ring_buffer_capacity_one() {
1382        // Edge case: capacity of 1 always holds the latest event.
1383        let rb = ConflictRingBuffer::new(1);
1384        rb.push(make_contention_event(1, 10, 20));
1385        rb.push(make_contention_event(2, 11, 21));
1386        rb.push(make_contention_event(3, 12, 22));
1387        assert_eq!(rb.len(), 1);
1388        let snap = rb.snapshot();
1389        assert!(
1390            matches!(&snap[0], ConflictEvent::PageLockContention { page, .. } if page.get() == 3)
1391        );
1392    }
1393
1394    #[test]
1395    fn ring_buffer_clear_after_wrap() {
1396        // Ensure clear works correctly after the buffer has wrapped.
1397        let rb = ConflictRingBuffer::new(2);
1398        rb.push(make_contention_event(1, 10, 20));
1399        rb.push(make_contention_event(2, 11, 21));
1400        rb.push(make_contention_event(3, 12, 22)); // wrap
1401        assert_eq!(rb.len(), 2);
1402
1403        rb.clear();
1404        assert!(rb.is_empty());
1405        assert_eq!(rb.capacity(), 2);
1406
1407        // Re-use after clear.
1408        rb.push(make_contention_event(4, 13, 23));
1409        assert_eq!(rb.len(), 1);
1410        let snap = rb.snapshot();
1411        assert!(
1412            matches!(&snap[0], ConflictEvent::PageLockContention { page, .. } if page.get() == 4)
1413        );
1414    }
1415
1416    #[test]
1417    fn metrics_all_fcw_merge_combinations() {
1418        // Test all four combinations of merge_attempted x merge_succeeded.
1419        let m = ConflictMetrics::new();
1420
1421        let cases = [
1422            (false, false),
1423            (true, false),
1424            (true, true),
1425            (false, false), // duplicate no-merge
1426        ];
1427        for (attempted, succeeded) in cases {
1428            m.record(&ConflictEvent::FcwBaseDrift {
1429                page: page(1),
1430                loser: txn(1),
1431                winner_commit_seq: CommitSeq::new(1),
1432                merge_attempted: attempted,
1433                merge_succeeded: succeeded,
1434                timestamp_ns: 0,
1435            });
1436        }
1437
1438        assert_eq!(m.fcw_drifts.load(Ordering::Relaxed), 4);
1439        assert_eq!(m.fcw_merge_attempts.load(Ordering::Relaxed), 2);
1440        assert_eq!(m.fcw_merge_successes.load(Ordering::Relaxed), 1);
1441    }
1442
1443    #[test]
1444    fn metrics_all_ssi_abort_categories() {
1445        let m = ConflictMetrics::new();
1446
1447        for reason in [
1448            SsiAbortCategory::Pivot,
1449            SsiAbortCategory::CommittedPivot,
1450            SsiAbortCategory::MarkedForAbort,
1451        ] {
1452            m.record(&ConflictEvent::SsiAbort {
1453                txn: TxnToken::new(txn(1), fsqlite_types::TxnEpoch::new(1)),
1454                reason,
1455                in_edge_count: 1,
1456                out_edge_count: 1,
1457                timestamp_ns: 0,
1458            });
1459        }
1460
1461        assert_eq!(m.ssi_aborts.load(Ordering::Relaxed), 3);
1462        assert_eq!(m.conflicts_total.load(Ordering::Relaxed), 3);
1463    }
1464
1465    #[test]
1466    fn trace_metrics_snapshot_and_reset() {
1467        reset_trace_metrics();
1468        record_trace_span_created();
1469        record_trace_span_created();
1470        record_trace_export(2, 17);
1471        record_trace_export_error();
1472        record_compat_trace_callback();
1473
1474        let snapshot = trace_metrics_snapshot();
1475        assert_eq!(snapshot.fsqlite_trace_spans_total, 2);
1476        assert_eq!(snapshot.fsqlite_trace_export_errors_total, 1);
1477        assert_eq!(snapshot.fsqlite_compat_trace_callbacks_total, 1);
1478
1479        reset_trace_metrics();
1480        let reset = trace_metrics_snapshot();
1481        assert_eq!(reset.fsqlite_trace_spans_total, 0);
1482        assert_eq!(reset.fsqlite_trace_export_errors_total, 0);
1483        assert_eq!(reset.fsqlite_compat_trace_callbacks_total, 0);
1484    }
1485
    #[test]
    fn io_uring_latency_snapshot_and_reset() {
        // Uses global latency-metrics state; reset first to isolate the test.
        reset_io_uring_latency_metrics();

        // Two reads (40us, 125us), one write (55us), one unix fallback.
        record_io_uring_read_latency(Duration::from_micros(40));
        record_io_uring_read_latency(Duration::from_micros(125));
        record_io_uring_write_latency(Duration::from_micros(55));
        record_io_uring_unix_fallback();

        let snapshot = io_uring_latency_snapshot();
        assert_eq!(snapshot.read_samples_total, 2);
        assert_eq!(snapshot.write_samples_total, 1);
        assert_eq!(snapshot.unix_fallbacks_total, 1);
        // Sanity: tail violations can never exceed the sample count.
        assert!(snapshot.read_tail_violations_total <= snapshot.read_samples_total);
        assert!(snapshot.write_tail_violations_total <= snapshot.write_samples_total);
        // p99 should be at least the largest sample recorded above.
        assert!(snapshot.read_p99_latency_us >= 125);
        assert!(snapshot.write_p99_latency_us >= 55);
        // The conformal upper bound is expected to dominate p99 (semantics
        // defined by the metrics implementation — not visible here).
        assert!(snapshot.read_conformal_upper_bound_us >= snapshot.read_p99_latency_us);
        assert!(snapshot.write_conformal_upper_bound_us >= snapshot.write_p99_latency_us);

        // After reset every counter and sample window must read zero.
        reset_io_uring_latency_metrics();
        let reset = io_uring_latency_snapshot();
        assert_eq!(reset.read_samples_total, 0);
        assert_eq!(reset.write_samples_total, 0);
        assert_eq!(reset.unix_fallbacks_total, 0);
        assert_eq!(reset.read_tail_violations_total, 0);
        assert_eq!(reset.write_tail_violations_total, 0);
        assert_eq!(reset.read_window_len, 0);
        assert_eq!(reset.write_window_len, 0);
    }

    #[test]
    fn io_uring_latency_conformal_upper_bound_is_tail_safe() {
        // A run of ~20us samples followed by a 200us outlier should trip at
        // least one tail violation while keeping the bound above p99.
        let metrics = IoUringLatencyMetrics::new(16);
        let mut saw_violation = false;
        for latency in [20_u64, 22, 21, 23, 20, 24, 26, 200] {
            // record_read_latency appears to return whether the sample
            // violated the current tail bound — confirmed below via the
            // violations counter.
            if metrics.record_read_latency(Duration::from_micros(latency)) {
                saw_violation = true;
            }
        }

        let snapshot = metrics.snapshot();
        assert!(snapshot.read_p99_latency_us >= 200);
        assert!(snapshot.read_conformal_upper_bound_us >= snapshot.read_p99_latency_us);
        assert!(saw_violation);
        assert!(snapshot.read_tail_violations_total >= 1);
    }
1533
1534    #[test]
1535    fn trace_and_decision_ids_are_monotonic() {
1536        let first_trace = next_trace_id();
1537        let second_trace = next_trace_id();
1538        assert!(second_trace > first_trace);
1539
1540        let first_decision = next_decision_id();
1541        let second_decision = next_decision_id();
1542        assert!(second_decision > first_decision);
1543    }
1544
1545    #[test]
1546    fn metrics_conflict_resolved_not_counted_as_conflict() {
1547        // ConflictResolved increments resolved counter but NOT conflicts_total.
1548        let m = ConflictMetrics::new();
1549        for i in 1..=5_u64 {
1550            m.record(&ConflictEvent::ConflictResolved {
1551                txn: txn(i),
1552                pages_merged: 2,
1553                commit_seq: CommitSeq::new(i * 10),
1554                timestamp_ns: 0,
1555            });
1556        }
1557
1558        assert_eq!(m.conflicts_resolved.load(Ordering::Relaxed), 5);
1559        assert_eq!(m.conflicts_total.load(Ordering::Relaxed), 0);
1560    }
1561
1562    #[test]
1563    fn metrics_hotspot_ordering() {
1564        // Verify top_hotspots returns pages sorted by descending frequency.
1565        let m = ConflictMetrics::new();
1566        // Page 5: 3 contentions, page 10: 1, page 15: 2.
1567        for _ in 0..3 {
1568            m.record(&make_contention_event(5, 1, 2));
1569        }
1570        m.record(&make_contention_event(10, 1, 2));
1571        for _ in 0..2 {
1572            m.record(&make_contention_event(15, 1, 2));
1573        }
1574
1575        let hotspots = m.top_hotspots(3);
1576        assert_eq!(hotspots.len(), 3);
1577        assert_eq!(hotspots[0], (page(5), 3));
1578        assert_eq!(hotspots[1], (page(15), 2));
1579        assert_eq!(hotspots[2], (page(10), 1));
1580    }
1581
1582    #[test]
1583    fn metrics_hotspot_truncation() {
1584        // top_hotspots(N) should return at most N entries.
1585        let m = ConflictMetrics::new();
1586        for i in 1..=20_u32 {
1587            m.record(&make_contention_event(i, 1, 2));
1588        }
1589        assert_eq!(m.top_hotspots(5).len(), 5);
1590        assert_eq!(m.top_hotspots(0).len(), 0);
1591    }
1592
1593    #[test]
1594    fn metrics_snapshot_all_fields() {
1595        // Verify snapshot captures all counter types accurately.
1596        let m = ConflictMetrics::new();
1597        m.record(&make_contention_event(1, 10, 20));
1598        m.record(&ConflictEvent::FcwBaseDrift {
1599            page: page(2),
1600            loser: txn(3),
1601            winner_commit_seq: CommitSeq::new(100),
1602            merge_attempted: true,
1603            merge_succeeded: false,
1604            timestamp_ns: 0,
1605        });
1606        m.record(&ConflictEvent::SsiAbort {
1607            txn: TxnToken::new(txn(4), fsqlite_types::TxnEpoch::new(1)),
1608            reason: SsiAbortCategory::Pivot,
1609            in_edge_count: 2,
1610            out_edge_count: 3,
1611            timestamp_ns: 0,
1612        });
1613        m.record(&ConflictEvent::ConflictResolved {
1614            txn: txn(5),
1615            pages_merged: 1,
1616            commit_seq: CommitSeq::new(200),
1617            timestamp_ns: 0,
1618        });
1619
1620        let snap = m.snapshot();
1621        assert_eq!(snap.conflicts_total, 3); // contention + drift + abort
1622        assert_eq!(snap.page_contentions, 1);
1623        assert_eq!(snap.fcw_drifts, 1);
1624        assert_eq!(snap.fcw_merge_attempts, 1);
1625        assert_eq!(snap.fcw_merge_successes, 0);
1626        assert_eq!(snap.ssi_aborts, 1);
1627        assert_eq!(snap.conflicts_resolved, 1);
1628        assert!(snap.elapsed_secs >= 0.0);
1629    }
1630
1631    #[test]
1632    fn metrics_observer_log_preserves_order() {
1633        // Events in the ring buffer are in chronological order.
1634        let obs = MetricsObserver::new(100);
1635        for i in 1..=5_u32 {
1636            obs.on_event(&make_contention_event(i, u64::from(i), u64::from(i) + 10));
1637        }
1638
1639        let events = obs.log().snapshot();
1640        assert_eq!(events.len(), 5);
1641        for (idx, event) in events.iter().enumerate() {
1642            let expected_page = u32::try_from(idx + 1).unwrap();
1643            assert!(matches!(
1644                event,
1645                ConflictEvent::PageLockContention { page, .. } if page.get() == expected_page
1646            ),);
1647        }
1648    }
1649
1650    #[test]
1651    fn metrics_observer_elapsed_ns_monotonic() {
1652        let obs = MetricsObserver::new(10);
1653        let t1 = obs.elapsed_ns();
1654        // Busy-wait briefly to ensure some time passes.
1655        std::thread::yield_now();
1656        let t2 = obs.elapsed_ns();
1657        assert!(t2 >= t1, "elapsed_ns must be monotonically non-decreasing");
1658    }
1659
1660    #[test]
1661    fn conflict_event_serde_roundtrip() {
1662        // All event variants should serialize to JSON and back.
1663        let events = vec![
1664            make_contention_event(1, 2, 3),
1665            ConflictEvent::FcwBaseDrift {
1666                page: page(4),
1667                loser: txn(5),
1668                winner_commit_seq: CommitSeq::new(100),
1669                merge_attempted: true,
1670                merge_succeeded: true,
1671                timestamp_ns: 42,
1672            },
1673            ConflictEvent::SsiAbort {
1674                txn: TxnToken::new(txn(6), fsqlite_types::TxnEpoch::new(2)),
1675                reason: SsiAbortCategory::CommittedPivot,
1676                in_edge_count: 3,
1677                out_edge_count: 4,
1678                timestamp_ns: 99,
1679            },
1680            ConflictEvent::ConflictResolved {
1681                txn: txn(7),
1682                pages_merged: 5,
1683                commit_seq: CommitSeq::new(200),
1684                timestamp_ns: 123,
1685            },
1686        ];
1687
1688        for event in &events {
1689            let json = serde_json::to_string(event).unwrap();
1690            assert!(!json.is_empty(), "serialization should produce output");
1691        }
1692    }
1693
1694    #[test]
1695    fn conflict_event_is_conflict_all_variants() {
1696        assert!(make_contention_event(1, 2, 3).is_conflict());
1697
1698        assert!(
1699            ConflictEvent::FcwBaseDrift {
1700                page: page(1),
1701                loser: txn(1),
1702                winner_commit_seq: CommitSeq::new(1),
1703                merge_attempted: false,
1704                merge_succeeded: false,
1705                timestamp_ns: 0,
1706            }
1707            .is_conflict()
1708        );
1709
1710        assert!(
1711            ConflictEvent::SsiAbort {
1712                txn: TxnToken::new(txn(1), fsqlite_types::TxnEpoch::new(1)),
1713                reason: SsiAbortCategory::Pivot,
1714                in_edge_count: 0,
1715                out_edge_count: 0,
1716                timestamp_ns: 0,
1717            }
1718            .is_conflict()
1719        );
1720
1721        assert!(
1722            !ConflictEvent::ConflictResolved {
1723                txn: txn(1),
1724                pages_merged: 0,
1725                commit_seq: CommitSeq::new(1),
1726                timestamp_ns: 0,
1727            }
1728            .is_conflict()
1729        );
1730    }
1731
1732    // ===================================================================
1733    // bd-2g5.6.1: Cx propagation telemetry tests
1734    // ===================================================================
1735
    #[test]
    fn cx_propagation_metrics_basic() {
        // Exercise the process-wide metrics singleton: each recorder call
        // bumps exactly its own counter, as observed via `snapshot()`.
        //
        // NOTE(review): this test mutates GLOBAL_CX_PROPAGATION_METRICS, and
        // so does `cx_propagation_metrics_reset`. Under the default parallel
        // test harness the exact-count assertions below could race with that
        // test; consider a shared lock or `--test-threads=1` — TODO confirm.
        GLOBAL_CX_PROPAGATION_METRICS.reset();

        GLOBAL_CX_PROPAGATION_METRICS.record_propagation_success();
        GLOBAL_CX_PROPAGATION_METRICS.record_propagation_success();
        GLOBAL_CX_PROPAGATION_METRICS.record_propagation_failure("test_site_1");
        GLOBAL_CX_PROPAGATION_METRICS.record_cancellation_cleanup();
        GLOBAL_CX_PROPAGATION_METRICS.record_trace_linkage();
        GLOBAL_CX_PROPAGATION_METRICS.record_cx_created();
        GLOBAL_CX_PROPAGATION_METRICS.record_cancel_propagation();

        let snap = GLOBAL_CX_PROPAGATION_METRICS.snapshot();
        assert_eq!(snap.propagation_successes_total, 2);
        assert_eq!(snap.propagation_failures_total, 1);
        assert_eq!(snap.cancellation_cleanups_total, 1);
        assert_eq!(snap.trace_linkages_total, 1);
        assert_eq!(snap.cx_created_total, 1);
        assert_eq!(snap.cancel_propagations_total, 1);
    }
1756
    #[test]
    fn cx_propagation_metrics_reset() {
        // `reset()` must zero every counter on the global singleton.
        //
        // NOTE(review): mutates GLOBAL_CX_PROPAGATION_METRICS, which
        // `cx_propagation_metrics_basic` also mutates; under the default
        // parallel test harness these can interleave and perturb the counts
        // asserted below — TODO confirm test-thread isolation.
        GLOBAL_CX_PROPAGATION_METRICS.reset();
        GLOBAL_CX_PROPAGATION_METRICS.record_propagation_success();
        GLOBAL_CX_PROPAGATION_METRICS.record_propagation_failure("reset_test");
        // Sanity check: at least one success was recorded before the reset.
        assert!(
            GLOBAL_CX_PROPAGATION_METRICS
                .propagation_successes_total
                .load(Ordering::Relaxed)
                > 0
        );

        GLOBAL_CX_PROPAGATION_METRICS.reset();
        let snap = GLOBAL_CX_PROPAGATION_METRICS.snapshot();
        assert_eq!(snap.propagation_successes_total, 0);
        assert_eq!(snap.propagation_failures_total, 0);
        assert_eq!(snap.cancellation_cleanups_total, 0);
        assert_eq!(snap.trace_linkages_total, 0);
        assert_eq!(snap.cx_created_total, 0);
        assert_eq!(snap.cancel_propagations_total, 0);
    }
1778
1779    #[test]
1780    #[allow(clippy::float_cmp)]
1781    fn cx_propagation_failure_ratio() {
1782        let m = CxPropagationMetrics::new();
1783
1784        // Zero total → 0.0 ratio.
1785        assert_eq!(m.snapshot().failure_ratio(), 0.0);
1786
1787        // 1 success, 0 failures → 0.0 ratio.
1788        m.record_propagation_success();
1789        assert!((m.snapshot().failure_ratio() - 0.0).abs() < f64::EPSILON);
1790
1791        // 1 success, 1 failure → 0.5 ratio.
1792        m.record_propagation_failure("ratio_test");
1793        assert!((m.snapshot().failure_ratio() - 0.5).abs() < f64::EPSILON);
1794
1795        // 1 success, 3 failures → 0.75 ratio.
1796        m.record_propagation_failure("ratio_test");
1797        m.record_propagation_failure("ratio_test");
1798        assert!((m.snapshot().failure_ratio() - 0.75).abs() < f64::EPSILON);
1799    }
1800
1801    #[test]
1802    fn cx_propagation_snapshot_display() {
1803        let m = CxPropagationMetrics::new();
1804        m.record_propagation_success();
1805        m.record_propagation_success();
1806        m.record_propagation_failure("display_test");
1807        let display = format!("{}", m.snapshot());
1808        assert!(display.contains("ok=2"));
1809        assert!(display.contains("fail=1"));
1810        assert!(display.contains("fail_ratio="));
1811    }
1812
1813    #[test]
1814    fn cx_propagation_snapshot_serializable() {
1815        let m = CxPropagationMetrics::new();
1816        m.record_propagation_success();
1817        m.record_trace_linkage();
1818        let snap = m.snapshot();
1819        let json = serde_json::to_string(&snap).unwrap();
1820        assert!(json.contains("\"propagation_successes_total\":1"));
1821        assert!(json.contains("\"trace_linkages_total\":1"));
1822    }
1823
1824    #[test]
1825    fn cx_propagation_independent_counters() {
1826        // Each counter increments independently.
1827        let m = CxPropagationMetrics::new();
1828        for _ in 0..5 {
1829            m.record_propagation_success();
1830        }
1831        for _ in 0..3 {
1832            m.record_cancellation_cleanup();
1833        }
1834        m.record_cx_created();
1835        m.record_cx_created();
1836
1837        let snap = m.snapshot();
1838        assert_eq!(snap.propagation_successes_total, 5);
1839        assert_eq!(snap.propagation_failures_total, 0);
1840        assert_eq!(snap.cancellation_cleanups_total, 3);
1841        assert_eq!(snap.trace_linkages_total, 0);
1842        assert_eq!(snap.cx_created_total, 2);
1843        assert_eq!(snap.cancel_propagations_total, 0);
1844    }
1845
1846    #[test]
1847    fn cx_propagation_concurrent_safety() {
1848        // Multiple threads can record concurrently without panicking.
1849        let m = &CxPropagationMetrics::new();
1850        let barrier = std::sync::Arc::new(std::sync::Barrier::new(4));
1851        std::thread::scope(|s| {
1852            for _ in 0..4 {
1853                let b = barrier.clone();
1854                s.spawn(move || {
1855                    b.wait();
1856                    for _ in 0..100 {
1857                        m.record_propagation_success();
1858                        m.record_propagation_failure("concurrent_test");
1859                        m.record_cancellation_cleanup();
1860                        m.record_trace_linkage();
1861                        m.record_cx_created();
1862                        m.record_cancel_propagation();
1863                    }
1864                });
1865            }
1866        });
1867
1868        let snap = m.snapshot();
1869        assert_eq!(snap.propagation_successes_total, 400);
1870        assert_eq!(snap.propagation_failures_total, 400);
1871        assert_eq!(snap.cancellation_cleanups_total, 400);
1872        assert_eq!(snap.trace_linkages_total, 400);
1873        assert_eq!(snap.cx_created_total, 400);
1874        assert_eq!(snap.cancel_propagations_total, 400);
1875    }
1876
1877    // ===================================================================
1878    // bd-2g5.1: TxnSlot telemetry tests
1879    // ===================================================================
1880
1881    #[test]
1882    fn txn_slot_metrics_alloc_release_and_crash() {
1883        let m = TxnSlotMetrics::new();
1884
1885        m.record_slot_allocated(3, 1001);
1886        m.record_slot_allocated(4, 1001);
1887        m.record_crash_detected(Some(4), 1001, 42);
1888        m.record_slot_released(Some(4), 1001);
1889
1890        let snap = m.snapshot();
1891        assert_eq!(snap.fsqlite_txn_slots_active, 1);
1892        assert_eq!(snap.fsqlite_txn_slot_crashes_detected_total, 1);
1893    }
1894
1895    #[test]
1896    fn txn_slot_metrics_release_saturates_at_zero() {
1897        let m = TxnSlotMetrics::new();
1898
1899        // Releasing without prior alloc should never underflow.
1900        m.record_slot_released(None, 0);
1901        m.record_slot_released(None, 0);
1902
1903        let snap = m.snapshot();
1904        assert_eq!(snap.fsqlite_txn_slots_active, 0);
1905        assert_eq!(snap.fsqlite_txn_slot_crashes_detected_total, 0);
1906    }
1907
1908    #[test]
1909    fn txn_slot_metrics_snapshot_display_and_serde() {
1910        let m = TxnSlotMetrics::new();
1911        m.record_slot_allocated(7, 2222);
1912        m.record_crash_detected(None, 2222, 9001);
1913
1914        let snap = m.snapshot();
1915        let display = format!("{snap}");
1916        assert!(display.contains("txn_slots(active=1 crashes=1)"));
1917
1918        let json = serde_json::to_string(&snap).unwrap();
1919        assert!(json.contains("\"fsqlite_txn_slots_active\":1"));
1920        assert!(json.contains("\"fsqlite_txn_slot_crashes_detected_total\":1"));
1921    }
1922}