Skip to main content

fsqlite_wal/
telemetry.rs

1//! Machine-validated WAL telemetry schema.
2//!
3//! Defines structured telemetry events for WAL append, replay, checkpoint, and
4//! recovery paths.  Follows the zero-cost observer pattern established by
5//! `ConflictObserver` in `fsqlite-observability`: a trait with a no-op default
6//! implementation that the compiler elides entirely when unused.
7//!
8//! # Conformance rules
9//!
10//! 1. Every [`WalTelemetryEvent`] variant carries a monotonic `timestamp_ns`.
11//! 2. All events and snapshots implement `serde::Serialize` for JSON export.
12//! 3. Observers MUST NOT block, acquire page locks, or perform I/O.
13//! 4. Log targets use `fsqlite.wal::<subdomain>` naming convention.
14//! 5. Metric counters use `AtomicU64` with `Ordering::Relaxed`.
15
16use serde::Serialize;
17
18use crate::checkpoint::CheckpointMode;
19use crate::checksum::{
20    ChecksumFailureKind, RecoveryAction, WalChainInvalidReason, WalFecRepairOutcome,
21};
22
23// ---------------------------------------------------------------------------
24// Telemetry event schema
25// ---------------------------------------------------------------------------
26
/// Structured telemetry event emitted by WAL operations.
///
/// Each variant captures the minimal context needed to diagnose the operation
/// in post-mortem analysis or real-time dashboards.
///
/// Every variant carries a `timestamp_ns` field, which can be read uniformly
/// through [`WalTelemetryEvent::timestamp_ns`]. This type does not itself
/// check that timestamps are monotonic; supplying a monotonic clock reading is
/// the emitter's responsibility.
///
/// # Serialization
///
/// No serde representation attributes are applied, so serde's default
/// externally tagged form is used: each event serializes as
/// `{"VariantName": { ...fields... }}` with the Rust field names as keys, e.g.
/// `{"WalReset":{"new_checkpoint_seq":7,"timestamp_ns":6000000}}`.
/// Renaming a variant or field therefore changes the exported JSON.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum WalTelemetryEvent {
    /// One or more frames appended to the WAL.
    FrameAppended {
        /// Number of frames written in this batch.
        frame_count: u32,
        /// Total bytes written (frame headers + page data).
        bytes_written: u64,
        /// Whether the last frame in the batch is a commit frame.
        is_commit: bool,
        /// Monotonic timestamp in nanoseconds.
        timestamp_ns: u64,
    },

    /// WAL chain replay started during open or recovery.
    ReplayStarted {
        /// Total valid frames found in the WAL chain.
        valid_frames: usize,
        /// Frames eligible for replay (up to last commit boundary).
        replayable_frames: usize,
        /// Monotonic timestamp in nanoseconds.
        timestamp_ns: u64,
    },

    /// WAL chain replay completed.
    ReplayCompleted {
        /// Frames successfully replayed to the page cache.
        frames_replayed: usize,
        /// Duration of replay in microseconds.
        duration_us: u64,
        /// Monotonic timestamp in nanoseconds.
        timestamp_ns: u64,
    },

    /// Checkpoint operation started.
    CheckpointStarted {
        /// Checkpoint mode requested.
        mode: CheckpointMode,
        /// Frames eligible for backfill.
        frames_to_backfill: u32,
        /// Monotonic timestamp in nanoseconds.
        timestamp_ns: u64,
    },

    /// Checkpoint operation completed.
    CheckpointCompleted {
        /// Checkpoint mode that was executed.
        mode: CheckpointMode,
        /// Frames actually backfilled to the database file.
        frames_backfilled: u32,
        /// Whether the WAL was reset after checkpoint.
        wal_reset: bool,
        /// Duration of checkpoint in microseconds.
        duration_us: u64,
        /// Monotonic timestamp in nanoseconds.
        timestamp_ns: u64,
    },

    /// WAL file reset (post-checkpoint truncation or restart).
    WalReset {
        /// New checkpoint sequence number after reset.
        new_checkpoint_seq: u32,
        /// Monotonic timestamp in nanoseconds.
        timestamp_ns: u64,
    },

    /// Checksum failure detected during chain validation or frame read.
    ChecksumFailure {
        /// Zero-based frame index where the failure occurred.
        frame_index: usize,
        /// Classification of the checksum failure.
        kind: ChecksumFailureKind,
        /// Recovery action selected for this failure.
        action: RecoveryAction,
        /// Monotonic timestamp in nanoseconds.
        timestamp_ns: u64,
    },

    /// WAL chain validation completed (during open or recovery).
    ChainValidated {
        /// Total frames examined.
        total_frames: usize,
        /// Whether the chain is fully valid.
        valid: bool,
        /// First invalid frame index, if any.
        ///
        /// NOTE(review): presumably `None` whenever `valid` is true; this type
        /// does not enforce that pairing — confirm at the emitting site.
        first_invalid_frame: Option<usize>,
        /// Reason for invalidity, if applicable.
        reason: Option<WalChainInvalidReason>,
        /// Monotonic timestamp in nanoseconds.
        timestamp_ns: u64,
    },

    /// FEC repair attempted for a corrupted commit group.
    FecRepairAttempted {
        /// Outcome of the repair attempt.
        outcome: WalFecRepairOutcome,
        /// Number of repair symbols available.
        symbols_available: usize,
        /// Duration of the repair attempt in microseconds.
        duration_us: u64,
        /// Monotonic timestamp in nanoseconds.
        timestamp_ns: u64,
    },

    /// Group commit flush completed.
    GroupCommitFlushed {
        /// Number of transactions in this group.
        batch_size: u32,
        /// Total frames written in the group.
        total_frames: u32,
        /// Flush latency in microseconds.
        latency_us: u64,
        /// Monotonic timestamp in nanoseconds.
        timestamp_ns: u64,
    },
}
147
148impl WalTelemetryEvent {
149    /// Extract the monotonic timestamp from any event variant.
150    #[must_use]
151    pub fn timestamp_ns(&self) -> u64 {
152        match self {
153            Self::FrameAppended { timestamp_ns, .. }
154            | Self::ReplayStarted { timestamp_ns, .. }
155            | Self::ReplayCompleted { timestamp_ns, .. }
156            | Self::CheckpointStarted { timestamp_ns, .. }
157            | Self::CheckpointCompleted { timestamp_ns, .. }
158            | Self::WalReset { timestamp_ns, .. }
159            | Self::ChecksumFailure { timestamp_ns, .. }
160            | Self::ChainValidated { timestamp_ns, .. }
161            | Self::FecRepairAttempted { timestamp_ns, .. }
162            | Self::GroupCommitFlushed { timestamp_ns, .. } => *timestamp_ns,
163        }
164    }
165
166    /// Short classification label for this event kind.
167    #[must_use]
168    pub fn kind_str(&self) -> &'static str {
169        match self {
170            Self::FrameAppended { .. } => "frame_appended",
171            Self::ReplayStarted { .. } => "replay_started",
172            Self::ReplayCompleted { .. } => "replay_completed",
173            Self::CheckpointStarted { .. } => "checkpoint_started",
174            Self::CheckpointCompleted { .. } => "checkpoint_completed",
175            Self::WalReset { .. } => "wal_reset",
176            Self::ChecksumFailure { .. } => "checksum_failure",
177            Self::ChainValidated { .. } => "chain_validated",
178            Self::FecRepairAttempted { .. } => "fec_repair_attempted",
179            Self::GroupCommitFlushed { .. } => "group_commit_flushed",
180        }
181    }
182}
183
184// ---------------------------------------------------------------------------
185// Observer trait (zero-cost when unused)
186// ---------------------------------------------------------------------------
187
/// Trait for receiving structured WAL telemetry events.
///
/// Mirrors the `ConflictObserver` pattern: implementations MUST NOT block,
/// acquire page locks, or perform I/O.
///
/// The trait requires `Send + Sync`, so a single observer instance can be
/// shared across threads. It is object safe and can be used as
/// `dyn WalTelemetryObserver`.
///
/// Overhead: when the observer is a generic type parameter (static dispatch),
/// the empty `on_event` of [`NoOpWalObserver`] can be inlined away entirely.
/// Calls made through `dyn WalTelemetryObserver` remain indirect calls, even
/// for the no-op observer.
pub trait WalTelemetryObserver: Send + Sync {
    /// Called for each telemetry event emitted by WAL operations.
    ///
    /// The event is only borrowed. Implementations that want to retain it
    /// must clone it, as [`WalTelemetryRingBuffer`] does.
    fn on_event(&self, event: &WalTelemetryEvent);
}
197
198/// No-op observer that compiles to zero instructions.
199pub struct NoOpWalObserver;
200
201impl WalTelemetryObserver for NoOpWalObserver {
202    #[inline(always)]
203    fn on_event(&self, _event: &WalTelemetryEvent) {}
204}
205
206/// Ring-buffer observer that stores the last N events for diagnostic queries.
207pub struct WalTelemetryRingBuffer {
208    events: fsqlite_types::sync_primitives::Mutex<WalRingBufferInner>,
209}
210
211struct WalRingBufferInner {
212    buf: Vec<WalTelemetryEvent>,
213    capacity: usize,
214    write_pos: usize,
215    count: usize,
216}
217
218impl WalTelemetryRingBuffer {
219    /// Create a ring buffer with the given capacity.
220    #[must_use]
221    pub fn new(capacity: usize) -> Self {
222        Self {
223            events: fsqlite_types::sync_primitives::Mutex::new(WalRingBufferInner {
224                buf: Vec::with_capacity(capacity),
225                capacity,
226                write_pos: 0,
227                count: 0,
228            }),
229        }
230    }
231
232    /// Drain the most recent events (up to capacity) in chronological order.
233    #[must_use]
234    pub fn drain(&self) -> Vec<WalTelemetryEvent> {
235        let inner = self.events.lock();
236        let n = inner.count.min(inner.capacity);
237        let mut result = Vec::with_capacity(n);
238        if n == 0 {
239            return result;
240        }
241        let start = if inner.count >= inner.capacity {
242            inner.write_pos
243        } else {
244            0
245        };
246        for i in 0..n {
247            let idx = (start + i) % inner.capacity;
248            result.push(inner.buf[idx].clone());
249        }
250        result
251    }
252
253    /// Number of events currently stored.
254    #[must_use]
255    pub fn len(&self) -> usize {
256        let inner = self.events.lock();
257        inner.count.min(inner.capacity)
258    }
259
260    /// Whether the buffer is empty.
261    #[must_use]
262    pub fn is_empty(&self) -> bool {
263        self.len() == 0
264    }
265}
266
267impl WalTelemetryObserver for WalTelemetryRingBuffer {
268    fn on_event(&self, event: &WalTelemetryEvent) {
269        let mut inner = self.events.lock();
270        let pos = inner.write_pos;
271        if inner.buf.len() < inner.capacity {
272            inner.buf.push(event.clone());
273        } else {
274            inner.buf[pos] = event.clone();
275        }
276        inner.write_pos = (pos + 1) % inner.capacity;
277        inner.count += 1;
278    }
279}
280
281// ---------------------------------------------------------------------------
282// Composite snapshot
283// ---------------------------------------------------------------------------
284
/// Unified snapshot of all WAL telemetry counters (metrics + FEC + recovery +
/// group commit + consolidation).  Produced by [`wal_telemetry_snapshot`].
///
/// Serializes via serde as an object with one key per field below: `wal`,
/// `fec_repair`, `recovery`, `group_commit` and `consolidation`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct WalTelemetrySnapshot {
    /// Core WAL counters, read from `crate::metrics::GLOBAL_WAL_METRICS`.
    pub wal: crate::metrics::WalMetricsSnapshot,
    /// FEC repair counters, read from `crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS`.
    pub fec_repair: crate::metrics::WalFecRepairCountersSnapshot,
    /// Recovery counters, read from `crate::metrics::GLOBAL_WAL_RECOVERY_METRICS`.
    pub recovery: crate::metrics::WalRecoveryCountersSnapshot,
    /// Group-commit counters, read from `crate::metrics::GLOBAL_GROUP_COMMIT_METRICS`.
    pub group_commit: crate::metrics::GroupCommitMetricsSnapshot,
    /// Consolidation metrics, read from
    /// `crate::group_commit::GLOBAL_CONSOLIDATION_METRICS`.
    pub consolidation: crate::group_commit::ConsolidationMetricsSnapshot,
}
295
296/// Collect a point-in-time snapshot of all global WAL telemetry counters.
297#[must_use]
298pub fn wal_telemetry_snapshot() -> WalTelemetrySnapshot {
299    WalTelemetrySnapshot {
300        wal: crate::metrics::GLOBAL_WAL_METRICS.snapshot(),
301        fec_repair: crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.snapshot(),
302        recovery: crate::metrics::GLOBAL_WAL_RECOVERY_METRICS.snapshot(),
303        group_commit: crate::metrics::GLOBAL_GROUP_COMMIT_METRICS.snapshot(),
304        consolidation: crate::group_commit::GLOBAL_CONSOLIDATION_METRICS.snapshot(),
305    }
306}
307
308// ===========================================================================
309// Tests — CI conformance validator
310// ===========================================================================
311
#[cfg(test)]
mod tests {
    use super::*;
    use crate::checkpoint::CheckpointMode;
    use crate::checksum::{
        ChecksumFailureKind, RecoveryAction, WalChainInvalidReason, WalFecRepairOutcome,
    };

