1use fsqlite_types::sync_primitives::{Duration, Instant, Mutex};
17use fsqlite_types::{CommitSeq, PageNumber, TxnId, TxnToken};
18use serde::Serialize;
19use std::collections::{HashMap, VecDeque};
20use std::sync::LazyLock;
21use std::sync::atomic::{AtomicU64, Ordering};
22
/// Lifetime count of trace spans created (reset only via `reset_trace_metrics`).
static FSQLITE_TRACE_SPANS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Lifetime count of trace-export failures.
static FSQLITE_TRACE_EXPORT_ERRORS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Lifetime count of compat trace-callback invocations.
static FSQLITE_COMPAT_TRACE_CALLBACKS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Monotonic trace-id source; starts at 1 so id 0 is never handed out.
static TRACE_ID_SEQUENCE: AtomicU64 = AtomicU64::new(1);
/// Monotonic decision-id source; starts at 1 so id 0 is never handed out.
static DECISION_ID_SEQUENCE: AtomicU64 = AtomicU64::new(1);
32
/// Point-in-time copy of the trace counters above; serializable for export.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct TraceMetricsSnapshot {
    pub fsqlite_trace_spans_total: u64,
    pub fsqlite_trace_export_errors_total: u64,
    pub fsqlite_compat_trace_callbacks_total: u64,
}
40
/// Allocate the next process-unique trace id. Ids start at 1; 0 is never
/// returned, so callers may use 0 as an "unset" sentinel.
#[must_use]
pub fn next_trace_id() -> u64 {
    TRACE_ID_SEQUENCE.fetch_add(1, Ordering::Relaxed)
}

/// Allocate the next process-unique decision id (same contract as
/// [`next_trace_id`]: starts at 1, 0 never returned).
#[must_use]
pub fn next_decision_id() -> u64 {
    DECISION_ID_SEQUENCE.fetch_add(1, Ordering::Relaxed)
}
52
/// Count one created trace span.
pub fn record_trace_span_created() {
    FSQLITE_TRACE_SPANS_TOTAL.fetch_add(1, Ordering::Relaxed);
}
57
/// Observe one trace-export batch: emits a DEBUG span + event on the
/// `fsqlite.trace_export` target carrying the batch size and latency.
/// Near-free when that target/level is disabled.
pub fn record_trace_export(spans_exported: u64, export_latency_us: u64) {
    // Skip span construction entirely when nobody is subscribed.
    if !tracing::enabled!(target: "fsqlite.trace_export", tracing::Level::DEBUG) {
        return;
    }
    let span = tracing::span!(
        target: "fsqlite.trace_export",
        tracing::Level::DEBUG,
        "trace_export",
        spans_exported,
        export_latency_us
    );
    let _guard = span.enter();
    tracing::debug!("trace export observed");
}
73
/// Count one failed trace export.
pub fn record_trace_export_error() {
    FSQLITE_TRACE_EXPORT_ERRORS_TOTAL.fetch_add(1, Ordering::Relaxed);
}

/// Count one compat trace-callback invocation.
pub fn record_compat_trace_callback() {
    FSQLITE_COMPAT_TRACE_CALLBACKS_TOTAL.fetch_add(1, Ordering::Relaxed);
}
83
84#[must_use]
86pub fn trace_metrics_snapshot() -> TraceMetricsSnapshot {
87 TraceMetricsSnapshot {
88 fsqlite_trace_spans_total: FSQLITE_TRACE_SPANS_TOTAL.load(Ordering::Relaxed),
89 fsqlite_trace_export_errors_total: FSQLITE_TRACE_EXPORT_ERRORS_TOTAL
90 .load(Ordering::Relaxed),
91 fsqlite_compat_trace_callbacks_total: FSQLITE_COMPAT_TRACE_CALLBACKS_TOTAL
92 .load(Ordering::Relaxed),
93 }
94}
95
96pub fn reset_trace_metrics() {
98 FSQLITE_TRACE_SPANS_TOTAL.store(0, Ordering::Relaxed);
99 FSQLITE_TRACE_EXPORT_ERRORS_TOTAL.store(0, Ordering::Relaxed);
100 FSQLITE_COMPAT_TRACE_CALLBACKS_TOTAL.store(0, Ordering::Relaxed);
101}
102
/// Maximum latency samples retained per direction's sliding window.
const IO_URING_LATENCY_WINDOW_CAPACITY: usize = 1024;
/// p99 expressed as the ratio 99/100 for the nearest-rank quantile helpers.
const P99_NUMERATOR: usize = 99;
const P99_DENOMINATOR: usize = 100;

/// Process-wide io_uring latency collector backing the free-function wrappers.
pub static GLOBAL_IO_URING_LATENCY_METRICS: LazyLock<IoUringLatencyMetrics> =
    LazyLock::new(|| IoUringLatencyMetrics::new(IO_URING_LATENCY_WINDOW_CAPACITY));
114
/// Point-in-time view of io_uring latency metrics: lifetime counters plus
/// window-derived statistics (p99 and conformal upper bounds, microseconds).
#[derive(Debug, Clone, Serialize)]
pub struct IoUringLatencySnapshot {
    pub read_samples_total: u64,
    pub write_samples_total: u64,
    pub unix_fallbacks_total: u64,
    pub read_tail_violations_total: u64,
    pub write_tail_violations_total: u64,
    // Window occupancy at snapshot time (len <= capacity).
    pub window_capacity: usize,
    pub read_window_len: usize,
    pub write_window_len: usize,
    // Zero when the corresponding window is empty.
    pub read_p99_latency_us: u64,
    pub write_p99_latency_us: u64,
    pub read_conformal_upper_bound_us: u64,
    pub write_conformal_upper_bound_us: u64,
}
130
/// Bounded sample window for one I/O direction: raw latencies plus the
/// conformal nonconformity score recorded alongside each sample.
#[derive(Default)]
struct IoLatencySeries {
    latencies_ns: VecDeque<u64>,
    nonconformity_ns: VecDeque<u64>,
}
136
137impl IoLatencySeries {
138 fn push(&mut self, sample_ns: u64, sample_capacity: usize) {
139 let baseline = self.p99_latency_ns().unwrap_or(sample_ns);
140 let score = sample_ns.saturating_sub(baseline);
141 push_bounded(&mut self.latencies_ns, sample_ns, sample_capacity);
142 push_bounded(&mut self.nonconformity_ns, score, sample_capacity);
143 }
144
145 fn p99_latency_ns(&self) -> Option<u64> {
146 quantile_from_deque(&self.latencies_ns, P99_NUMERATOR, P99_DENOMINATOR)
147 }
148
149 fn conformal_upper_bound_ns(&self) -> Option<u64> {
150 let baseline = self.p99_latency_ns()?;
151 let q = conformal_quantile(&self.nonconformity_ns)?;
152 Some(baseline.saturating_add(q))
153 }
154
155 fn reset(&mut self) {
156 self.latencies_ns.clear();
157 self.nonconformity_ns.clear();
158 }
159}
160
/// Pair of per-direction sample windows guarded together by one mutex.
#[derive(Default)]
struct IoUringLatencyWindow {
    read: IoLatencySeries,
    write: IoLatencySeries,
}
166
/// io_uring latency tracker: lock-free lifetime counters plus mutex-guarded
/// sliding windows used for p99 / conformal-bound estimation.
pub struct IoUringLatencyMetrics {
    pub read_samples_total: AtomicU64,
    pub write_samples_total: AtomicU64,
    pub unix_fallbacks_total: AtomicU64,
    pub read_tail_violations_total: AtomicU64,
    pub write_tail_violations_total: AtomicU64,
    // Max samples retained per direction's window.
    sample_capacity: usize,
    window: Mutex<IoUringLatencyWindow>,
}
176
177impl IoUringLatencyMetrics {
178 #[must_use]
179 pub const fn new(sample_capacity: usize) -> Self {
180 Self {
181 read_samples_total: AtomicU64::new(0),
182 write_samples_total: AtomicU64::new(0),
183 unix_fallbacks_total: AtomicU64::new(0),
184 read_tail_violations_total: AtomicU64::new(0),
185 write_tail_violations_total: AtomicU64::new(0),
186 sample_capacity,
187 window: Mutex::new(IoUringLatencyWindow {
188 read: IoLatencySeries {
189 latencies_ns: VecDeque::new(),
190 nonconformity_ns: VecDeque::new(),
191 },
192 write: IoLatencySeries {
193 latencies_ns: VecDeque::new(),
194 nonconformity_ns: VecDeque::new(),
195 },
196 }),
197 }
198 }
199
200 pub fn record_read_latency(&self, latency: Duration) -> bool {
201 self.read_samples_total.fetch_add(1, Ordering::Relaxed);
202 let sample_ns = duration_to_nanos_saturated(latency);
203 let mut window = self.window.lock();
204 let prior_bound = window.read.conformal_upper_bound_ns();
205 window.read.push(sample_ns, self.sample_capacity);
206 let is_tail_violation = prior_bound.is_some_and(|bound| sample_ns > bound);
207 if is_tail_violation {
208 self.read_tail_violations_total
209 .fetch_add(1, Ordering::Relaxed);
210 }
211 is_tail_violation
212 }
213
214 pub fn record_write_latency(&self, latency: Duration) -> bool {
215 self.write_samples_total.fetch_add(1, Ordering::Relaxed);
216 let sample_ns = duration_to_nanos_saturated(latency);
217 let mut window = self.window.lock();
218 let prior_bound = window.write.conformal_upper_bound_ns();
219 window.write.push(sample_ns, self.sample_capacity);
220 let is_tail_violation = prior_bound.is_some_and(|bound| sample_ns > bound);
221 if is_tail_violation {
222 self.write_tail_violations_total
223 .fetch_add(1, Ordering::Relaxed);
224 }
225 is_tail_violation
226 }
227
228 pub fn record_unix_fallback(&self) {
229 self.unix_fallbacks_total.fetch_add(1, Ordering::Relaxed);
230 }
231
232 #[must_use]
233 pub fn snapshot(&self) -> IoUringLatencySnapshot {
234 let window = self.window.lock();
235
236 IoUringLatencySnapshot {
237 read_samples_total: self.read_samples_total.load(Ordering::Relaxed),
238 write_samples_total: self.write_samples_total.load(Ordering::Relaxed),
239 unix_fallbacks_total: self.unix_fallbacks_total.load(Ordering::Relaxed),
240 read_tail_violations_total: self.read_tail_violations_total.load(Ordering::Relaxed),
241 write_tail_violations_total: self.write_tail_violations_total.load(Ordering::Relaxed),
242 window_capacity: self.sample_capacity,
243 read_window_len: window.read.latencies_ns.len(),
244 write_window_len: window.write.latencies_ns.len(),
245 read_p99_latency_us: nanos_to_micros(window.read.p99_latency_ns().unwrap_or(0)),
246 write_p99_latency_us: nanos_to_micros(window.write.p99_latency_ns().unwrap_or(0)),
247 read_conformal_upper_bound_us: nanos_to_micros(
248 window.read.conformal_upper_bound_ns().unwrap_or(0),
249 ),
250 write_conformal_upper_bound_us: nanos_to_micros(
251 window.write.conformal_upper_bound_ns().unwrap_or(0),
252 ),
253 }
254 }
255
256 pub fn reset(&self) {
257 self.read_samples_total.store(0, Ordering::Relaxed);
258 self.write_samples_total.store(0, Ordering::Relaxed);
259 self.unix_fallbacks_total.store(0, Ordering::Relaxed);
260 self.read_tail_violations_total.store(0, Ordering::Relaxed);
261 self.write_tail_violations_total.store(0, Ordering::Relaxed);
262 let mut window = self.window.lock();
263 window.read.reset();
264 window.write.reset();
265 }
266}
267
impl Default for IoUringLatencyMetrics {
    /// Equivalent to `new(IO_URING_LATENCY_WINDOW_CAPACITY)`.
    fn default() -> Self {
        Self::new(IO_URING_LATENCY_WINDOW_CAPACITY)
    }
}
273
/// Record a read latency on the global collector; returns `true` when the
/// sample exceeded the prior conformal upper bound (tail violation).
pub fn record_io_uring_read_latency(latency: Duration) -> bool {
    GLOBAL_IO_URING_LATENCY_METRICS.record_read_latency(latency)
}

