1use serde::Serialize;
17
18use crate::checkpoint::CheckpointMode;
19use crate::checksum::{
20 ChecksumFailureKind, RecoveryAction, WalChainInvalidReason, WalFecRepairOutcome,
21};
22
23#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
32pub enum WalTelemetryEvent {
33 FrameAppended {
35 frame_count: u32,
37 bytes_written: u64,
39 is_commit: bool,
41 timestamp_ns: u64,
43 },
44
45 ReplayStarted {
47 valid_frames: usize,
49 replayable_frames: usize,
51 timestamp_ns: u64,
53 },
54
55 ReplayCompleted {
57 frames_replayed: usize,
59 duration_us: u64,
61 timestamp_ns: u64,
63 },
64
65 CheckpointStarted {
67 mode: CheckpointMode,
69 frames_to_backfill: u32,
71 timestamp_ns: u64,
73 },
74
75 CheckpointCompleted {
77 mode: CheckpointMode,
79 frames_backfilled: u32,
81 wal_reset: bool,
83 duration_us: u64,
85 timestamp_ns: u64,
87 },
88
89 WalReset {
91 new_checkpoint_seq: u32,
93 timestamp_ns: u64,
95 },
96
97 ChecksumFailure {
99 frame_index: usize,
101 kind: ChecksumFailureKind,
103 action: RecoveryAction,
105 timestamp_ns: u64,
107 },
108
109 ChainValidated {
111 total_frames: usize,
113 valid: bool,
115 first_invalid_frame: Option<usize>,
117 reason: Option<WalChainInvalidReason>,
119 timestamp_ns: u64,
121 },
122
123 FecRepairAttempted {
125 outcome: WalFecRepairOutcome,
127 symbols_available: usize,
129 duration_us: u64,
131 timestamp_ns: u64,
133 },
134
135 GroupCommitFlushed {
137 batch_size: u32,
139 total_frames: u32,
141 latency_us: u64,
143 timestamp_ns: u64,
145 },
146}
147
148impl WalTelemetryEvent {
149 #[must_use]
151 pub fn timestamp_ns(&self) -> u64 {
152 match self {
153 Self::FrameAppended { timestamp_ns, .. }
154 | Self::ReplayStarted { timestamp_ns, .. }
155 | Self::ReplayCompleted { timestamp_ns, .. }
156 | Self::CheckpointStarted { timestamp_ns, .. }
157 | Self::CheckpointCompleted { timestamp_ns, .. }
158 | Self::WalReset { timestamp_ns, .. }
159 | Self::ChecksumFailure { timestamp_ns, .. }
160 | Self::ChainValidated { timestamp_ns, .. }
161 | Self::FecRepairAttempted { timestamp_ns, .. }
162 | Self::GroupCommitFlushed { timestamp_ns, .. } => *timestamp_ns,
163 }
164 }
165
166 #[must_use]
168 pub fn kind_str(&self) -> &'static str {
169 match self {
170 Self::FrameAppended { .. } => "frame_appended",
171 Self::ReplayStarted { .. } => "replay_started",
172 Self::ReplayCompleted { .. } => "replay_completed",
173 Self::CheckpointStarted { .. } => "checkpoint_started",
174 Self::CheckpointCompleted { .. } => "checkpoint_completed",
175 Self::WalReset { .. } => "wal_reset",
176 Self::ChecksumFailure { .. } => "checksum_failure",
177 Self::ChainValidated { .. } => "chain_validated",
178 Self::FecRepairAttempted { .. } => "fec_repair_attempted",
179 Self::GroupCommitFlushed { .. } => "group_commit_flushed",
180 }
181 }
182}
183
184pub trait WalTelemetryObserver: Send + Sync {
194 fn on_event(&self, event: &WalTelemetryEvent);
196}
197
198pub struct NoOpWalObserver;
200
201impl WalTelemetryObserver for NoOpWalObserver {
202 #[inline(always)]
203 fn on_event(&self, _event: &WalTelemetryEvent) {}
204}
205
206pub struct WalTelemetryRingBuffer {
208 events: fsqlite_types::sync_primitives::Mutex<WalRingBufferInner>,
209}
210
211struct WalRingBufferInner {
212 buf: Vec<WalTelemetryEvent>,
213 capacity: usize,
214 write_pos: usize,
215 count: usize,
216}
217
218impl WalTelemetryRingBuffer {
219 #[must_use]
221 pub fn new(capacity: usize) -> Self {
222 Self {
223 events: fsqlite_types::sync_primitives::Mutex::new(WalRingBufferInner {
224 buf: Vec::with_capacity(capacity),
225 capacity,
226 write_pos: 0,
227 count: 0,
228 }),
229 }
230 }
231
232 #[must_use]
234 pub fn drain(&self) -> Vec<WalTelemetryEvent> {
235 let inner = self.events.lock();
236 let n = inner.count.min(inner.capacity);
237 let mut result = Vec::with_capacity(n);
238 if n == 0 {
239 return result;
240 }
241 let start = if inner.count >= inner.capacity {
242 inner.write_pos
243 } else {
244 0
245 };
246 for i in 0..n {
247 let idx = (start + i) % inner.capacity;
248 result.push(inner.buf[idx].clone());
249 }
250 result
251 }
252
253 #[must_use]
255 pub fn len(&self) -> usize {
256 let inner = self.events.lock();
257 inner.count.min(inner.capacity)
258 }
259
260 #[must_use]
262 pub fn is_empty(&self) -> bool {
263 self.len() == 0
264 }
265}
266
267impl WalTelemetryObserver for WalTelemetryRingBuffer {
268 fn on_event(&self, event: &WalTelemetryEvent) {
269 let mut inner = self.events.lock();
270 let pos = inner.write_pos;
271 if inner.buf.len() < inner.capacity {
272 inner.buf.push(event.clone());
273 } else {
274 inner.buf[pos] = event.clone();
275 }
276 inner.write_pos = (pos + 1) % inner.capacity;
277 inner.count += 1;
278 }
279}
280
281#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
288pub struct WalTelemetrySnapshot {
289 pub wal: crate::metrics::WalMetricsSnapshot,
290 pub fec_repair: crate::metrics::WalFecRepairCountersSnapshot,
291 pub recovery: crate::metrics::WalRecoveryCountersSnapshot,
292 pub group_commit: crate::metrics::GroupCommitMetricsSnapshot,
293 pub consolidation: crate::group_commit::ConsolidationMetricsSnapshot,
294}
295
296#[must_use]
298pub fn wal_telemetry_snapshot() -> WalTelemetrySnapshot {
299 WalTelemetrySnapshot {
300 wal: crate::metrics::GLOBAL_WAL_METRICS.snapshot(),
301 fec_repair: crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.snapshot(),
302 recovery: crate::metrics::GLOBAL_WAL_RECOVERY_METRICS.snapshot(),
303 group_commit: crate::metrics::GLOBAL_GROUP_COMMIT_METRICS.snapshot(),
304 consolidation: crate::group_commit::GLOBAL_CONSOLIDATION_METRICS.snapshot(),
305 }
306}
307
308#[cfg(test)]
313mod tests {
314 use super::*;
315 use crate::checkpoint::CheckpointMode;
316 use crate::checksum::{
317 ChecksumFailureKind, RecoveryAction, WalChainInvalidReason, WalFecRepairOutcome,
318 };
319
320 struct ResetTelemetryGlobals;
321
322 impl Drop for ResetTelemetryGlobals {
323 fn drop(&mut self) {
324 crate::metrics::GLOBAL_WAL_METRICS.reset();
325 crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.reset();
326 crate::metrics::GLOBAL_WAL_RECOVERY_METRICS.reset();
327 crate::metrics::GLOBAL_GROUP_COMMIT_METRICS.reset();
328 crate::group_commit::GLOBAL_CONSOLIDATION_METRICS.reset();
329 }
330 }
331
332 fn all_event_variants() -> Vec<WalTelemetryEvent> {
335 vec![
336 WalTelemetryEvent::FrameAppended {
337 frame_count: 3,
338 bytes_written: 12_360,
339 is_commit: true,
340 timestamp_ns: 1_000_000,
341 },
342 WalTelemetryEvent::ReplayStarted {
343 valid_frames: 10,
344 replayable_frames: 8,
345 timestamp_ns: 2_000_000,
346 },
347 WalTelemetryEvent::ReplayCompleted {
348 frames_replayed: 8,
349 duration_us: 500,
350 timestamp_ns: 3_000_000,
351 },
352 WalTelemetryEvent::CheckpointStarted {
353 mode: CheckpointMode::Passive,
354 frames_to_backfill: 20,
355 timestamp_ns: 4_000_000,
356 },
357 WalTelemetryEvent::CheckpointCompleted {
358 mode: CheckpointMode::Restart,
359 frames_backfilled: 20,
360 wal_reset: true,
361 duration_us: 3500,
362 timestamp_ns: 5_000_000,
363 },
364 WalTelemetryEvent::WalReset {
365 new_checkpoint_seq: 7,
366 timestamp_ns: 6_000_000,
367 },
368 WalTelemetryEvent::ChecksumFailure {
369 frame_index: 4,
370 kind: ChecksumFailureKind::WalFrameChecksumMismatch,
371 action: RecoveryAction::AttemptWalFecRepair,
372 timestamp_ns: 7_000_000,
373 },
374 WalTelemetryEvent::ChainValidated {
375 total_frames: 10,
376 valid: false,
377 first_invalid_frame: Some(4),
378 reason: Some(WalChainInvalidReason::FrameChecksumMismatch),
379 timestamp_ns: 8_000_000,
380 },
381 WalTelemetryEvent::FecRepairAttempted {
382 outcome: WalFecRepairOutcome::Repaired,
383 symbols_available: 12,
384 duration_us: 200,
385 timestamp_ns: 9_000_000,
386 },
387 WalTelemetryEvent::GroupCommitFlushed {
388 batch_size: 4,
389 total_frames: 16,
390 latency_us: 1200,
391 timestamp_ns: 10_000_000,
392 },
393 ]
394 }
395
396 #[test]
399 fn conformance_every_variant_has_monotonic_timestamp() {
400 let events = all_event_variants();
401 let mut prev_ts = 0u64;
402 for event in &events {
403 let ts = event.timestamp_ns();
404 assert!(
405 ts > prev_ts,
406 "timestamp must be monotonic: {} <= {} for {:?}",
407 ts,
408 prev_ts,
409 event.kind_str()
410 );
411 prev_ts = ts;
412 }
413 }
414
415 #[test]
418 fn conformance_all_events_serialize_to_json() {
419 for event in all_event_variants() {
420 let json = serde_json::to_string(&event)
421 .unwrap_or_else(|e| panic!("failed to serialize {:?}: {e}", event.kind_str()));
422 assert!(
423 !json.is_empty(),
424 "serialized JSON must not be empty for {}",
425 event.kind_str()
426 );
427 let _: serde_json::Value = serde_json::from_str(&json)
429 .unwrap_or_else(|e| panic!("JSON not parseable for {}: {e}", event.kind_str()));
430 }
431 }
432
433 #[test]
436 fn conformance_wal_metrics_snapshot_serializes() {
437 let snap = crate::metrics::WalMetrics::new();
438 snap.record_frame_write(4096);
439 snap.record_checkpoint(5, 2000);
440 snap.record_wal_reset();
441 let s = snap.snapshot();
442 let json = serde_json::to_string(&s).expect("WalMetricsSnapshot must serialize");
443 assert!(json.contains("frames_written_total"));
444 assert!(json.contains("checkpoint_count"));
445 }
446
447 #[test]
448 fn conformance_fec_repair_snapshot_serializes() {
449 let c = crate::metrics::WalFecRepairCounters::new();
450 c.record_repair(true, 500);
451 c.record_encode();
452 let s = c.snapshot();
453 let json = serde_json::to_string(&s).expect("WalFecRepairCountersSnapshot must serialize");
454 assert!(json.contains("repairs_succeeded"));
455 assert!(json.contains("encode_ops"));
456 }
457
458 #[test]
459 fn conformance_recovery_snapshot_serializes() {
460 let r = crate::metrics::WalRecoveryCounters::new();
461 r.record_recovery(10, 2, 1);
462 let s = r.snapshot();
463 let json = serde_json::to_string(&s).expect("WalRecoveryCountersSnapshot must serialize");
464 assert!(json.contains("recovery_frames_total"));
465 assert!(json.contains("corruption_detected_total"));
466 }
467
468 #[test]
469 fn conformance_group_commit_snapshot_serializes() {
470 let g = crate::metrics::GroupCommitMetrics::new();
471 g.record_group_commit(3, 1000);
472 g.record_submission();
473 let s = g.snapshot();
474 let json = serde_json::to_string(&s).expect("GroupCommitMetricsSnapshot must serialize");
475 assert!(json.contains("group_commits_total"));
476 assert!(json.contains("submissions_total"));
477 }
478
479 #[test]
480 fn conformance_composite_snapshot_serializes() {
481 let snap = wal_telemetry_snapshot();
482 let json = serde_json::to_string(&snap).expect("WalTelemetrySnapshot must serialize");
483 assert!(json.contains("wal"));
485 assert!(json.contains("fec_repair"));
486 assert!(json.contains("recovery"));
487 assert!(json.contains("group_commit"));
488 assert!(json.contains("consolidation"));
489 }
490
491 #[test]
494 fn conformance_kind_str_unique_per_variant() {
495 let events = all_event_variants();
496 let kinds: Vec<&str> = events.iter().map(|e| e.kind_str()).collect();
497 for k in &kinds {
499 assert!(!k.is_empty(), "kind_str must not be empty");
500 }
501 let mut sorted = kinds.clone();
503 sorted.sort();
504 sorted.dedup();
505 assert_eq!(
506 kinds.len(),
507 sorted.len(),
508 "kind_str must be unique per variant"
509 );
510 }
511
512 #[test]
515 fn conformance_variant_count_matches_schema() {
516 assert_eq!(
520 all_event_variants().len(),
521 10,
522 "WalTelemetryEvent must have exactly 10 variants (update all_event_variants if adding)"
523 );
524 }
525
526 #[test]
529 fn conformance_checksum_failure_kinds_serialize() {
530 let kinds = [
531 ChecksumFailureKind::WalFrameChecksumMismatch,
532 ChecksumFailureKind::Xxh3PageChecksumMismatch,
533 ChecksumFailureKind::Crc32cSymbolMismatch,
534 ChecksumFailureKind::DbFileCorruption,
535 ];
536 for kind in kinds {
537 let json = serde_json::to_string(&kind)
538 .unwrap_or_else(|e| panic!("ChecksumFailureKind::{kind:?} serialize failed: {e}"));
539 assert!(!json.is_empty());
540 }
541 }
542
543 #[test]
544 fn conformance_recovery_actions_serialize() {
545 let actions = [
546 RecoveryAction::AttemptWalFecRepair,
547 RecoveryAction::TruncateWalAtFirstInvalidFrame,
548 RecoveryAction::EvictCacheAndRetryFromWal,
549 RecoveryAction::ExcludeCorruptedSymbolAndContinue,
550 RecoveryAction::ReportPersistentCorruption,
551 ];
552 for action in actions {
553 let json = serde_json::to_string(&action)
554 .unwrap_or_else(|e| panic!("RecoveryAction::{action:?} serialize failed: {e}"));
555 assert!(!json.is_empty());
556 }
557 }
558
559 #[test]
560 fn conformance_wal_chain_invalid_reasons_serialize() {
561 let reasons = [
562 WalChainInvalidReason::HeaderChecksumMismatch,
563 WalChainInvalidReason::TruncatedFrame,
564 WalChainInvalidReason::SaltMismatch,
565 WalChainInvalidReason::FrameSaltMismatch,
566 WalChainInvalidReason::FrameChecksumMismatch,
567 ];
568 for reason in reasons {
569 let json = serde_json::to_string(&reason).unwrap_or_else(|e| {
570 panic!("WalChainInvalidReason::{reason:?} serialize failed: {e}")
571 });
572 assert!(!json.is_empty());
573 }
574 }
575
576 #[test]
577 fn conformance_checkpoint_modes_serialize() {
578 let modes = [
579 CheckpointMode::Passive,
580 CheckpointMode::Full,
581 CheckpointMode::Restart,
582 CheckpointMode::Truncate,
583 ];
584 for mode in modes {
585 let json = serde_json::to_string(&mode)
586 .unwrap_or_else(|e| panic!("CheckpointMode::{mode:?} serialize failed: {e}"));
587 assert!(!json.is_empty());
588 }
589 }
590
591 #[test]
592 fn conformance_fec_repair_outcomes_serialize() {
593 let outcomes = [
594 WalFecRepairOutcome::Repaired,
595 WalFecRepairOutcome::InsufficientSymbols,
596 WalFecRepairOutcome::SourceHashMismatch,
597 ];
598 for outcome in outcomes {
599 let json = serde_json::to_string(&outcome).unwrap_or_else(|e| {
600 panic!("WalFecRepairOutcome::{outcome:?} serialize failed: {e}")
601 });
602 assert!(!json.is_empty());
603 }
604 }
605
606 #[test]
609 fn noop_observer_compiles_away() {
610 let obs = NoOpWalObserver;
611 let event = WalTelemetryEvent::FrameAppended {
612 frame_count: 1,
613 bytes_written: 4120,
614 is_commit: false,
615 timestamp_ns: 42,
616 };
617 obs.on_event(&event);
619 }
620
621 #[test]
622 fn ring_buffer_stores_events() {
623 let rb = WalTelemetryRingBuffer::new(4);
624 assert!(rb.is_empty());
625 for (i, event) in all_event_variants().into_iter().enumerate().take(3) {
626 let _ = i;
627 rb.on_event(&event);
628 }
629 assert_eq!(rb.len(), 3);
630 let drained = rb.drain();
631 assert_eq!(drained.len(), 3);
632 }
633
634 #[test]
635 fn ring_buffer_wraps_at_capacity() {
636 let rb = WalTelemetryRingBuffer::new(3);
637 let events = all_event_variants();
638 for event in events.iter().take(5) {
640 rb.on_event(event);
641 }
642 assert_eq!(rb.len(), 3);
643 let drained = rb.drain();
644 assert_eq!(drained.len(), 3);
645 assert_eq!(drained[0].kind_str(), events[2].kind_str());
647 assert_eq!(drained[1].kind_str(), events[3].kind_str());
648 assert_eq!(drained[2].kind_str(), events[4].kind_str());
649 }
650
651 #[test]
652 fn ring_buffer_drain_preserves_chronological_order() {
653 let rb = WalTelemetryRingBuffer::new(10);
654 let events = all_event_variants();
655 for event in &events {
656 rb.on_event(event);
657 }
658 let drained = rb.drain();
659 for pair in drained.windows(2) {
660 assert!(
661 pair[0].timestamp_ns() <= pair[1].timestamp_ns(),
662 "drain must be chronological"
663 );
664 }
665 }
666
667 #[test]
670 fn composite_snapshot_captures_all_globals() {
671 let _group_metrics_guard = crate::metrics::GLOBAL_GROUP_COMMIT_METRICS_TEST_LOCK
672 .lock()
673 .expect("global group commit metrics test lock poisoned");
674 let _guard = crate::group_commit::GLOBAL_CONSOLIDATION_METRICS_TEST_LOCK
675 .lock()
676 .expect("global consolidation metrics test lock poisoned");
677 let _reset = ResetTelemetryGlobals;
678 crate::metrics::GLOBAL_WAL_METRICS.reset();
680 crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.reset();
681 crate::metrics::GLOBAL_WAL_RECOVERY_METRICS.reset();
682 crate::metrics::GLOBAL_GROUP_COMMIT_METRICS.reset();
683 crate::group_commit::GLOBAL_CONSOLIDATION_METRICS.reset();
684
685 crate::metrics::GLOBAL_WAL_METRICS.record_frame_write(4096);
687 crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.record_repair(true, 100);
688 crate::metrics::GLOBAL_WAL_RECOVERY_METRICS.record_recovery(5, 1, 1);
689 crate::metrics::GLOBAL_GROUP_COMMIT_METRICS.record_group_commit(2, 500);
690 crate::group_commit::GLOBAL_CONSOLIDATION_METRICS
691 .record_phase_timing(10, 20, 30, true, 40, 50, 60, 70, 80, 0);
692
693 let snap = wal_telemetry_snapshot();
694 assert_eq!(snap.wal.frames_written_total, 1);
695 assert_eq!(snap.fec_repair.repairs_succeeded, 1);
696 assert_eq!(snap.recovery.recovery_frames_total, 5);
697 assert_eq!(snap.group_commit.group_commits_total, 1);
698 assert_eq!(snap.consolidation.total_commits(), 1);
699 assert_eq!(snap.consolidation.inner_lock_wait_us_total, 50);
700 }
701
702 #[test]
703 fn ring_buffer_capacity_one_keeps_latest() {
704 let rb = WalTelemetryRingBuffer::new(1);
705 let events = all_event_variants();
706 for event in &events {
707 rb.on_event(event);
708 }
709 assert_eq!(rb.len(), 1);
710 let drained = rb.drain();
711 assert_eq!(drained.len(), 1);
712 assert_eq!(drained[0].kind_str(), events.last().unwrap().kind_str());
713 }
714
715 #[test]
716 fn ring_buffer_drain_on_empty_returns_empty_vec() {
717 let rb = WalTelemetryRingBuffer::new(8);
718 assert!(rb.is_empty());
719 let drained = rb.drain();
720 assert!(drained.is_empty());
721 }
722
723 #[test]
724 fn ring_buffer_is_not_empty_after_overflow() {
725 let rb = WalTelemetryRingBuffer::new(2);
726 for event in all_event_variants().iter().take(5) {
727 rb.on_event(event);
728 }
729 assert!(!rb.is_empty());
730 assert_eq!(rb.len(), 2);
731 }
732
733 #[test]
734 fn event_clone_and_equality() {
735 let event = WalTelemetryEvent::FrameAppended {
736 frame_count: 5,
737 bytes_written: 20_480,
738 is_commit: true,
739 timestamp_ns: 999,
740 };
741 let cloned = event.clone();
742 assert_eq!(event, cloned);
743
744 let different = WalTelemetryEvent::FrameAppended {
745 frame_count: 5,
746 bytes_written: 20_480,
747 is_commit: false,
748 timestamp_ns: 999,
749 };
750 assert_ne!(event, different);
751 }
752
753 #[test]
754 fn observer_trait_is_object_safe() {
755 let obs: Box<dyn WalTelemetryObserver> = Box::new(NoOpWalObserver);
756 let event = WalTelemetryEvent::WalReset {
757 new_checkpoint_seq: 1,
758 timestamp_ns: 42,
759 };
760 obs.on_event(&event);
761 }
762
763 #[test]
764 fn event_serialize_produces_valid_json() {
765 let event = WalTelemetryEvent::FrameAppended {
766 frame_count: 2,
767 bytes_written: 8240,
768 is_commit: true,
769 timestamp_ns: 42,
770 };
771 let json = serde_json::to_string(&event).expect("serialize");
772 assert!(json.contains("FrameAppended"));
773 assert!(json.contains("8240"));
774 assert!(json.contains("true"));
775 }
776
777 #[test]
778 fn kind_str_covers_all_ten_variants() {
779 let events = all_event_variants();
780 assert_eq!(
781 events.len(),
782 10,
783 "all_event_variants must cover 10 variants"
784 );
785 let kinds: Vec<&str> = events.iter().map(|e| e.kind_str()).collect();
786 let unique: std::collections::HashSet<&str> = kinds.iter().copied().collect();
787 assert_eq!(unique.len(), 10, "all kind_str values must be distinct");
788 }
789
790 #[test]
791 fn wal_telemetry_snapshot_debug_and_serialize() {
792 let _group_metrics_guard = crate::metrics::GLOBAL_GROUP_COMMIT_METRICS_TEST_LOCK
793 .lock()
794 .expect("global group commit metrics test lock poisoned");
795 let _guard = crate::group_commit::GLOBAL_CONSOLIDATION_METRICS_TEST_LOCK
796 .lock()
797 .expect("lock");
798 let _reset = ResetTelemetryGlobals;
799 crate::metrics::GLOBAL_WAL_METRICS.reset();
800 crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.reset();
801 crate::metrics::GLOBAL_WAL_RECOVERY_METRICS.reset();
802 crate::metrics::GLOBAL_GROUP_COMMIT_METRICS.reset();
803 crate::group_commit::GLOBAL_CONSOLIDATION_METRICS.reset();
804
805 let snap = wal_telemetry_snapshot();
806 let dbg = format!("{snap:?}");
807 assert!(dbg.contains("WalTelemetrySnapshot"));
808 let json = serde_json::to_string(&snap).expect("serialize snapshot");
809 assert!(json.contains("wal"));
810 assert!(json.contains("fec_repair"));
811 }
812
813 #[test]
814 fn ring_buffer_drain_single_event() {
815 let rb = WalTelemetryRingBuffer::new(8);
816 let event = WalTelemetryEvent::WalReset {
817 new_checkpoint_seq: 99,
818 timestamp_ns: 1,
819 };
820 rb.on_event(&event);
821 assert_eq!(rb.len(), 1);
822 let drained = rb.drain();
823 assert_eq!(drained.len(), 1);
824 assert_eq!(drained[0], event);
825 }
826
827 #[test]
828 fn ring_buffer_wraps_at_capacity_with_uniform_events() {
829 let rb = WalTelemetryRingBuffer::new(2);
830 for i in 0..5u64 {
831 rb.on_event(&WalTelemetryEvent::WalReset {
832 new_checkpoint_seq: i as u32,
833 timestamp_ns: i,
834 });
835 }
836 assert_eq!(rb.len(), 2);
837 let drained = rb.drain();
838 assert_eq!(drained.len(), 2);
839 if let WalTelemetryEvent::WalReset {
840 new_checkpoint_seq, ..
841 } = &drained[0]
842 {
843 assert_eq!(*new_checkpoint_seq, 3);
844 } else {
845 panic!("expected WalReset");
846 }
847 }
848
849 #[test]
850 fn noop_observer_does_not_panic() {
851 let obs = NoOpWalObserver;
852 obs.on_event(&WalTelemetryEvent::WalReset {
853 new_checkpoint_seq: 0,
854 timestamp_ns: 0,
855 });
856 }
857
858 #[test]
859 fn ring_buffer_empty_drain_returns_empty() {
860 let rb = WalTelemetryRingBuffer::new(4);
861 assert!(rb.is_empty());
862 assert_eq!(rb.len(), 0);
863 let drained = rb.drain();
864 assert!(drained.is_empty());
865 }
866
867 #[test]
868 fn wal_telemetry_event_debug_clone_eq() {
869 let a = WalTelemetryEvent::FrameAppended {
870 frame_count: 3,
871 bytes_written: 12288,
872 is_commit: true,
873 timestamp_ns: 100,
874 };
875 let b = a.clone();
876 assert_eq!(a, b);
877 let c = WalTelemetryEvent::FrameAppended {
878 frame_count: 1,
879 bytes_written: 4096,
880 is_commit: false,
881 timestamp_ns: 200,
882 };
883 assert_ne!(a, c);
884 let dbg = format!("{a:?}");
885 assert!(dbg.contains("FrameAppended"));
886 }
887}