    /// Drop guard that resets every global WAL telemetry counter group, so a
    /// test that records into the globals leaves them clean even on panic.
    struct ResetTelemetryGlobals;

    impl Drop for ResetTelemetryGlobals {
        fn drop(&mut self) {
            crate::metrics::GLOBAL_WAL_METRICS.reset();
            crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.reset();
            crate::metrics::GLOBAL_WAL_RECOVERY_METRICS.reset();
            crate::metrics::GLOBAL_GROUP_COMMIT_METRICS.reset();
            crate::group_commit::GLOBAL_CONSOLIDATION_METRICS.reset();
        }
    }

    /// Number of `WalTelemetryEvent` variants. Must equal the number of arms
    /// in `variant_ordinal`.
    const EVENT_VARIANT_COUNT: usize = 10;

    /// Maps each variant to a distinct ordinal in `0..EVENT_VARIANT_COUNT`.
    ///
    /// The match is deliberately exhaustive with no wildcard arm. Adding a
    /// variant to `WalTelemetryEvent` makes this fail to compile, which forces
    /// the author to extend it, bump `EVENT_VARIANT_COUNT`, and add the new
    /// variant to `all_event_variants` (checked by
    /// `conformance_variant_count_matches_schema`).
    fn variant_ordinal(event: &WalTelemetryEvent) -> usize {
        match event {
            WalTelemetryEvent::FrameAppended { .. } => 0,
            WalTelemetryEvent::ReplayStarted { .. } => 1,
            WalTelemetryEvent::ReplayCompleted { .. } => 2,
            WalTelemetryEvent::CheckpointStarted { .. } => 3,
            WalTelemetryEvent::CheckpointCompleted { .. } => 4,
            WalTelemetryEvent::WalReset { .. } => 5,
            WalTelemetryEvent::ChecksumFailure { .. } => 6,
            WalTelemetryEvent::ChainValidated { .. } => 7,
            WalTelemetryEvent::FecRepairAttempted { .. } => 8,
            WalTelemetryEvent::GroupCommitFlushed { .. } => 9,
        }
    }