/// Record a write latency on the global collector; returns `true` on a
/// tail violation.
pub fn record_io_uring_write_latency(latency: Duration) -> bool {
    GLOBAL_IO_URING_LATENCY_METRICS.record_write_latency(latency)
}

/// Count one fallback from io_uring to the plain Unix I/O path.
pub fn record_io_uring_unix_fallback() {
    GLOBAL_IO_URING_LATENCY_METRICS.record_unix_fallback();
}

/// Point-in-time copy of the global io_uring latency metrics.
#[must_use]
pub fn io_uring_latency_snapshot() -> IoUringLatencySnapshot {
    GLOBAL_IO_URING_LATENCY_METRICS.snapshot()
}

/// Reset the global io_uring latency metrics (test isolation).
pub fn reset_io_uring_latency_metrics() {
    GLOBAL_IO_URING_LATENCY_METRICS.reset();
}
294
/// Append `value` to `buffer`, evicting the oldest entries so the buffer never
/// holds more than `sample_capacity` items. A zero capacity drops the sample.
///
/// Uses `while` rather than `if` so a buffer that already exceeds the given
/// capacity (e.g. a smaller capacity passed on a later call) is trimmed back
/// down instead of growing without bound.
fn push_bounded(buffer: &mut VecDeque<u64>, value: u64, sample_capacity: usize) {
    if sample_capacity == 0 {
        return;
    }
    while buffer.len() >= sample_capacity {
        let _ = buffer.pop_front();
    }
    buffer.push_back(value);
}
304
/// Nearest-rank quantile (`numerator`/`denominator`) over the samples.
/// Returns `None` for an empty window or a zero denominator.
fn quantile_from_deque(
    values: &VecDeque<u64>,
    numerator: usize,
    denominator: usize,
) -> Option<u64> {
    if values.is_empty() || denominator == 0 {
        return None;
    }

    let mut ordered: Vec<u64> = values.iter().copied().collect();
    ordered.sort_unstable();

    // Nearest-rank: ceil(q * n) converted to a zero-based index, clamped
    // into the valid range for tiny windows.
    let count = ordered.len();
    let index = numerator
        .saturating_mul(count)
        .div_ceil(denominator)
        .saturating_sub(1)
        .min(count.saturating_sub(1));
    ordered.get(index).copied()
}
325
326fn conformal_quantile(nonconformity: &VecDeque<u64>) -> Option<u64> {
327 if nonconformity.is_empty() {
328 return None;
329 }
330
331 let mut sorted: Vec<u64> = nonconformity.iter().copied().collect();
332 sorted.sort_unstable();
333
334 let n = sorted.len();
335 let rank = P99_NUMERATOR
336 .saturating_mul(n.saturating_add(1))
337 .div_ceil(P99_DENOMINATOR)
338 .saturating_sub(1)
339 .min(n.saturating_sub(1));
340 sorted.get(rank).copied()
341}
342
/// Truncating nanoseconds-to-microseconds conversion.
fn nanos_to_micros(nanos: u64) -> u64 {
    const NANOS_PER_MICRO: u64 = 1_000;
    nanos / NANOS_PER_MICRO
}
346
/// Whole nanoseconds in `duration`, clamped to `u64::MAX` on overflow
/// (durations longer than ~584 years).
fn duration_to_nanos_saturated(duration: Duration) -> u64 {
    u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX)
}
350
/// Process-wide Cx-propagation counters shared by instrumentation call sites.
pub static GLOBAL_CX_PROPAGATION_METRICS: CxPropagationMetrics = CxPropagationMetrics::new();
363
/// Counters tracking Cx (context) propagation health across the codebase.
pub struct CxPropagationMetrics {
    /// Successful propagations.
    pub propagation_successes_total: AtomicU64,
    /// Missing or invalid propagations (each is also logged with its site).
    pub propagation_failures_total: AtomicU64,
    /// Cleanups performed on cancellation.
    pub cancellation_cleanups_total: AtomicU64,
    /// Trace linkages established.
    pub trace_linkages_total: AtomicU64,
    /// Cx values created.
    pub cx_created_total: AtomicU64,
    /// Cancellation signals propagated.
    pub cancel_propagations_total: AtomicU64,
}
383
impl Default for CxPropagationMetrics {
    /// Equivalent to [`CxPropagationMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}
389
impl CxPropagationMetrics {
    /// Create a zeroed counter set; `const` so it can back a `static`.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            propagation_successes_total: AtomicU64::new(0),
            propagation_failures_total: AtomicU64::new(0),
            cancellation_cleanups_total: AtomicU64::new(0),
            trace_linkages_total: AtomicU64::new(0),
            cx_created_total: AtomicU64::new(0),
            cancel_propagations_total: AtomicU64::new(0),
        }
    }

    /// Count one successful Cx propagation.
    pub fn record_propagation_success(&self) {
        self.propagation_successes_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count one failed Cx propagation and emit a warning naming `site`.
    pub fn record_propagation_failure(&self, site: &str) {
        self.propagation_failures_total
            .fetch_add(1, Ordering::Relaxed);
        tracing::warn!(
            target: "fsqlite.cx_propagation",
            site,
            "missing or invalid Cx propagation detected"
        );
    }

    /// Count one cancellation-driven cleanup.
    pub fn record_cancellation_cleanup(&self) {
        self.cancellation_cleanups_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count one trace linkage.
    pub fn record_trace_linkage(&self) {
        self.trace_linkages_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one Cx creation.
    pub fn record_cx_created(&self) {
        self.cx_created_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one cancellation propagation.
    pub fn record_cancel_propagation(&self) {
        self.cancel_propagations_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Point-in-time copy of all six counters (Relaxed loads; no
    /// cross-counter consistency is promised).
    #[must_use]
    pub fn snapshot(&self) -> CxPropagationMetricsSnapshot {
        CxPropagationMetricsSnapshot {
            propagation_successes_total: self.propagation_successes_total.load(Ordering::Relaxed),
            propagation_failures_total: self.propagation_failures_total.load(Ordering::Relaxed),
            cancellation_cleanups_total: self.cancellation_cleanups_total.load(Ordering::Relaxed),
            trace_linkages_total: self.trace_linkages_total.load(Ordering::Relaxed),
            cx_created_total: self.cx_created_total.load(Ordering::Relaxed),
            cancel_propagations_total: self.cancel_propagations_total.load(Ordering::Relaxed),
        }
    }

    /// Zero every counter (test isolation).
    pub fn reset(&self) {
        self.propagation_successes_total.store(0, Ordering::Relaxed);
        self.propagation_failures_total.store(0, Ordering::Relaxed);
        self.cancellation_cleanups_total.store(0, Ordering::Relaxed);
        self.trace_linkages_total.store(0, Ordering::Relaxed);
        self.cx_created_total.store(0, Ordering::Relaxed);
        self.cancel_propagations_total.store(0, Ordering::Relaxed);
    }
}
466
/// Serializable point-in-time copy of [`CxPropagationMetrics`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct CxPropagationMetricsSnapshot {
    pub propagation_successes_total: u64,
    pub propagation_failures_total: u64,
    pub cancellation_cleanups_total: u64,
    pub trace_linkages_total: u64,
    pub cx_created_total: u64,
    pub cancel_propagations_total: u64,
}
477
478impl CxPropagationMetricsSnapshot {
479 #[must_use]
482 #[allow(clippy::cast_precision_loss)]
483 pub fn failure_ratio(&self) -> f64 {
484 let total = self.propagation_successes_total + self.propagation_failures_total;
485 if total == 0 {
486 return 0.0;
487 }
488 self.propagation_failures_total as f64 / total as f64
489 }
490}
491
impl std::fmt::Display for CxPropagationMetricsSnapshot {
    /// Compact single-line rendering for logs.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "cx_propagation(ok={} fail={} cancel_cleanup={} trace_link={} cx_new={} cancel_prop={} fail_ratio={:.4})",
            self.propagation_successes_total,
            self.propagation_failures_total,
            self.cancellation_cleanups_total,
            self.trace_linkages_total,
            self.cx_created_total,
            self.cancel_propagations_total,
            self.failure_ratio(),
        )
    }
}
507
/// Sentinel logged when a slot id is unknown (`Option::None`).
const UNKNOWN_SLOT_ID: usize = usize::MAX;

/// Process-wide transaction-slot metrics.
pub static GLOBAL_TXN_SLOT_METRICS: TxnSlotMetrics = TxnSlotMetrics::new();
520
/// Transaction-slot observability counters.
pub struct TxnSlotMetrics {
    /// Gauge: currently allocated transaction slots.
    pub fsqlite_txn_slots_active: AtomicU64,
    /// Counter: orphaned-slot crash detections.
    pub fsqlite_txn_slot_crashes_detected_total: AtomicU64,
}
531
impl Default for TxnSlotMetrics {
    /// Equivalent to [`TxnSlotMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}
537
538impl TxnSlotMetrics {
539 #[must_use]
541 pub const fn new() -> Self {
542 Self {
543 fsqlite_txn_slots_active: AtomicU64::new(0),
544 fsqlite_txn_slot_crashes_detected_total: AtomicU64::new(0),
545 }
546 }
547
548 #[inline]
549 const fn normalize_slot_id(slot_id: Option<usize>) -> usize {
550 match slot_id {
551 Some(value) => value,
552 None => UNKNOWN_SLOT_ID,
553 }
554 }
555
556 fn log_context_from_env() -> (String, u64, String) {
557 let run_id = std::env::var("RUN_ID").unwrap_or_else(|_| "(none)".to_owned());
558 let trace_id = std::env::var("TRACE_ID")
559 .ok()
560 .and_then(|value| value.parse::<u64>().ok())
561 .unwrap_or(0);
562 let scenario_id = std::env::var("SCENARIO_ID").unwrap_or_else(|_| "(none)".to_owned());
563 (run_id, trace_id, scenario_id)
564 }
565
566 fn decrement_active_slots_saturating(&self) -> u64 {
567 loop {
568 let prev = self.fsqlite_txn_slots_active.load(Ordering::Relaxed);
569 let next = prev.saturating_sub(1);
570 if self
571 .fsqlite_txn_slots_active
572 .compare_exchange_weak(prev, next, Ordering::Relaxed, Ordering::Relaxed)
573 .is_ok()
574 {
575 return next;
576 }
577 }
578 }
579
580 pub fn record_slot_allocated(&self, slot_id: usize, process_id: u32) {
582 let started_at = Instant::now();
583 let active_after = self
584 .fsqlite_txn_slots_active
585 .fetch_add(1, Ordering::Relaxed)
586 .saturating_add(1);
587 let operation_elapsed_us = started_at.elapsed().as_micros().max(1);
588 let (run_id, trace_id, scenario_id) = Self::log_context_from_env();
589 let span = tracing::span!(
590 target: "fsqlite.txn_slot",
591 tracing::Level::INFO,
592 "txn_slot",
593 slot_id,
594 process_id,
595 run_id = %run_id.as_str(),
596 trace_id,
597 scenario_id = %scenario_id.as_str(),
598 operation = "alloc"
599 );
600 let _guard = span.enter();
601 tracing::info!(
602 fsqlite_txn_slots_active = active_after,
603 operation_elapsed_us,
604 run_id = %run_id.as_str(),
605 trace_id,
606 scenario_id = %scenario_id.as_str(),
607 failure_context = "none",
608 "transaction slot allocated"
609 );
610 }
611
612 pub fn record_slot_released(&self, slot_id: Option<usize>, process_id: u32) {
614 let started_at = Instant::now();
615 let active_after = self.decrement_active_slots_saturating();
616 let slot_id = Self::normalize_slot_id(slot_id);
617 let operation_elapsed_us = started_at.elapsed().as_micros().max(1);
618 let (run_id, trace_id, scenario_id) = Self::log_context_from_env();
619 let span = tracing::span!(
620 target: "fsqlite.txn_slot",
621 tracing::Level::INFO,
622 "txn_slot",
623 slot_id,
624 process_id,
625 run_id = %run_id.as_str(),
626 trace_id,
627 scenario_id = %scenario_id.as_str(),
628 operation = "release"
629 );
630 let _guard = span.enter();
631 tracing::info!(
632 fsqlite_txn_slots_active = active_after,
633 operation_elapsed_us,
634 run_id = %run_id.as_str(),
635 trace_id,
636 scenario_id = %scenario_id.as_str(),
637 failure_context = "none",
638 "transaction slot released"
639 );
640 }
641
642 pub fn record_crash_detected(
644 &self,
645 slot_id: Option<usize>,
646 process_id: u32,
647 orphan_txn_id: u64,
648 ) {
649 let started_at = Instant::now();
650 let total = self
651 .fsqlite_txn_slot_crashes_detected_total
652 .fetch_add(1, Ordering::Relaxed)
653 .saturating_add(1);
654 let slot_id = Self::normalize_slot_id(slot_id);
655 let operation_elapsed_us = started_at.elapsed().as_micros().max(1);
656 let (run_id, trace_id, scenario_id) = Self::log_context_from_env();
657 let span = tracing::span!(
658 target: "fsqlite.txn_slot",
659 tracing::Level::WARN,
660 "txn_slot",
661 slot_id,
662 process_id,
663 run_id = %run_id.as_str(),
664 trace_id,
665 scenario_id = %scenario_id.as_str(),
666 operation = "crash_detect"
667 );
668 let _guard = span.enter();
669 tracing::warn!(
670 orphan_txn_id,
671 fsqlite_txn_slot_crashes_detected_total = total,
672 operation_elapsed_us,
673 run_id = %run_id.as_str(),
674 trace_id,
675 scenario_id = %scenario_id.as_str(),
676 failure_context = "orphan_slot_reclaimed_during_cleanup",
677 "orphaned transaction slot crash detected"
678 );
679 }
680
681 #[must_use]
683 pub fn snapshot(&self) -> TxnSlotMetricsSnapshot {
684 TxnSlotMetricsSnapshot {
685 fsqlite_txn_slots_active: self.fsqlite_txn_slots_active.load(Ordering::Relaxed),
686 fsqlite_txn_slot_crashes_detected_total: self
687 .fsqlite_txn_slot_crashes_detected_total
688 .load(Ordering::Relaxed),
689 }
690 }
691
692 pub fn reset(&self) {
694 self.fsqlite_txn_slots_active.store(0, Ordering::Relaxed);
695 self.fsqlite_txn_slot_crashes_detected_total
696 .store(0, Ordering::Relaxed);
697 }
698}
699
/// Serializable point-in-time copy of [`TxnSlotMetrics`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct TxnSlotMetricsSnapshot {
    pub fsqlite_txn_slots_active: u64,
    pub fsqlite_txn_slot_crashes_detected_total: u64,
}
706
impl std::fmt::Display for TxnSlotMetricsSnapshot {
    /// Compact single-line rendering for logs.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "txn_slots(active={} crashes={})",
            self.fsqlite_txn_slots_active, self.fsqlite_txn_slot_crashes_detected_total
        )
    }
}
716
/// Concurrency-conflict events delivered to [`ConflictObserver`]s.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum ConflictEvent {
    /// A page lock was requested while another transaction held it.
    PageLockContention {
        /// The contended page.
        page: PageNumber,
        /// Transaction that asked for the lock.
        requester: TxnId,
        /// Transaction currently holding the lock.
        holder: TxnId,
        /// Producer-supplied event time in nanoseconds.
        timestamp_ns: u64,
    },

    /// FCW base drift: a page's base version moved under a transaction.
    /// NOTE(review): "FCW" presumed first-committer-wins — confirm.
    FcwBaseDrift {
        /// Page whose base drifted.
        page: PageNumber,
        /// Transaction on the losing side of the drift.
        loser: TxnId,
        /// Commit sequence of the winning commit.
        winner_commit_seq: CommitSeq,
        /// Whether an automatic merge was attempted.
        merge_attempted: bool,
        /// Whether that merge succeeded (meaningful only if attempted).
        merge_succeeded: bool,
        /// Producer-supplied event time in nanoseconds.
        timestamp_ns: u64,
    },

    /// A transaction was aborted by SSI validation.
    SsiAbort {
        /// The aborted transaction.
        txn: TxnToken,
        /// Classification of the abort.
        reason: SsiAbortCategory,
        /// Incoming dependency-edge count reported with the abort.
        in_edge_count: usize,
        /// Outgoing dependency-edge count reported with the abort.
        out_edge_count: usize,
        /// Producer-supplied event time in nanoseconds.
        timestamp_ns: u64,
    },

    /// A conflict was resolved; counted separately from conflicts.
    ConflictResolved {
        /// Transaction whose conflict was resolved.
        txn: TxnId,
        /// Number of pages merged during resolution.
        pages_merged: usize,
        /// Commit sequence associated with the resolution.
        commit_seq: CommitSeq,
        /// Producer-supplied event time in nanoseconds.
        timestamp_ns: u64,
    },
}
781
/// Reason category attached to [`ConflictEvent::SsiAbort`].
/// NOTE(review): variant semantics inferred from names — confirm against the
/// SSI validator that emits these.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize)]
pub enum SsiAbortCategory {
    /// Pivot of a dangerous dependency structure (presumed).
    Pivot,
    /// Pivot involving an already-committed peer (presumed).
    CommittedPivot,
    /// Transaction had been marked for abort (presumed).
    MarkedForAbort,
}
792
793impl ConflictEvent {
794 #[must_use]
796 pub fn timestamp_ns(&self) -> u64 {
797 match self {
798 Self::PageLockContention { timestamp_ns, .. }
799 | Self::FcwBaseDrift { timestamp_ns, .. }
800 | Self::SsiAbort { timestamp_ns, .. }
801 | Self::ConflictResolved { timestamp_ns, .. } => *timestamp_ns,
802 }
803 }
804
805 #[must_use]
807 pub fn is_conflict(&self) -> bool {
808 !matches!(self, Self::ConflictResolved { .. })
809 }
810}
811
/// Sink for conflict events; implementations must be shareable across
/// threads (`Send + Sync`).
pub trait ConflictObserver: Send + Sync {
    /// Called once per conflict event.
    fn on_event(&self, event: &ConflictEvent);
}
825
/// Observer that ignores every event; the `#[inline(always)]` empty body
/// lets the optimizer erase observation call sites entirely.
#[derive(Debug, Clone, Copy)]
pub struct NoOpObserver;