    // ── Helper: build one event per variant ──

    /// One instance of every variant, with strictly increasing timestamps.
    fn all_event_variants() -> Vec<WalTelemetryEvent> {
        vec![
            WalTelemetryEvent::FrameAppended {
                frame_count: 3,
                bytes_written: 12_360,
                is_commit: true,
                timestamp_ns: 1_000_000,
            },
            WalTelemetryEvent::ReplayStarted {
                valid_frames: 10,
                replayable_frames: 8,
                timestamp_ns: 2_000_000,
            },
            WalTelemetryEvent::ReplayCompleted {
                frames_replayed: 8,
                duration_us: 500,
                timestamp_ns: 3_000_000,
            },
            WalTelemetryEvent::CheckpointStarted {
                mode: CheckpointMode::Passive,
                frames_to_backfill: 20,
                timestamp_ns: 4_000_000,
            },
            WalTelemetryEvent::CheckpointCompleted {
                mode: CheckpointMode::Restart,
                frames_backfilled: 20,
                wal_reset: true,
                duration_us: 3500,
                timestamp_ns: 5_000_000,
            },
            WalTelemetryEvent::WalReset {
                new_checkpoint_seq: 7,
                timestamp_ns: 6_000_000,
            },
            WalTelemetryEvent::ChecksumFailure {
                frame_index: 4,
                kind: ChecksumFailureKind::WalFrameChecksumMismatch,
                action: RecoveryAction::AttemptWalFecRepair,
                timestamp_ns: 7_000_000,
            },
            WalTelemetryEvent::ChainValidated {
                total_frames: 10,
                valid: false,
                first_invalid_frame: Some(4),
                reason: Some(WalChainInvalidReason::FrameChecksumMismatch),
                timestamp_ns: 8_000_000,
            },
            WalTelemetryEvent::FecRepairAttempted {
                outcome: WalFecRepairOutcome::Repaired,
                symbols_available: 12,
                duration_us: 200,
                timestamp_ns: 9_000_000,
            },
            WalTelemetryEvent::GroupCommitFlushed {
                batch_size: 4,
                total_frames: 16,
                latency_us: 1200,
                timestamp_ns: 10_000_000,
            },
        ]
    }

    // ── Conformance rule 1: every variant has a timestamp_ns ──

    #[test]
    fn conformance_every_variant_has_monotonic_timestamp() {
        let events = all_event_variants();
        let mut prev_ts = 0u64;
        for event in &events {
            let ts = event.timestamp_ns();
            assert!(
                ts > prev_ts,
                "timestamp must be monotonic: {} <= {} for {:?}",
                ts,
                prev_ts,
                event.kind_str()
            );
            prev_ts = ts;
        }
    }

    // ── Conformance rule 2: all events serialize to valid JSON ──

    #[test]
    fn conformance_all_events_serialize_to_json() {
        for event in all_event_variants() {
            let json = serde_json::to_string(&event)
                .unwrap_or_else(|e| panic!("failed to serialize {:?}: {e}", event.kind_str()));
            assert!(
                !json.is_empty(),
                "serialized JSON must not be empty for {}",
                event.kind_str()
            );
            // Verify it round-trips as valid JSON (parseable as Value).
            let _: serde_json::Value = serde_json::from_str(&json)
                .unwrap_or_else(|e| panic!("JSON not parseable for {}: {e}", event.kind_str()));
        }
    }

    // ── Conformance rule 2b: all snapshots serialize to JSON ──

    #[test]
    fn conformance_wal_metrics_snapshot_serializes() {
        let snap = crate::metrics::WalMetrics::new();
        snap.record_frame_write(4096);
        snap.record_checkpoint(5, 2000);
        snap.record_wal_reset();
        let s = snap.snapshot();
        let json = serde_json::to_string(&s).expect("WalMetricsSnapshot must serialize");
        assert!(json.contains("frames_written_total"));
        assert!(json.contains("checkpoint_count"));
    }