impl ConflictObserver for NoOpObserver {
    #[inline(always)]
    fn on_event(&self, _event: &ConflictEvent) {}
}
835
/// Fixed-capacity, mutex-guarded ring of recent conflict events.
pub struct ConflictRingBuffer {
    events: Mutex<RingBuf>,
}
847
/// Ring-buffer internals: `head` indexes the oldest element and `len` is the
/// current element count (always `<= capacity`).
struct RingBuf {
    buf: Vec<ConflictEvent>,
    capacity: usize,
    head: usize,
    len: usize,
}
854
855impl RingBuf {
856 fn new(capacity: usize) -> Self {
857 Self {
858 buf: Vec::with_capacity(capacity),
859 capacity,
860 head: 0,
861 len: 0,
862 }
863 }
864
865 fn push(&mut self, event: ConflictEvent) {
866 if self.capacity == 0 {
867 return;
868 }
869 let idx = (self.head + self.len) % self.capacity;
870 if self.buf.len() < self.capacity {
871 self.buf.push(event);
872 } else {
873 self.buf[idx] = event;
874 }
875 if self.len == self.capacity {
876 self.head = (self.head + 1) % self.capacity;
877 } else {
878 self.len += 1;
879 }
880 }
881
882 fn drain_ordered(&self) -> Vec<ConflictEvent> {
883 let mut result = Vec::with_capacity(self.len);
884 for i in 0..self.len {
885 let idx = (self.head + i) % self.capacity;
886 result.push(self.buf[idx].clone());
887 }
888 result
889 }
890
891 fn clear(&mut self) {
892 self.buf.clear();
893 self.head = 0;
894 self.len = 0;
895 }
896}
897
898impl ConflictRingBuffer {
899 #[must_use]
901 pub fn new(capacity: usize) -> Self {
902 Self {
903 events: Mutex::new(RingBuf::new(capacity)),
904 }
905 }
906
907 pub fn push(&self, event: ConflictEvent) {
909 self.events.lock().push(event);
910 }
911
912 #[must_use]
914 pub fn snapshot(&self) -> Vec<ConflictEvent> {
915 self.events.lock().drain_ordered()
916 }
917
918 pub fn clear(&self) {
920 self.events.lock().clear();
921 }
922
923 #[must_use]
925 pub fn len(&self) -> usize {
926 self.events.lock().len
927 }
928
929 #[must_use]
931 pub fn is_empty(&self) -> bool {
932 self.len() == 0
933 }
934
935 #[must_use]
937 pub fn capacity(&self) -> usize {
938 self.events.lock().capacity
939 }
940}
941
/// Aggregated conflict counters plus a per-page contention histogram.
pub struct ConflictMetrics {
    /// All conflicts (contention + base drift + SSI aborts).
    pub conflicts_total: AtomicU64,
    /// Page-lock contention events.
    pub page_contentions: AtomicU64,
    /// FCW base-drift events.
    pub fcw_drifts: AtomicU64,
    /// Drift events where a merge was attempted.
    pub fcw_merge_attempts: AtomicU64,
    /// Attempted merges that succeeded.
    pub fcw_merge_successes: AtomicU64,
    /// SSI abort events.
    pub ssi_aborts: AtomicU64,
    /// Resolution events (not counted in `conflicts_total`).
    pub conflicts_resolved: AtomicU64,
    // Per-page conflict counts backing `top_hotspots`.
    page_hotspots: Mutex<HashMap<PageNumber, u64>>,
    // Collector birth time; anchors `elapsed` and rate computations.
    created_at: Instant,
}
970
971impl ConflictMetrics {
972 #[must_use]
974 pub fn new() -> Self {
975 Self {
976 conflicts_total: AtomicU64::new(0),
977 page_contentions: AtomicU64::new(0),
978 fcw_drifts: AtomicU64::new(0),
979 fcw_merge_attempts: AtomicU64::new(0),
980 fcw_merge_successes: AtomicU64::new(0),
981 ssi_aborts: AtomicU64::new(0),
982 conflicts_resolved: AtomicU64::new(0),
983 page_hotspots: Mutex::new(HashMap::new()),
984 created_at: Instant::now(),
985 }
986 }
987
988 pub fn record(&self, event: &ConflictEvent) {
990 match event {
991 ConflictEvent::PageLockContention { page, .. } => {
992 self.conflicts_total.fetch_add(1, Ordering::Relaxed);
993 self.page_contentions.fetch_add(1, Ordering::Relaxed);
994 *self.page_hotspots.lock().entry(*page).or_insert(0) += 1;
995 }
996 ConflictEvent::FcwBaseDrift {
997 page,
998 merge_attempted,
999 merge_succeeded,
1000 ..
1001 } => {
1002 self.conflicts_total.fetch_add(1, Ordering::Relaxed);
1003 self.fcw_drifts.fetch_add(1, Ordering::Relaxed);
1004 if *merge_attempted {
1005 self.fcw_merge_attempts.fetch_add(1, Ordering::Relaxed);
1006 if *merge_succeeded {
1007 self.fcw_merge_successes.fetch_add(1, Ordering::Relaxed);
1008 }
1009 }
1010 *self.page_hotspots.lock().entry(*page).or_insert(0) += 1;
1011 }
1012 ConflictEvent::SsiAbort { .. } => {
1013 self.conflicts_total.fetch_add(1, Ordering::Relaxed);
1014 self.ssi_aborts.fetch_add(1, Ordering::Relaxed);
1015 }
1016 ConflictEvent::ConflictResolved { .. } => {
1017 self.conflicts_resolved.fetch_add(1, Ordering::Relaxed);
1018 }
1019 }
1020 }
1021
1022 pub fn reset(&self) {
1024 self.conflicts_total.store(0, Ordering::Relaxed);
1025 self.page_contentions.store(0, Ordering::Relaxed);
1026 self.fcw_drifts.store(0, Ordering::Relaxed);
1027 self.fcw_merge_attempts.store(0, Ordering::Relaxed);
1028 self.fcw_merge_successes.store(0, Ordering::Relaxed);
1029 self.ssi_aborts.store(0, Ordering::Relaxed);
1030 self.conflicts_resolved.store(0, Ordering::Relaxed);
1031 self.page_hotspots.lock().clear();
1032 }
1033
1034 #[must_use]
1036 pub fn elapsed(&self) -> std::time::Duration {
1037 self.created_at.elapsed()
1038 }
1039
1040 #[must_use]
1042 #[allow(clippy::cast_precision_loss)]
1043 pub fn conflicts_per_second(&self) -> f64 {
1044 let elapsed_secs = self.created_at.elapsed().as_secs_f64();
1045 if elapsed_secs < f64::EPSILON {
1046 return 0.0;
1047 }
1048 self.conflicts_total.load(Ordering::Relaxed) as f64 / elapsed_secs
1049 }
1050
1051 #[must_use]
1053 pub fn top_hotspots(&self, n: usize) -> Vec<(PageNumber, u64)> {
1054 let mut entries: Vec<(PageNumber, u64)> = {
1055 let map = self.page_hotspots.lock();
1056 map.iter().map(|(&k, &v)| (k, v)).collect()
1057 };
1058 entries.sort_by_key(|e| std::cmp::Reverse(e.1));
1059 entries.truncate(n);
1060 entries
1061 }
1062
1063 #[must_use]
1065 #[allow(clippy::cast_precision_loss)]
1066 pub fn snapshot(&self) -> ConflictMetricsSnapshot {
1067 ConflictMetricsSnapshot {
1068 conflicts_total: self.conflicts_total.load(Ordering::Relaxed),
1069 page_contentions: self.page_contentions.load(Ordering::Relaxed),
1070 fcw_drifts: self.fcw_drifts.load(Ordering::Relaxed),
1071 fcw_merge_attempts: self.fcw_merge_attempts.load(Ordering::Relaxed),
1072 fcw_merge_successes: self.fcw_merge_successes.load(Ordering::Relaxed),
1073 ssi_aborts: self.ssi_aborts.load(Ordering::Relaxed),
1074 conflicts_resolved: self.conflicts_resolved.load(Ordering::Relaxed),
1075 conflicts_per_second: self.conflicts_per_second(),
1076 elapsed_secs: self.created_at.elapsed().as_secs_f64(),
1077 top_hotspots: self.top_hotspots(10),
1078 }
1079 }
1080}
1081
impl Default for ConflictMetrics {
    /// Equivalent to [`ConflictMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}
1087
/// Serializable point-in-time copy of [`ConflictMetrics`], including derived
/// rates and the top page hotspots.
#[derive(Debug, Clone, Serialize)]
pub struct ConflictMetricsSnapshot {
    pub conflicts_total: u64,
    pub page_contentions: u64,
    pub fcw_drifts: u64,
    pub fcw_merge_attempts: u64,
    pub fcw_merge_successes: u64,
    pub ssi_aborts: u64,
    pub conflicts_resolved: u64,
    pub conflicts_per_second: f64,
    pub elapsed_secs: f64,
    pub top_hotspots: Vec<(PageNumber, u64)>,
}
1102
/// Observer that both aggregates counters and keeps a bounded event log.
pub struct MetricsObserver {
    metrics: ConflictMetrics,
    log: ConflictRingBuffer,
    // Observer birth time; anchors `elapsed_ns`.
    epoch: Instant,
}
1114
impl MetricsObserver {
    /// Build an observer with fresh counters and an event log capped at
    /// `log_capacity` entries.
    #[must_use]
    pub fn new(log_capacity: usize) -> Self {
        Self {
            metrics: ConflictMetrics::new(),
            log: ConflictRingBuffer::new(log_capacity),
            epoch: Instant::now(),
        }
    }