    #[test]
    fn conformance_fec_repair_snapshot_serializes() {
        let c = crate::metrics::WalFecRepairCounters::new();
        c.record_repair(true, 500);
        c.record_encode();
        let s = c.snapshot();
        let json = serde_json::to_string(&s).expect("WalFecRepairCountersSnapshot must serialize");
        assert!(json.contains("repairs_succeeded"));
        assert!(json.contains("encode_ops"));
    }

    #[test]
    fn conformance_recovery_snapshot_serializes() {
        let r = crate::metrics::WalRecoveryCounters::new();
        r.record_recovery(10, 2, 1);
        let s = r.snapshot();
        let json = serde_json::to_string(&s).expect("WalRecoveryCountersSnapshot must serialize");
        assert!(json.contains("recovery_frames_total"));
        assert!(json.contains("corruption_detected_total"));
    }

    #[test]
    fn conformance_group_commit_snapshot_serializes() {
        let g = crate::metrics::GroupCommitMetrics::new();
        g.record_group_commit(3, 1000);
        g.record_submission();
        let s = g.snapshot();
        let json = serde_json::to_string(&s).expect("GroupCommitMetricsSnapshot must serialize");
        assert!(json.contains("group_commits_total"));
        assert!(json.contains("submissions_total"));
    }

    #[test]
    fn conformance_composite_snapshot_serializes() {
        let snap = wal_telemetry_snapshot();
        let json = serde_json::to_string(&snap).expect("WalTelemetrySnapshot must serialize");
        // Must contain all five sub-sections as top-level keys. Match the
        // quoted key, not a bare substring: a bare "wal" would also match
        // any nested key that merely contains those letters.
        assert!(json.contains("\"wal\":"));
        assert!(json.contains("\"fec_repair\":"));
        assert!(json.contains("\"recovery\":"));
        assert!(json.contains("\"group_commit\":"));
        assert!(json.contains("\"consolidation\":"));
    }

    // ── Schema check: kind_str is unique per variant ──

    #[test]
    fn conformance_kind_str_unique_per_variant() {
        let events = all_event_variants();
        let kinds: Vec<&str> = events.iter().map(|e| e.kind_str()).collect();
        // All must be non-empty.
        for k in &kinds {
            assert!(!k.is_empty(), "kind_str must not be empty");
        }
        // All must be unique.
        let mut sorted = kinds.clone();
        sorted.sort();
        sorted.dedup();
        assert_eq!(
            kinds.len(),
            sorted.len(),
            "kind_str must be unique per variant"
        );
    }

    // ── Schema check: all_event_variants covers every variant exactly once ──

    #[test]
    fn conformance_variant_count_matches_schema() {
        // `variant_ordinal` is an exhaustive match, so a new variant cannot
        // compile until it is listed there. This test then requires
        // `all_event_variants()` to contain each listed variant exactly once.
        let events = all_event_variants();
        assert_eq!(
            events.len(),
            EVENT_VARIANT_COUNT,
            "all_event_variants must contain exactly one event per variant \
             (update it and EVENT_VARIANT_COUNT when adding a variant)"
        );
        let mut seen = [false; EVENT_VARIANT_COUNT];
        for event in &events {
            let ordinal = variant_ordinal(event);
            assert!(
                ordinal < EVENT_VARIANT_COUNT,
                "variant_ordinal({}) = {ordinal} is out of range; bump EVENT_VARIANT_COUNT",
                event.kind_str()
            );
            assert!(
                !seen[ordinal],
                "all_event_variants contains {} more than once",
                event.kind_str()
            );
            seen[ordinal] = true;
        }
        assert!(
            seen.iter().all(|&covered| covered),
            "all_event_variants is missing at least one variant"
        );
    }

    // ── Supporting enums: every listed variant serializes ──

    #[test]
    fn conformance_checksum_failure_kinds_serialize() {
        let kinds = [
            ChecksumFailureKind::WalFrameChecksumMismatch,
            ChecksumFailureKind::Xxh3PageChecksumMismatch,
            ChecksumFailureKind::Crc32cSymbolMismatch,
            ChecksumFailureKind::DbFileCorruption,
        ];
        for kind in kinds {
            let json = serde_json::to_string(&kind)
                .unwrap_or_else(|e| panic!("ChecksumFailureKind::{kind:?} serialize failed: {e}"));
            assert!(!json.is_empty());
        }
    }

    #[test]
    fn conformance_recovery_actions_serialize() {
        let actions = [
            RecoveryAction::AttemptWalFecRepair,
            RecoveryAction::TruncateWalAtFirstInvalidFrame,
            RecoveryAction::EvictCacheAndRetryFromWal,
            RecoveryAction::ExcludeCorruptedSymbolAndContinue,
            RecoveryAction::ReportPersistentCorruption,
        ];
        for action in actions {
            let json = serde_json::to_string(&action)
                .unwrap_or_else(|e| panic!("RecoveryAction::{action:?} serialize failed: {e}"));
            assert!(!json.is_empty());
        }
    }

    #[test]
    fn conformance_wal_chain_invalid_reasons_serialize() {
        let reasons = [
            WalChainInvalidReason::HeaderChecksumMismatch,
            WalChainInvalidReason::TruncatedFrame,
            WalChainInvalidReason::SaltMismatch,
            WalChainInvalidReason::FrameSaltMismatch,
            WalChainInvalidReason::FrameChecksumMismatch,
        ];
        for reason in reasons {
            let json = serde_json::to_string(&reason).unwrap_or_else(|e| {
                panic!("WalChainInvalidReason::{reason:?} serialize failed: {e}")
            });
            assert!(!json.is_empty());
        }
    }

    #[test]
    fn conformance_checkpoint_modes_serialize() {
        let modes = [
            CheckpointMode::Passive,
            CheckpointMode::Full,
            CheckpointMode::Restart,
            CheckpointMode::Truncate,
        ];
        for mode in modes {
            let json = serde_json::to_string(&mode)
                .unwrap_or_else(|e| panic!("CheckpointMode::{mode:?} serialize failed: {e}"));
            assert!(!json.is_empty());
        }
    }

    #[test]
    fn conformance_fec_repair_outcomes_serialize() {
        let outcomes = [
            WalFecRepairOutcome::Repaired,
            WalFecRepairOutcome::InsufficientSymbols,
            WalFecRepairOutcome::SourceHashMismatch,
        ];
        for outcome in outcomes {
            let json = serde_json::to_string(&outcome).unwrap_or_else(|e| {
                panic!("WalFecRepairOutcome::{outcome:?} serialize failed: {e}")
            });
            assert!(!json.is_empty());
        }
    }

    // ── Observer trait tests ──

    #[test]
    fn noop_observer_compiles_away() {
        let obs = NoOpWalObserver;
        let event = WalTelemetryEvent::FrameAppended {
            frame_count: 1,
            bytes_written: 4120,
            is_commit: false,
            timestamp_ns: 42,
        };
        // Should be a no-op; just verify it doesn't panic.
        obs.on_event(&event);
    }

    #[test]
    fn ring_buffer_stores_events() {
        let rb = WalTelemetryRingBuffer::new(4);
        assert!(rb.is_empty());
        let events = all_event_variants();
        for event in events.iter().take(3) {
            rb.on_event(event);
        }
        assert_eq!(rb.len(), 3);
        let drained = rb.drain();
        assert_eq!(drained.len(), 3);
        assert_eq!(drained.as_slice(), &events[..3]);
    }

    #[test]
    fn ring_buffer_wraps_at_capacity() {
        let rb = WalTelemetryRingBuffer::new(3);
        let events = all_event_variants();
        // Push 5 events into a buffer of capacity 3.
        for event in events.iter().take(5) {
            rb.on_event(event);
        }
        assert_eq!(rb.len(), 3);
        let drained = rb.drain();
        assert_eq!(drained.len(), 3);
        // Should have the last 3 events, oldest first.
        assert_eq!(drained.as_slice(), &events[2..5]);
    }