    /// Aggregated counters.
    #[must_use]
    pub fn metrics(&self) -> &ConflictMetrics {
        &self.metrics
    }

    /// Recent-event ring buffer.
    #[must_use]
    pub fn log(&self) -> &ConflictRingBuffer {
        &self.log
    }

    /// Nanoseconds since this observer was created, clamped to `u64::MAX`.
    #[must_use]
    pub fn elapsed_ns(&self) -> u64 {
        #[allow(clippy::cast_possible_truncation)] {
            // Clamped to u64::MAX before the cast, so truncation cannot occur.
            self.epoch.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64
        }
    }

    /// Clear counters and the event log. `epoch` is not reset.
    pub fn reset(&self) {
        self.metrics.reset();
        self.log.clear();
    }
}
1153
impl ConflictObserver for MetricsObserver {
    fn on_event(&self, event: &ConflictEvent) {
        // Counters first, then a clone into the log; each is internally locked.
        self.metrics.record(event);
        self.log.push(event.clone());
    }
}
1160
1161#[cfg(test)]
1166mod tests {
1167 use super::*;
1168
1169 fn page(n: u32) -> PageNumber {
1170 PageNumber::new(n).unwrap()
1171 }
1172
1173 fn txn(n: u64) -> TxnId {
1174 TxnId::new(n).unwrap()
1175 }
1176
1177 fn make_contention_event(pg: u32, req: u64, hold: u64) -> ConflictEvent {
1178 ConflictEvent::PageLockContention {
1179 page: page(pg),
1180 requester: txn(req),
1181 holder: txn(hold),
1182 timestamp_ns: 1000,
1183 }
1184 }
1185
1186 #[test]
1187 fn noop_observer_compiles_away() {
1188 let obs = NoOpObserver;
1189 let event = make_contention_event(1, 2, 3);
1190 obs.on_event(&event);
1191 }
1193
1194 #[test]
1195 fn ring_buffer_basic_push_and_snapshot() {
1196 let rb = ConflictRingBuffer::new(3);
1197 assert!(rb.is_empty());
1198
1199 rb.push(make_contention_event(1, 10, 20));
1200 rb.push(make_contention_event(2, 11, 21));
1201 assert_eq!(rb.len(), 2);
1202
1203 let snap = rb.snapshot();
1204 assert_eq!(snap.len(), 2);
1205 assert!(
1206 matches!(&snap[0], ConflictEvent::PageLockContention { page, .. } if page.get() == 1)
1207 );
1208 assert!(
1209 matches!(&snap[1], ConflictEvent::PageLockContention { page, .. } if page.get() == 2)
1210 );
1211 }
1212
1213 #[test]
1214 fn ring_buffer_wraps_on_overflow() {
1215 let rb = ConflictRingBuffer::new(2);
1216
1217 rb.push(make_contention_event(1, 10, 20));
1218 rb.push(make_contention_event(2, 11, 21));
1219 rb.push(make_contention_event(3, 12, 22)); assert_eq!(rb.len(), 2);
1222 let snap = rb.snapshot();
1223 assert!(
1225 matches!(&snap[0], ConflictEvent::PageLockContention { page, .. } if page.get() == 2)
1226 );
1227 assert!(
1228 matches!(&snap[1], ConflictEvent::PageLockContention { page, .. } if page.get() == 3)
1229 );
1230 }
1231
1232 #[test]
1233 fn ring_buffer_clear() {
1234 let rb = ConflictRingBuffer::new(10);
1235 rb.push(make_contention_event(1, 10, 20));
1236 rb.push(make_contention_event(2, 11, 21));
1237 assert_eq!(rb.len(), 2);
1238
1239 rb.clear();
1240 assert!(rb.is_empty());
1241 assert!(rb.snapshot().is_empty());
1242 }
1243
1244 #[test]
1245 fn ring_buffer_zero_capacity() {
1246 let rb = ConflictRingBuffer::new(0);
1247 rb.push(make_contention_event(1, 10, 20));
1248 assert!(rb.is_empty());
1249 }
1250
1251 #[test]
1252 fn conflict_metrics_basic_recording() {
1253 let m = ConflictMetrics::new();
1254
1255 m.record(&make_contention_event(1, 10, 20));
1256 m.record(&make_contention_event(1, 11, 20)); m.record(&make_contention_event(2, 12, 20));
1258
1259 assert_eq!(m.conflicts_total.load(Ordering::Relaxed), 3);
1260 assert_eq!(m.page_contentions.load(Ordering::Relaxed), 3);
1261
1262 let hotspots = m.top_hotspots(5);
1263 assert_eq!(hotspots.len(), 2);
1264 assert_eq!(hotspots[0].0, page(1));
1265 assert_eq!(hotspots[0].1, 2);
1266 }
1267
1268 #[test]
1269 fn conflict_metrics_fcw_recording() {
1270 let m = ConflictMetrics::new();
1271
1272 m.record(&ConflictEvent::FcwBaseDrift {
1273 page: page(5),
1274 loser: txn(10),
1275 winner_commit_seq: CommitSeq::new(100),
1276 merge_attempted: true,
1277 merge_succeeded: true,
1278 timestamp_ns: 2000,
1279 });
1280
1281 assert_eq!(m.fcw_drifts.load(Ordering::Relaxed), 1);
1282 assert_eq!(m.fcw_merge_attempts.load(Ordering::Relaxed), 1);
1283 assert_eq!(m.fcw_merge_successes.load(Ordering::Relaxed), 1);
1284 }
1285
1286 #[test]
1287 fn conflict_metrics_ssi_recording() {
1288 let m = ConflictMetrics::new();
1289
1290 m.record(&ConflictEvent::SsiAbort {
1291 txn: TxnToken::new(txn(10), fsqlite_types::TxnEpoch::new(1)),
1292 reason: SsiAbortCategory::Pivot,
1293 in_edge_count: 1,
1294 out_edge_count: 1,
1295 timestamp_ns: 3000,
1296 });
1297
1298 assert_eq!(m.ssi_aborts.load(Ordering::Relaxed), 1);
1299 assert_eq!(m.conflicts_total.load(Ordering::Relaxed), 1);
1300 }
1301
1302 #[test]
1303 fn conflict_metrics_reset() {
1304 let m = ConflictMetrics::new();
1305 m.record(&make_contention_event(1, 10, 20));
1306 assert_eq!(m.conflicts_total.load(Ordering::Relaxed), 1);
1307
1308 m.reset();
1309 assert_eq!(m.conflicts_total.load(Ordering::Relaxed), 0);
1310 assert_eq!(m.page_contentions.load(Ordering::Relaxed), 0);
1311 assert!(m.top_hotspots(5).is_empty());
1312 }
1313
1314 #[test]
1315 fn metrics_observer_records_both() {
1316 let obs = MetricsObserver::new(100);
1317 let event = make_contention_event(1, 10, 20);
1318 obs.on_event(&event);
1319
1320 assert_eq!(obs.metrics().conflicts_total.load(Ordering::Relaxed), 1);
1321 assert_eq!(obs.log().len(), 1);
1322 }
1323
1324 #[test]
1325 fn conflict_event_timestamp() {
1326 let event = make_contention_event(1, 10, 20);
1327 assert_eq!(event.timestamp_ns(), 1000);
1328 }
1329
1330 #[test]
1331 fn conflict_event_is_conflict() {
1332 assert!(make_contention_event(1, 10, 20).is_conflict());
1333 assert!(
1334 !ConflictEvent::ConflictResolved {
1335 txn: txn(1),
1336 pages_merged: 0,
1337 commit_seq: CommitSeq::new(1),
1338 timestamp_ns: 0,
1339 }
1340 .is_conflict()
1341 );
1342 }
1343
1344 #[test]
1345 fn metrics_snapshot_serializable() {
1346 let m = ConflictMetrics::new();
1347 m.record(&make_contention_event(1, 10, 20));
1348 let snap = m.snapshot();
1349 let json = serde_json::to_string(&snap).unwrap();
1350 assert!(json.contains("\"conflicts_total\":1"));
1351 }
1352
1353 #[test]
1358 fn ring_buffer_stress_many_pushes() {
1359 let cap = 10;
1361 let rb = ConflictRingBuffer::new(cap);
1362 for i in 1..=200_u32 {
1363 rb.push(make_contention_event(i, u64::from(i), u64::from(i) + 1));
1364 }
1365 assert_eq!(rb.len(), cap);
1366 let snap = rb.snapshot();
1367 assert_eq!(snap.len(), cap);
1368 assert!(matches!(
1370 &snap[0],
1371 ConflictEvent::PageLockContention { page, .. } if page.get() == 191
1372 ),);
1373 assert!(matches!(
1375 &snap[cap - 1],
1376 ConflictEvent::PageLockContention { page, .. } if page.get() == 200
1377 ),);
1378 }
1379
1380 #[test]
1381 fn ring_buffer_capacity_one() {
1382 let rb = ConflictRingBuffer::new(1);
1384 rb.push(make_contention_event(1, 10, 20));
1385 rb.push(make_contention_event(2, 11, 21));
1386 rb.push(make_contention_event(3, 12, 22));
1387 assert_eq!(rb.len(), 1);
1388 let snap = rb.snapshot();
1389 assert!(
1390 matches!(&snap[0], ConflictEvent::PageLockContention { page, .. } if page.get() == 3)
1391 );
1392 }
1393
1394 #[test]
1395 fn ring_buffer_clear_after_wrap() {
1396 let rb = ConflictRingBuffer::new(2);
1398 rb.push(make_contention_event(1, 10, 20));
1399 rb.push(make_contention_event(2, 11, 21));
1400 rb.push(make_contention_event(3, 12, 22)); assert_eq!(rb.len(), 2);
1402
1403 rb.clear();
1404 assert!(rb.is_empty());
1405 assert_eq!(rb.capacity(), 2);
1406
1407 rb.push(make_contention_event(4, 13, 23));
1409 assert_eq!(rb.len(), 1);
1410 let snap = rb.snapshot();
1411 assert!(
1412 matches!(&snap[0], ConflictEvent::PageLockContention { page, .. } if page.get() == 4)
1413 );
1414 }
1415
1416 #[test]
1417 fn metrics_all_fcw_merge_combinations() {
1418 let m = ConflictMetrics::new();
1420
1421 let cases = [
1422 (false, false),
1423 (true, false),
1424 (true, true),
1425 (false, false), ];
1427 for (attempted, succeeded) in cases {
1428 m.record(&ConflictEvent::FcwBaseDrift {
1429 page: page(1),
1430 loser: txn(1),
1431 winner_commit_seq: CommitSeq::new(1),
1432 merge_attempted: attempted,
1433 merge_succeeded: succeeded,
1434 timestamp_ns: 0,
1435 });
1436 }
1437
1438 assert_eq!(m.fcw_drifts.load(Ordering::Relaxed), 4);
1439 assert_eq!(m.fcw_merge_attempts.load(Ordering::Relaxed), 2);
1440 assert_eq!(m.fcw_merge_successes.load(Ordering::Relaxed), 1);
1441 }
1442
1443 #[test]
1444 fn metrics_all_ssi_abort_categories() {
1445 let m = ConflictMetrics::new();
1446
1447 for reason in [
1448 SsiAbortCategory::Pivot,
1449 SsiAbortCategory::CommittedPivot,
1450 SsiAbortCategory::MarkedForAbort,
1451 ] {
1452 m.record(&ConflictEvent::SsiAbort {
1453 txn: TxnToken::new(txn(1), fsqlite_types::TxnEpoch::new(1)),
1454 reason,
1455 in_edge_count: 1,
1456 out_edge_count: 1,
1457 timestamp_ns: 0,
1458 });
1459 }
1460
1461 assert_eq!(m.ssi_aborts.load(Ordering::Relaxed), 3);
1462 assert_eq!(m.conflicts_total.load(Ordering::Relaxed), 3);
1463 }
1464
1465 #[test]
1466 fn trace_metrics_snapshot_and_reset() {
1467 reset_trace_metrics();
1468 record_trace_span_created();
1469 record_trace_span_created();
1470 record_trace_export(2, 17);
1471 record_trace_export_error();
1472 record_compat_trace_callback();
1473
1474 let snapshot = trace_metrics_snapshot();
1475 assert_eq!(snapshot.fsqlite_trace_spans_total, 2);
1476 assert_eq!(snapshot.fsqlite_trace_export_errors_total, 1);
1477 assert_eq!(snapshot.fsqlite_compat_trace_callbacks_total, 1);
1478
1479 reset_trace_metrics();
1480 let reset = trace_metrics_snapshot();
1481 assert_eq!(reset.fsqlite_trace_spans_total, 0);
1482 assert_eq!(reset.fsqlite_trace_export_errors_total, 0);
1483 assert_eq!(reset.fsqlite_compat_trace_callbacks_total, 0);
1484 }
1485
1486 #[test]
1487 fn io_uring_latency_snapshot_and_reset() {
1488 reset_io_uring_latency_metrics();
1489
1490 record_io_uring_read_latency(Duration::from_micros(40));
1491 record_io_uring_read_latency(Duration::from_micros(125));
1492 record_io_uring_write_latency(Duration::from_micros(55));
1493 record_io_uring_unix_fallback();
1494
1495 let snapshot = io_uring_latency_snapshot();
1496 assert_eq!(snapshot.read_samples_total, 2);
1497 assert_eq!(snapshot.write_samples_total, 1);
1498 assert_eq!(snapshot.unix_fallbacks_total, 1);
1499 assert!(snapshot.read_tail_violations_total <= snapshot.read_samples_total);
1500 assert!(snapshot.write_tail_violations_total <= snapshot.write_samples_total);
1501 assert!(snapshot.read_p99_latency_us >= 125);
1502 assert!(snapshot.write_p99_latency_us >= 55);
1503 assert!(snapshot.read_conformal_upper_bound_us >= snapshot.read_p99_latency_us);
1504 assert!(snapshot.write_conformal_upper_bound_us >= snapshot.write_p99_latency_us);
1505
1506 reset_io_uring_latency_metrics();
1507 let reset = io_uring_latency_snapshot();
1508 assert_eq!(reset.read_samples_total, 0);
1509 assert_eq!(reset.write_samples_total, 0);
1510 assert_eq!(reset.unix_fallbacks_total, 0);
1511 assert_eq!(reset.read_tail_violations_total, 0);
1512 assert_eq!(reset.write_tail_violations_total, 0);
1513 assert_eq!(reset.read_window_len, 0);
1514 assert_eq!(reset.write_window_len, 0);
1515 }
1516
1517 #[test]
1518 fn io_uring_latency_conformal_upper_bound_is_tail_safe() {
1519 let metrics = IoUringLatencyMetrics::new(16);
1520 let mut saw_violation = false;
1521 for latency in [20_u64, 22, 21, 23, 20, 24, 26, 200] {
1522 if metrics.record_read_latency(Duration::from_micros(latency)) {
1523 saw_violation = true;
1524 }
1525 }
1526
1527 let snapshot = metrics.snapshot();
1528 assert!(snapshot.read_p99_latency_us >= 200);
1529 assert!(snapshot.read_conformal_upper_bound_us >= snapshot.read_p99_latency_us);
1530 assert!(saw_violation);
1531 assert!(snapshot.read_tail_violations_total >= 1);
1532 }
1533
1534 #[test]
1535 fn trace_and_decision_ids_are_monotonic() {
1536 let first_trace = next_trace_id();
1537 let second_trace = next_trace_id();
1538 assert!(second_trace > first_trace);
1539
1540 let first_decision = next_decision_id();
1541 let second_decision = next_decision_id();
1542 assert!(second_decision > first_decision);
1543 }
1544
1545 #[test]
1546 fn metrics_conflict_resolved_not_counted_as_conflict() {
1547 let m = ConflictMetrics::new();
1549 for i in 1..=5_u64 {
1550 m.record(&ConflictEvent::ConflictResolved {
1551 txn: txn(i),
1552 pages_merged: 2,
1553 commit_seq: CommitSeq::new(i * 10),
1554 timestamp_ns: 0,
1555 });
1556 }
1557
1558 assert_eq!(m.conflicts_resolved.load(Ordering::Relaxed), 5);
1559 assert_eq!(m.conflicts_total.load(Ordering::Relaxed), 0);
1560 }
1561
1562 #[test]
1563 fn metrics_hotspot_ordering() {
1564 let m = ConflictMetrics::new();
1566 for _ in 0..3 {
1568 m.record(&make_contention_event(5, 1, 2));
1569 }
1570 m.record(&make_contention_event(10, 1, 2));
1571 for _ in 0..2 {
1572 m.record(&make_contention_event(15, 1, 2));
1573 }
1574
1575 let hotspots = m.top_hotspots(3);
1576 assert_eq!(hotspots.len(), 3);
1577 assert_eq!(hotspots[0], (page(5), 3));
1578 assert_eq!(hotspots[1], (page(15), 2));
1579 assert_eq!(hotspots[2], (page(10), 1));
1580 }
1581
1582 #[test]
1583 fn metrics_hotspot_truncation() {
1584 let m = ConflictMetrics::new();
1586 for i in 1..=20_u32 {
1587 m.record(&make_contention_event(i, 1, 2));
1588 }
1589 assert_eq!(m.top_hotspots(5).len(), 5);
1590 assert_eq!(m.top_hotspots(0).len(), 0);
1591 }
1592
1593 #[test]
1594 fn metrics_snapshot_all_fields() {
1595 let m = ConflictMetrics::new();
1597 m.record(&make_contention_event(1, 10, 20));
1598 m.record(&ConflictEvent::FcwBaseDrift {
1599 page: page(2),
1600 loser: txn(3),
1601 winner_commit_seq: CommitSeq::new(100),
1602 merge_attempted: true,
1603 merge_succeeded: false,
1604 timestamp_ns: 0,
1605 });
1606 m.record(&ConflictEvent::SsiAbort {
1607 txn: TxnToken::new(txn(4), fsqlite_types::TxnEpoch::new(1)),
1608 reason: SsiAbortCategory::Pivot,
1609 in_edge_count: 2,
1610 out_edge_count: 3,
1611 timestamp_ns: 0,
1612 });
1613 m.record(&ConflictEvent::ConflictResolved {
1614 txn: txn(5),
1615 pages_merged: 1,
1616 commit_seq: CommitSeq::new(200),
1617 timestamp_ns: 0,
1618 });
1619
1620 let snap = m.snapshot();
1621 assert_eq!(snap.conflicts_total, 3); assert_eq!(snap.page_contentions, 1);
1623 assert_eq!(snap.fcw_drifts, 1);
1624 assert_eq!(snap.fcw_merge_attempts, 1);
1625 assert_eq!(snap.fcw_merge_successes, 0);
1626 assert_eq!(snap.ssi_aborts, 1);
1627 assert_eq!(snap.conflicts_resolved, 1);
1628 assert!(snap.elapsed_secs >= 0.0);
1629 }
1630
1631 #[test]
1632 fn metrics_observer_log_preserves_order() {
1633 let obs = MetricsObserver::new(100);
1635 for i in 1..=5_u32 {
1636 obs.on_event(&make_contention_event(i, u64::from(i), u64::from(i) + 10));
1637 }
1638
1639 let events = obs.log().snapshot();
1640 assert_eq!(events.len(), 5);
1641 for (idx, event) in events.iter().enumerate() {
1642 let expected_page = u32::try_from(idx + 1).unwrap();
1643 assert!(matches!(
1644 event,
1645 ConflictEvent::PageLockContention { page, .. } if page.get() == expected_page
1646 ),);
1647 }
1648 }
1649
1650 #[test]
1651 fn metrics_observer_elapsed_ns_monotonic() {
1652 let obs = MetricsObserver::new(10);
1653 let t1 = obs.elapsed_ns();
1654 std::thread::yield_now();
1656 let t2 = obs.elapsed_ns();
1657 assert!(t2 >= t1, "elapsed_ns must be monotonically non-decreasing");
1658 }
1659
1660 #[test]
1661 fn conflict_event_serde_roundtrip() {
1662 let events = vec![
1664 make_contention_event(1, 2, 3),
1665 ConflictEvent::FcwBaseDrift {
1666 page: page(4),
1667 loser: txn(5),
1668 winner_commit_seq: CommitSeq::new(100),
1669 merge_attempted: true,
1670 merge_succeeded: true,
1671 timestamp_ns: 42,
1672 },
1673 ConflictEvent::SsiAbort {
1674 txn: TxnToken::new(txn(6), fsqlite_types::TxnEpoch::new(2)),
1675 reason: SsiAbortCategory::CommittedPivot,
1676 in_edge_count: 3,
1677 out_edge_count: 4,
1678 timestamp_ns: 99,
1679 },
1680 ConflictEvent::ConflictResolved {
1681 txn: txn(7),
1682 pages_merged: 5,
1683 commit_seq: CommitSeq::new(200),
1684 timestamp_ns: 123,
1685 },
1686 ];
1687
1688 for event in &events {
1689 let json = serde_json::to_string(event).unwrap();
1690 assert!(!json.is_empty(), "serialization should produce output");
1691 }
1692 }
1693
1694 #[test]
1695 fn conflict_event_is_conflict_all_variants() {
1696 assert!(make_contention_event(1, 2, 3).is_conflict());
1697
1698 assert!(
1699 ConflictEvent::FcwBaseDrift {
1700 page: page(1),
1701 loser: txn(1),
1702 winner_commit_seq: CommitSeq::new(1),
1703 merge_attempted: false,
1704 merge_succeeded: false,
1705 timestamp_ns: 0,
1706 }
1707 .is_conflict()
1708 );
1709
1710 assert!(
1711 ConflictEvent::SsiAbort {
1712 txn: TxnToken::new(txn(1), fsqlite_types::TxnEpoch::new(1)),
1713 reason: SsiAbortCategory::Pivot,
1714 in_edge_count: 0,
1715 out_edge_count: 0,
1716 timestamp_ns: 0,
1717 }
1718 .is_conflict()
1719 );
1720
1721 assert!(
1722 !ConflictEvent::ConflictResolved {
1723 txn: txn(1),
1724 pages_merged: 0,
1725 commit_seq: CommitSeq::new(1),
1726 timestamp_ns: 0,
1727 }
1728 .is_conflict()
1729 );
1730 }
1731
1732 #[test]
1737 fn cx_propagation_metrics_basic() {
1738 GLOBAL_CX_PROPAGATION_METRICS.reset();
1739
1740 GLOBAL_CX_PROPAGATION_METRICS.record_propagation_success();
1741 GLOBAL_CX_PROPAGATION_METRICS.record_propagation_success();
1742 GLOBAL_CX_PROPAGATION_METRICS.record_propagation_failure("test_site_1");
1743 GLOBAL_CX_PROPAGATION_METRICS.record_cancellation_cleanup();
1744 GLOBAL_CX_PROPAGATION_METRICS.record_trace_linkage();
1745 GLOBAL_CX_PROPAGATION_METRICS.record_cx_created();
1746 GLOBAL_CX_PROPAGATION_METRICS.record_cancel_propagation();
1747
1748 let snap = GLOBAL_CX_PROPAGATION_METRICS.snapshot();
1749 assert_eq!(snap.propagation_successes_total, 2);
1750 assert_eq!(snap.propagation_failures_total, 1);
1751 assert_eq!(snap.cancellation_cleanups_total, 1);
1752 assert_eq!(snap.trace_linkages_total, 1);
1753 assert_eq!(snap.cx_created_total, 1);
1754 assert_eq!(snap.cancel_propagations_total, 1);
1755 }
1756
1757 #[test]
1758 fn cx_propagation_metrics_reset() {
1759 GLOBAL_CX_PROPAGATION_METRICS.reset();
1760 GLOBAL_CX_PROPAGATION_METRICS.record_propagation_success();
1761 GLOBAL_CX_PROPAGATION_METRICS.record_propagation_failure("reset_test");
1762 assert!(
1763 GLOBAL_CX_PROPAGATION_METRICS
1764 .propagation_successes_total
1765 .load(Ordering::Relaxed)
1766 > 0
1767 );
1768
1769 GLOBAL_CX_PROPAGATION_METRICS.reset();
1770 let snap = GLOBAL_CX_PROPAGATION_METRICS.snapshot();
1771 assert_eq!(snap.propagation_successes_total, 0);
1772 assert_eq!(snap.propagation_failures_total, 0);
1773 assert_eq!(snap.cancellation_cleanups_total, 0);
1774 assert_eq!(snap.trace_linkages_total, 0);
1775 assert_eq!(snap.cx_created_total, 0);
1776 assert_eq!(snap.cancel_propagations_total, 0);
1777 }
1778
1779 #[test]
1780 #[allow(clippy::float_cmp)]
1781 fn cx_propagation_failure_ratio() {
1782 let m = CxPropagationMetrics::new();
1783
1784 assert_eq!(m.snapshot().failure_ratio(), 0.0);
1786
1787 m.record_propagation_success();
1789 assert!((m.snapshot().failure_ratio() - 0.0).abs() < f64::EPSILON);
1790
1791 m.record_propagation_failure("ratio_test");
1793 assert!((m.snapshot().failure_ratio() - 0.5).abs() < f64::EPSILON);
1794
1795 m.record_propagation_failure("ratio_test");
1797 m.record_propagation_failure("ratio_test");
1798 assert!((m.snapshot().failure_ratio() - 0.75).abs() < f64::EPSILON);
1799 }
1800
1801 #[test]
1802 fn cx_propagation_snapshot_display() {
1803 let m = CxPropagationMetrics::new();
1804 m.record_propagation_success();
1805 m.record_propagation_success();
1806 m.record_propagation_failure("display_test");
1807 let display = format!("{}", m.snapshot());
1808 assert!(display.contains("ok=2"));
1809 assert!(display.contains("fail=1"));
1810 assert!(display.contains("fail_ratio="));
1811 }
1812
1813 #[test]
1814 fn cx_propagation_snapshot_serializable() {
1815 let m = CxPropagationMetrics::new();
1816 m.record_propagation_success();
1817 m.record_trace_linkage();
1818 let snap = m.snapshot();
1819 let json = serde_json::to_string(&snap).unwrap();
1820 assert!(json.contains("\"propagation_successes_total\":1"));
1821 assert!(json.contains("\"trace_linkages_total\":1"));
1822 }
1823
1824 #[test]
1825 fn cx_propagation_independent_counters() {
1826 let m = CxPropagationMetrics::new();
1828 for _ in 0..5 {
1829 m.record_propagation_success();
1830 }
1831 for _ in 0..3 {
1832 m.record_cancellation_cleanup();
1833 }
1834 m.record_cx_created();
1835 m.record_cx_created();
1836
1837 let snap = m.snapshot();
1838 assert_eq!(snap.propagation_successes_total, 5);
1839 assert_eq!(snap.propagation_failures_total, 0);
1840 assert_eq!(snap.cancellation_cleanups_total, 3);
1841 assert_eq!(snap.trace_linkages_total, 0);
1842 assert_eq!(snap.cx_created_total, 2);
1843 assert_eq!(snap.cancel_propagations_total, 0);
1844 }
1845
1846 #[test]
1847 fn cx_propagation_concurrent_safety() {
1848 let m = &CxPropagationMetrics::new();
1850 let barrier = std::sync::Arc::new(std::sync::Barrier::new(4));
1851 std::thread::scope(|s| {
1852 for _ in 0..4 {
1853 let b = barrier.clone();
1854 s.spawn(move || {
1855 b.wait();
1856 for _ in 0..100 {
1857 m.record_propagation_success();
1858 m.record_propagation_failure("concurrent_test");
1859 m.record_cancellation_cleanup();
1860 m.record_trace_linkage();
1861 m.record_cx_created();
1862 m.record_cancel_propagation();
1863 }
1864 });
1865 }
1866 });
1867
1868 let snap = m.snapshot();
1869 assert_eq!(snap.propagation_successes_total, 400);
1870 assert_eq!(snap.propagation_failures_total, 400);
1871 assert_eq!(snap.cancellation_cleanups_total, 400);
1872 assert_eq!(snap.trace_linkages_total, 400);
1873 assert_eq!(snap.cx_created_total, 400);
1874 assert_eq!(snap.cancel_propagations_total, 400);
1875 }
1876
1877 #[test]
1882 fn txn_slot_metrics_alloc_release_and_crash() {
1883 let m = TxnSlotMetrics::new();
1884
1885 m.record_slot_allocated(3, 1001);
1886 m.record_slot_allocated(4, 1001);
1887 m.record_crash_detected(Some(4), 1001, 42);
1888 m.record_slot_released(Some(4), 1001);
1889
1890 let snap = m.snapshot();
1891 assert_eq!(snap.fsqlite_txn_slots_active, 1);
1892 assert_eq!(snap.fsqlite_txn_slot_crashes_detected_total, 1);
1893 }
1894
1895 #[test]
1896 fn txn_slot_metrics_release_saturates_at_zero() {
1897 let m = TxnSlotMetrics::new();
1898
1899 m.record_slot_released(None, 0);
1901 m.record_slot_released(None, 0);
1902
1903 let snap = m.snapshot();
1904 assert_eq!(snap.fsqlite_txn_slots_active, 0);
1905 assert_eq!(snap.fsqlite_txn_slot_crashes_detected_total, 0);
1906 }
1907
1908 #[test]
1909 fn txn_slot_metrics_snapshot_display_and_serde() {
1910 let m = TxnSlotMetrics::new();
1911 m.record_slot_allocated(7, 2222);
1912 m.record_crash_detected(None, 2222, 9001);
1913
1914 let snap = m.snapshot();
1915 let display = format!("{snap}");
1916 assert!(display.contains("txn_slots(active=1 crashes=1)"));
1917
1918 let json = serde_json::to_string(&snap).unwrap();
1919 assert!(json.contains("\"fsqlite_txn_slots_active\":1"));
1920 assert!(json.contains("\"fsqlite_txn_slot_crashes_detected_total\":1"));
1921 }
1922}