    #[test]
    fn ring_buffer_drain_preserves_chronological_order() {
        let rb = WalTelemetryRingBuffer::new(10);
        let events = all_event_variants();
        for event in &events {
            rb.on_event(event);
        }
        let drained = rb.drain();
        for pair in drained.windows(2) {
            assert!(
                pair[0].timestamp_ns() <= pair[1].timestamp_ns(),
                "drain must be chronological"
            );
        }
        assert_eq!(drained, events, "drain must return every event in insertion order");
    }

    // ── Composite snapshot tests ──

    #[test]
    fn composite_snapshot_captures_all_globals() {
        let _group_metrics_guard = crate::metrics::GLOBAL_GROUP_COMMIT_METRICS_TEST_LOCK
            .lock()
            .expect("global group commit metrics test lock poisoned");
        let _guard = crate::group_commit::GLOBAL_CONSOLIDATION_METRICS_TEST_LOCK
            .lock()
            .expect("global consolidation metrics test lock poisoned");
        let _reset = ResetTelemetryGlobals;
        // Reset globals to known state.
        // NOTE(review): GLOBAL_WAL_METRICS, GLOBAL_WAL_FEC_REPAIR_METRICS and
        // GLOBAL_WAL_RECOVERY_METRICS are not guarded by a test lock here. If
        // other tests in the crate write to them concurrently, the exact
        // counts asserted below could be flaky — confirm.
        crate::metrics::GLOBAL_WAL_METRICS.reset();
        crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.reset();
        crate::metrics::GLOBAL_WAL_RECOVERY_METRICS.reset();
        crate::metrics::GLOBAL_GROUP_COMMIT_METRICS.reset();
        crate::group_commit::GLOBAL_CONSOLIDATION_METRICS.reset();

        // Record some activity.
        crate::metrics::GLOBAL_WAL_METRICS.record_frame_write(4096);
        crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.record_repair(true, 100);
        crate::metrics::GLOBAL_WAL_RECOVERY_METRICS.record_recovery(5, 1, 1);
        crate::metrics::GLOBAL_GROUP_COMMIT_METRICS.record_group_commit(2, 500);
        crate::group_commit::GLOBAL_CONSOLIDATION_METRICS
            .record_phase_timing(10, 20, 30, true, 40, 50, 60, 70, 80, 0);

        let snap = wal_telemetry_snapshot();
        assert_eq!(snap.wal.frames_written_total, 1);
        assert_eq!(snap.fec_repair.repairs_succeeded, 1);
        assert_eq!(snap.recovery.recovery_frames_total, 5);
        assert_eq!(snap.group_commit.group_commits_total, 1);
        assert_eq!(snap.consolidation.total_commits(), 1);
        assert_eq!(snap.consolidation.inner_lock_wait_us_total, 50);
    }

    #[test]
    fn ring_buffer_capacity_one_keeps_latest() {
        let rb = WalTelemetryRingBuffer::new(1);
        let events = all_event_variants();
        for event in &events {
            rb.on_event(event);
        }
        assert_eq!(rb.len(), 1);
        let drained = rb.drain();
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0].kind_str(), events.last().unwrap().kind_str());
    }

    #[test]
    fn ring_buffer_drain_on_empty_returns_empty_vec() {
        let rb = WalTelemetryRingBuffer::new(8);
        assert!(rb.is_empty());
        let drained = rb.drain();
        assert!(drained.is_empty());
    }

    #[test]
    fn ring_buffer_is_not_empty_after_overflow() {
        let rb = WalTelemetryRingBuffer::new(2);
        for event in all_event_variants().iter().take(5) {
            rb.on_event(event);
        }
        assert!(!rb.is_empty());
        assert_eq!(rb.len(), 2);
    }

    #[test]
    fn event_clone_and_equality() {
        let event = WalTelemetryEvent::FrameAppended {
            frame_count: 5,
            bytes_written: 20_480,
            is_commit: true,
            timestamp_ns: 999,
        };
        let cloned = event.clone();
        assert_eq!(event, cloned);

        let different = WalTelemetryEvent::FrameAppended {
            frame_count: 5,
            bytes_written: 20_480,
            is_commit: false,
            timestamp_ns: 999,
        };
        assert_ne!(event, different);
    }

    #[test]
    fn observer_trait_is_object_safe() {
        let obs: Box<dyn WalTelemetryObserver> = Box::new(NoOpWalObserver);
        let event = WalTelemetryEvent::WalReset {
            new_checkpoint_seq: 1,
            timestamp_ns: 42,
        };
        obs.on_event(&event);
    }

    #[test]
    fn event_serialize_produces_valid_json() {
        let event = WalTelemetryEvent::FrameAppended {
            frame_count: 2,
            bytes_written: 8240,
            is_commit: true,
            timestamp_ns: 42,
        };
        let json = serde_json::to_string(&event).expect("serialize");
        assert!(json.contains("FrameAppended"));
        assert!(json.contains("8240"));
        assert!(json.contains("true"));
    }

    #[test]
    fn kind_str_covers_all_ten_variants() {
        let events = all_event_variants();
        assert_eq!(
            events.len(),
            EVENT_VARIANT_COUNT,
            "all_event_variants must cover every variant"
        );
        let kinds: Vec<&str> = events.iter().map(|e| e.kind_str()).collect();
        let unique: std::collections::HashSet<&str> = kinds.iter().copied().collect();
        assert_eq!(
            unique.len(),
            EVENT_VARIANT_COUNT,
            "all kind_str values must be distinct"
        );
    }

    #[test]
    fn wal_telemetry_snapshot_debug_and_serialize() {
        let _group_metrics_guard = crate::metrics::GLOBAL_GROUP_COMMIT_METRICS_TEST_LOCK
            .lock()
            .expect("global group commit metrics test lock poisoned");
        let _guard = crate::group_commit::GLOBAL_CONSOLIDATION_METRICS_TEST_LOCK
            .lock()
            .expect("lock");
        let _reset = ResetTelemetryGlobals;
        crate::metrics::GLOBAL_WAL_METRICS.reset();
        crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.reset();
        crate::metrics::GLOBAL_WAL_RECOVERY_METRICS.reset();
        crate::metrics::GLOBAL_GROUP_COMMIT_METRICS.reset();
        crate::group_commit::GLOBAL_CONSOLIDATION_METRICS.reset();

        let snap = wal_telemetry_snapshot();
        let dbg = format!("{snap:?}");
        assert!(dbg.contains("WalTelemetrySnapshot"));
        let json = serde_json::to_string(&snap).expect("serialize snapshot");
        assert!(json.contains("\"wal\":"));
        assert!(json.contains("\"fec_repair\":"));
    }

    #[test]
    fn ring_buffer_drain_single_event() {
        let rb = WalTelemetryRingBuffer::new(8);
        let event = WalTelemetryEvent::WalReset {
            new_checkpoint_seq: 99,
            timestamp_ns: 1,
        };
        rb.on_event(&event);
        assert_eq!(rb.len(), 1);
        let drained = rb.drain();
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0], event);
    }

    #[test]
    fn ring_buffer_wraps_at_capacity_with_uniform_events() {
        let rb = WalTelemetryRingBuffer::new(2);
        for i in 0..5u64 {
            rb.on_event(&WalTelemetryEvent::WalReset {
                new_checkpoint_seq: i as u32,
                timestamp_ns: i,
            });
        }
        assert_eq!(rb.len(), 2);
        let drained = rb.drain();
        assert_eq!(drained.len(), 2);
        // Only the two newest events (seq 3 and 4) survive, oldest first.
        let seqs: Vec<u32> = drained
            .iter()
            .map(|event| match event {
                WalTelemetryEvent::WalReset {
                    new_checkpoint_seq, ..
                } => *new_checkpoint_seq,
                other => panic!("expected WalReset, got {}", other.kind_str()),
            })
            .collect();
        assert_eq!(seqs, vec![3, 4]);
    }

    #[test]
    fn noop_observer_does_not_panic() {
        let obs = NoOpWalObserver;
        obs.on_event(&WalTelemetryEvent::WalReset {
            new_checkpoint_seq: 0,
            timestamp_ns: 0,
        });
    }

    #[test]
    fn ring_buffer_empty_drain_returns_empty() {
        let rb = WalTelemetryRingBuffer::new(4);
        assert!(rb.is_empty());
        assert_eq!(rb.len(), 0);
        let drained = rb.drain();
        assert!(drained.is_empty());
    }

    #[test]
    fn wal_telemetry_event_debug_clone_eq() {
        let a = WalTelemetryEvent::FrameAppended {
            frame_count: 3,
            bytes_written: 12288,
            is_commit: true,
            timestamp_ns: 100,
        };
        let b = a.clone();
        assert_eq!(a, b);
        let c = WalTelemetryEvent::FrameAppended {
            frame_count: 1,
            bytes_written: 4096,
            is_commit: false,
            timestamp_ns: 200,
        };
        assert_ne!(a, c);
        let dbg = format!("{a:?}");
        assert!(dbg.contains("FrameAppended"));
    }
}