use serde::Serialize;
use crate::checkpoint::CheckpointMode;
use crate::checksum::{
ChecksumFailureKind, RecoveryAction, WalChainInvalidReason, WalFecRepairOutcome,
};
/// Structured events describing WAL activity, delivered to a [`WalTelemetryObserver`].
///
/// Every variant carries a `timestamp_ns` field, which can be read uniformly via
/// [`WalTelemetryEvent::timestamp_ns`]. Field suffixes `_ns` / `_us` presumably denote
/// nanoseconds / microseconds; the clock source is chosen by whoever emits the event
/// and is not visible in this file.
///
/// The type derives `Serialize`; variant and field order are part of the serialized
/// shape, so do not reorder them casually.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum WalTelemetryEvent {
    /// One or more frames were appended.
    FrameAppended {
        frame_count: u32,
        bytes_written: u64,
        is_commit: bool,
        timestamp_ns: u64,
    },
    /// A replay pass is starting.
    ReplayStarted {
        valid_frames: usize,
        replayable_frames: usize,
        timestamp_ns: u64,
    },
    /// A replay pass finished.
    ReplayCompleted {
        frames_replayed: usize,
        duration_us: u64,
        timestamp_ns: u64,
    },
    /// A checkpoint in the given mode is starting.
    CheckpointStarted {
        mode: CheckpointMode,
        frames_to_backfill: u32,
        timestamp_ns: u64,
    },
    /// A checkpoint in the given mode finished.
    CheckpointCompleted {
        mode: CheckpointMode,
        frames_backfilled: u32,
        wal_reset: bool,
        duration_us: u64,
        timestamp_ns: u64,
    },
    /// The WAL was reset.
    WalReset {
        new_checkpoint_seq: u32,
        timestamp_ns: u64,
    },
    /// A checksum failure was observed, together with the recovery action chosen for it.
    ChecksumFailure {
        frame_index: usize,
        kind: ChecksumFailureKind,
        action: RecoveryAction,
        timestamp_ns: u64,
    },
    /// Result of validating the frame chain. `first_invalid_frame` and `reason` are
    /// optional; presumably they are only populated when `valid` is `false`.
    ChainValidated {
        total_frames: usize,
        valid: bool,
        first_invalid_frame: Option<usize>,
        reason: Option<WalChainInvalidReason>,
        timestamp_ns: u64,
    },
    /// An FEC repair was attempted; `outcome` reports how it ended.
    FecRepairAttempted {
        outcome: WalFecRepairOutcome,
        symbols_available: usize,
        duration_us: u64,
        timestamp_ns: u64,
    },
    /// A group-commit batch was flushed.
    GroupCommitFlushed {
        batch_size: u32,
        total_frames: u32,
        latency_us: u64,
        timestamp_ns: u64,
    },
}
impl WalTelemetryEvent {
    /// Returns the `timestamp_ns` field of this event, regardless of variant.
    #[must_use]
    pub fn timestamp_ns(&self) -> u64 {
        // Every variant has the field, so a single or-pattern arm covers the enum;
        // adding a variant without listing it here is a compile error.
        match self {
            Self::FrameAppended { timestamp_ns: ts, .. }
            | Self::ReplayStarted { timestamp_ns: ts, .. }
            | Self::ReplayCompleted { timestamp_ns: ts, .. }
            | Self::CheckpointStarted { timestamp_ns: ts, .. }
            | Self::CheckpointCompleted { timestamp_ns: ts, .. }
            | Self::WalReset { timestamp_ns: ts, .. }
            | Self::ChecksumFailure { timestamp_ns: ts, .. }
            | Self::ChainValidated { timestamp_ns: ts, .. }
            | Self::FecRepairAttempted { timestamp_ns: ts, .. }
            | Self::GroupCommitFlushed { timestamp_ns: ts, .. } => *ts,
        }
    }

    /// Returns a snake_case label naming the variant (e.g. `"frame_appended"`).
    ///
    /// Each variant maps to a distinct string.
    #[must_use]
    pub fn kind_str(&self) -> &'static str {
        match *self {
            Self::FrameAppended { .. } => "frame_appended",
            Self::ReplayStarted { .. } => "replay_started",
            Self::ReplayCompleted { .. } => "replay_completed",
            Self::CheckpointStarted { .. } => "checkpoint_started",
            Self::CheckpointCompleted { .. } => "checkpoint_completed",
            Self::WalReset { .. } => "wal_reset",
            Self::ChecksumFailure { .. } => "checksum_failure",
            Self::ChainValidated { .. } => "chain_validated",
            Self::FecRepairAttempted { .. } => "fec_repair_attempted",
            Self::GroupCommitFlushed { .. } => "group_commit_flushed",
        }
    }
}
/// Receiver of [`WalTelemetryEvent`]s.
///
/// Implementors must be `Send + Sync`. Because `on_event` takes `&self`, an
/// implementation that records anything has to use interior mutability.
pub trait WalTelemetryObserver: Send + Sync {
    /// Called with a borrowed event; implementations that want to keep it must clone it.
    fn on_event(&self, event: &WalTelemetryEvent);
}
/// Observer that ignores every event it is given.
pub struct NoOpWalObserver;

impl WalTelemetryObserver for NoOpWalObserver {
    /// Does nothing. The empty body is marked `#[inline(always)]` so that a
    /// statically dispatched call can be removed entirely by the compiler.
    #[inline(always)]
    fn on_event(&self, _: &WalTelemetryEvent) {}
}
/// Observer that keeps the most recent events in a bounded, mutex-protected buffer.
///
/// Once the configured capacity is reached, each new event overwrites the oldest one.
pub struct WalTelemetryRingBuffer {
    /// All mutable state lives behind this lock so `on_event(&self, ..)` can record.
    events: fsqlite_types::sync_primitives::Mutex<WalRingBufferInner>,
}
/// Lock-protected state of a [`WalTelemetryRingBuffer`].
struct WalRingBufferInner {
    /// Stored events; grows by `push` until it holds `capacity` items, then slots are
    /// overwritten in place.
    buf: Vec<WalTelemetryEvent>,
    /// Maximum number of events retained.
    capacity: usize,
    /// Index of the slot the next event will be written to.
    write_pos: usize,
    /// Total number of events recorded so far (not capped at `capacity`).
    count: usize,
}
impl WalTelemetryRingBuffer {
    /// Creates an empty buffer that retains at most `capacity` events.
    ///
    /// Storage for `capacity` events is reserved immediately.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        let inner = WalRingBufferInner {
            buf: Vec::with_capacity(capacity),
            capacity,
            write_pos: 0,
            count: 0,
        };
        Self {
            events: fsqlite_types::sync_primitives::Mutex::new(inner),
        }
    }

    /// Returns clones of the retained events, oldest first.
    ///
    /// Despite the name, this does NOT remove anything: the buffer is only read, so
    /// `len()` is unchanged afterwards and a second call returns the same events
    /// (plus whatever was recorded in between).
    #[must_use]
    pub fn drain(&self) -> Vec<WalTelemetryEvent> {
        let guard = self.events.lock();
        let retained = guard.count.min(guard.capacity);
        if retained == 0 {
            // Also keeps the modulo below away from a zero capacity.
            return Vec::new();
        }
        // Before the first wrap the oldest event sits in slot 0; afterwards it is the
        // slot that would be overwritten next, i.e. `write_pos`.
        let oldest = if guard.count < guard.capacity {
            0
        } else {
            guard.write_pos
        };
        (0..retained)
            .map(|offset| guard.buf[(oldest + offset) % guard.capacity].clone())
            .collect()
    }

    /// Number of events currently retained; never exceeds the capacity.
    #[must_use]
    pub fn len(&self) -> usize {
        let guard = self.events.lock();
        std::cmp::min(guard.count, guard.capacity)
    }

    /// `true` when no event is retained.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl WalTelemetryObserver for WalTelemetryRingBuffer {
    /// Stores a clone of `event`, overwriting the oldest retained event once the
    /// buffer is full.
    ///
    /// A buffer whose capacity is 0 cannot retain anything, so the event is dropped
    /// instead of panicking.
    fn on_event(&self, event: &WalTelemetryEvent) {
        let mut inner = self.events.lock();
        let capacity = inner.capacity;
        if capacity == 0 {
            // Without this guard the code below would index an empty Vec and then
            // compute `% 0`, both of which panic.
            return;
        }
        let pos = inner.write_pos;
        if inner.buf.len() < capacity {
            // Still filling up: `pos` equals `buf.len()` here, so a push lands in slot `pos`.
            inner.buf.push(event.clone());
        } else {
            inner.buf[pos] = event.clone();
        }
        inner.write_pos = (pos + 1) % capacity;
        // Only `count >= capacity` matters once the buffer has wrapped, so saturating
        // keeps the bookkeeping correct without any overflow panic.
        inner.count = inner.count.saturating_add(1);
    }
}
/// Bundle of the individual WAL-related metric snapshots, serializable as one value.
///
/// `wal_telemetry_snapshot` fills each field from the corresponding global registry.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct WalTelemetrySnapshot {
    /// Snapshot of the WAL metrics.
    pub wal: crate::metrics::WalMetricsSnapshot,
    /// Snapshot of the FEC repair counters.
    pub fec_repair: crate::metrics::WalFecRepairCountersSnapshot,
    /// Snapshot of the recovery counters.
    pub recovery: crate::metrics::WalRecoveryCountersSnapshot,
    /// Snapshot of the group-commit metrics.
    pub group_commit: crate::metrics::GroupCommitMetricsSnapshot,
    /// Snapshot of the consolidation metrics.
    pub consolidation: crate::group_commit::ConsolidationMetricsSnapshot,
}
/// Collects a [`WalTelemetrySnapshot`] from the global metric registries.
///
/// The five registries are snapshotted one after another, so the parts are not
/// captured at a single common instant.
#[must_use]
pub fn wal_telemetry_snapshot() -> WalTelemetrySnapshot {
    let wal = crate::metrics::GLOBAL_WAL_METRICS.snapshot();
    let fec_repair = crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.snapshot();
    let recovery = crate::metrics::GLOBAL_WAL_RECOVERY_METRICS.snapshot();
    let group_commit = crate::metrics::GLOBAL_GROUP_COMMIT_METRICS.snapshot();
    let consolidation = crate::group_commit::GLOBAL_CONSOLIDATION_METRICS.snapshot();

    WalTelemetrySnapshot {
        wal,
        fec_repair,
        recovery,
        group_commit,
        consolidation,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::checkpoint::CheckpointMode;
    use crate::checksum::{
        ChecksumFailureKind, RecoveryAction, WalChainInvalidReason, WalFecRepairOutcome,
    };

    /// Resets every global metrics registry that `wal_telemetry_snapshot` reads.
    fn reset_telemetry_globals() {
        crate::metrics::GLOBAL_WAL_METRICS.reset();
        crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.reset();
        crate::metrics::GLOBAL_WAL_RECOVERY_METRICS.reset();
        crate::metrics::GLOBAL_GROUP_COMMIT_METRICS.reset();
        crate::group_commit::GLOBAL_CONSOLIDATION_METRICS.reset();
    }

    /// Drop guard that resets the global registries when a test ends, including
    /// when it ends by a failed assertion unwinding.
    struct ResetTelemetryGlobals;

    impl Drop for ResetTelemetryGlobals {
        fn drop(&mut self) {
            reset_telemetry_globals();
        }
    }

    /// One instance of each event variant, with strictly increasing timestamps.
    fn all_event_variants() -> Vec<WalTelemetryEvent> {
        vec![
            WalTelemetryEvent::FrameAppended {
                frame_count: 3,
                bytes_written: 12_360,
                is_commit: true,
                timestamp_ns: 1_000_000,
            },
            WalTelemetryEvent::ReplayStarted {
                valid_frames: 10,
                replayable_frames: 8,
                timestamp_ns: 2_000_000,
            },
            WalTelemetryEvent::ReplayCompleted {
                frames_replayed: 8,
                duration_us: 500,
                timestamp_ns: 3_000_000,
            },
            WalTelemetryEvent::CheckpointStarted {
                mode: CheckpointMode::Passive,
                frames_to_backfill: 20,
                timestamp_ns: 4_000_000,
            },
            WalTelemetryEvent::CheckpointCompleted {
                mode: CheckpointMode::Restart,
                frames_backfilled: 20,
                wal_reset: true,
                duration_us: 3500,
                timestamp_ns: 5_000_000,
            },
            WalTelemetryEvent::WalReset {
                new_checkpoint_seq: 7,
                timestamp_ns: 6_000_000,
            },
            WalTelemetryEvent::ChecksumFailure {
                frame_index: 4,
                kind: ChecksumFailureKind::WalFrameChecksumMismatch,
                action: RecoveryAction::AttemptWalFecRepair,
                timestamp_ns: 7_000_000,
            },
            WalTelemetryEvent::ChainValidated {
                total_frames: 10,
                valid: false,
                first_invalid_frame: Some(4),
                reason: Some(WalChainInvalidReason::FrameChecksumMismatch),
                timestamp_ns: 8_000_000,
            },
            WalTelemetryEvent::FecRepairAttempted {
                outcome: WalFecRepairOutcome::Repaired,
                symbols_available: 12,
                duration_us: 200,
                timestamp_ns: 9_000_000,
            },
            WalTelemetryEvent::GroupCommitFlushed {
                batch_size: 4,
                total_frames: 16,
                latency_us: 1200,
                timestamp_ns: 10_000_000,
            },
        ]
    }

    /// `timestamp_ns()` must read the field for every variant; the fixture's
    /// timestamps are strictly increasing, so any misread shows up here.
    #[test]
    fn conformance_every_variant_has_monotonic_timestamp() {
        let mut previous = 0u64;
        for event in all_event_variants() {
            let ts = event.timestamp_ns();
            assert!(
                ts > previous,
                "timestamp must be monotonic: {} <= {} for {:?}",
                ts,
                previous,
                event.kind_str()
            );
            previous = ts;
        }
    }

    /// Every variant serializes to non-empty JSON that parses back as a JSON value.
    #[test]
    fn conformance_all_events_serialize_to_json() {
        for event in all_event_variants() {
            let json = serde_json::to_string(&event)
                .unwrap_or_else(|e| panic!("failed to serialize {:?}: {e}", event.kind_str()));
            assert!(
                !json.is_empty(),
                "serialized JSON must not be empty for {}",
                event.kind_str()
            );
            let _: serde_json::Value = serde_json::from_str(&json)
                .unwrap_or_else(|e| panic!("JSON not parseable for {}: {e}", event.kind_str()));
        }
    }

    /// The WAL metrics snapshot serializes and exposes the expected keys.
    #[test]
    fn conformance_wal_metrics_snapshot_serializes() {
        let metrics = crate::metrics::WalMetrics::new();
        metrics.record_frame_write(4096);
        metrics.record_checkpoint(5, 2000);
        metrics.record_wal_reset();

        let json = serde_json::to_string(&metrics.snapshot())
            .expect("WalMetricsSnapshot must serialize");
        assert!(json.contains("frames_written_total"));
        assert!(json.contains("checkpoint_count"));
    }

    /// The FEC repair counters snapshot serializes and exposes the expected keys.
    #[test]
    fn conformance_fec_repair_snapshot_serializes() {
        let counters = crate::metrics::WalFecRepairCounters::new();
        counters.record_repair(true, 500);
        counters.record_encode();

        let json = serde_json::to_string(&counters.snapshot())
            .expect("WalFecRepairCountersSnapshot must serialize");
        assert!(json.contains("repairs_succeeded"));
        assert!(json.contains("encode_ops"));
    }

    /// The recovery counters snapshot serializes and exposes the expected keys.
    #[test]
    fn conformance_recovery_snapshot_serializes() {
        let counters = crate::metrics::WalRecoveryCounters::new();
        counters.record_recovery(10, 2, 1);

        let json = serde_json::to_string(&counters.snapshot())
            .expect("WalRecoveryCountersSnapshot must serialize");
        assert!(json.contains("recovery_frames_total"));
        assert!(json.contains("corruption_detected_total"));
    }

    /// The group-commit metrics snapshot serializes and exposes the expected keys.
    #[test]
    fn conformance_group_commit_snapshot_serializes() {
        let metrics = crate::metrics::GroupCommitMetrics::new();
        metrics.record_group_commit(3, 1000);
        metrics.record_submission();

        let json = serde_json::to_string(&metrics.snapshot())
            .expect("GroupCommitMetricsSnapshot must serialize");
        assert!(json.contains("group_commits_total"));
        assert!(json.contains("submissions_total"));
    }

    /// The composite snapshot serializes with one key per component.
    #[test]
    fn conformance_composite_snapshot_serializes() {
        let json = serde_json::to_string(&wal_telemetry_snapshot())
            .expect("WalTelemetrySnapshot must serialize");
        assert!(json.contains("wal"));
        assert!(json.contains("fec_repair"));
        assert!(json.contains("recovery"));
        assert!(json.contains("group_commit"));
        assert!(json.contains("consolidation"));
    }

    /// `kind_str` is non-empty and distinct for every variant.
    #[test]
    fn conformance_kind_str_unique_per_variant() {
        let events = all_event_variants();
        let kinds: Vec<&str> = events.iter().map(WalTelemetryEvent::kind_str).collect();
        for k in &kinds {
            assert!(!k.is_empty(), "kind_str must not be empty");
        }

        let mut sorted = kinds.clone();
        sorted.sort();
        sorted.dedup();
        assert_eq!(
            kinds.len(),
            sorted.len(),
            "kind_str must be unique per variant"
        );
    }

    /// Tripwire: the fixture must be extended whenever a variant is added.
    #[test]
    fn conformance_variant_count_matches_schema() {
        assert_eq!(
            all_event_variants().len(),
            10,
            "WalTelemetryEvent must have exactly 10 variants (update all_event_variants if adding)"
        );
    }

    /// Each listed `ChecksumFailureKind` serializes.
    #[test]
    fn conformance_checksum_failure_kinds_serialize() {
        for kind in [
            ChecksumFailureKind::WalFrameChecksumMismatch,
            ChecksumFailureKind::Xxh3PageChecksumMismatch,
            ChecksumFailureKind::Crc32cSymbolMismatch,
            ChecksumFailureKind::DbFileCorruption,
        ] {
            let json = serde_json::to_string(&kind)
                .unwrap_or_else(|e| panic!("ChecksumFailureKind::{kind:?} serialize failed: {e}"));
            assert!(!json.is_empty());
        }
    }

    /// Each listed `RecoveryAction` serializes.
    #[test]
    fn conformance_recovery_actions_serialize() {
        for action in [
            RecoveryAction::AttemptWalFecRepair,
            RecoveryAction::TruncateWalAtFirstInvalidFrame,
            RecoveryAction::EvictCacheAndRetryFromWal,
            RecoveryAction::ExcludeCorruptedSymbolAndContinue,
            RecoveryAction::ReportPersistentCorruption,
        ] {
            let json = serde_json::to_string(&action)
                .unwrap_or_else(|e| panic!("RecoveryAction::{action:?} serialize failed: {e}"));
            assert!(!json.is_empty());
        }
    }

    /// Each listed `WalChainInvalidReason` serializes.
    #[test]
    fn conformance_wal_chain_invalid_reasons_serialize() {
        for reason in [
            WalChainInvalidReason::HeaderChecksumMismatch,
            WalChainInvalidReason::TruncatedFrame,
            WalChainInvalidReason::SaltMismatch,
            WalChainInvalidReason::FrameSaltMismatch,
            WalChainInvalidReason::FrameChecksumMismatch,
        ] {
            let json = serde_json::to_string(&reason).unwrap_or_else(|e| {
                panic!("WalChainInvalidReason::{reason:?} serialize failed: {e}")
            });
            assert!(!json.is_empty());
        }
    }

    /// Each listed `CheckpointMode` serializes.
    #[test]
    fn conformance_checkpoint_modes_serialize() {
        for mode in [
            CheckpointMode::Passive,
            CheckpointMode::Full,
            CheckpointMode::Restart,
            CheckpointMode::Truncate,
        ] {
            let json = serde_json::to_string(&mode)
                .unwrap_or_else(|e| panic!("CheckpointMode::{mode:?} serialize failed: {e}"));
            assert!(!json.is_empty());
        }
    }

    /// Each listed `WalFecRepairOutcome` serializes.
    #[test]
    fn conformance_fec_repair_outcomes_serialize() {
        for outcome in [
            WalFecRepairOutcome::Repaired,
            WalFecRepairOutcome::InsufficientSymbols,
            WalFecRepairOutcome::SourceHashMismatch,
        ] {
            let json = serde_json::to_string(&outcome).unwrap_or_else(|e| {
                panic!("WalFecRepairOutcome::{outcome:?} serialize failed: {e}")
            });
            assert!(!json.is_empty());
        }
    }

    /// Smoke test: the no-op observer accepts an event.
    #[test]
    fn noop_observer_compiles_away() {
        let event = WalTelemetryEvent::FrameAppended {
            frame_count: 1,
            bytes_written: 4120,
            is_commit: false,
            timestamp_ns: 42,
        };
        let obs = NoOpWalObserver;
        obs.on_event(&event);
    }

    /// Below capacity, every recorded event is retained.
    #[test]
    fn ring_buffer_stores_events() {
        let rb = WalTelemetryRingBuffer::new(4);
        assert!(rb.is_empty());

        for event in all_event_variants().into_iter().take(3) {
            rb.on_event(&event);
        }

        assert_eq!(rb.len(), 3);
        let drained = rb.drain();
        assert_eq!(drained.len(), 3);
    }

    /// After overflow only the newest `capacity` events remain, oldest first.
    #[test]
    fn ring_buffer_wraps_at_capacity() {
        let rb = WalTelemetryRingBuffer::new(3);
        let events = all_event_variants();
        for event in events.iter().take(5) {
            rb.on_event(event);
        }

        assert_eq!(rb.len(), 3);
        let drained = rb.drain();
        assert_eq!(drained.len(), 3);
        // Events 0 and 1 were overwritten; 2, 3 and 4 survive in that order.
        for (got, want) in drained.iter().zip(&events[2..5]) {
            assert_eq!(got.kind_str(), want.kind_str());
        }
    }

    /// Exactly filling the buffer keeps insertion order.
    #[test]
    fn ring_buffer_drain_preserves_chronological_order() {
        let rb = WalTelemetryRingBuffer::new(10);
        for event in &all_event_variants() {
            rb.on_event(event);
        }

        let drained = rb.drain();
        for pair in drained.windows(2) {
            assert!(
                pair[0].timestamp_ns() <= pair[1].timestamp_ns(),
                "drain must be chronological"
            );
        }
    }

    /// The composite snapshot reflects values recorded into each global registry.
    #[test]
    fn composite_snapshot_captures_all_globals() {
        // Lock order matters: group-commit lock first, then consolidation lock.
        let _group_metrics_guard = crate::metrics::GLOBAL_GROUP_COMMIT_METRICS_TEST_LOCK
            .lock()
            .expect("global group commit metrics test lock poisoned");
        let _consolidation_guard = crate::group_commit::GLOBAL_CONSOLIDATION_METRICS_TEST_LOCK
            .lock()
            .expect("global consolidation metrics test lock poisoned");
        // Declared after the locks so it is dropped (and resets) before they are released.
        let _reset = ResetTelemetryGlobals;
        reset_telemetry_globals();

        crate::metrics::GLOBAL_WAL_METRICS.record_frame_write(4096);
        crate::metrics::GLOBAL_WAL_FEC_REPAIR_METRICS.record_repair(true, 100);
        crate::metrics::GLOBAL_WAL_RECOVERY_METRICS.record_recovery(5, 1, 1);
        crate::metrics::GLOBAL_GROUP_COMMIT_METRICS.record_group_commit(2, 500);
        crate::group_commit::GLOBAL_CONSOLIDATION_METRICS
            .record_phase_timing(10, 20, 30, true, 40, 50, 60, 70, 80, 0);

        let snap = wal_telemetry_snapshot();
        assert_eq!(snap.wal.frames_written_total, 1);
        assert_eq!(snap.fec_repair.repairs_succeeded, 1);
        assert_eq!(snap.recovery.recovery_frames_total, 5);
        assert_eq!(snap.group_commit.group_commits_total, 1);
        assert_eq!(snap.consolidation.total_commits(), 1);
        assert_eq!(snap.consolidation.inner_lock_wait_us_total, 50);
    }

    /// A capacity-1 buffer always holds just the most recent event.
    #[test]
    fn ring_buffer_capacity_one_keeps_latest() {
        let rb = WalTelemetryRingBuffer::new(1);
        let events = all_event_variants();
        for event in &events {
            rb.on_event(event);
        }

        assert_eq!(rb.len(), 1);
        let drained = rb.drain();
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0].kind_str(), events.last().unwrap().kind_str());
    }

    /// Draining a fresh buffer yields nothing.
    #[test]
    fn ring_buffer_drain_on_empty_returns_empty_vec() {
        let rb = WalTelemetryRingBuffer::new(8);
        assert!(rb.is_empty());

        let drained = rb.drain();
        assert!(drained.is_empty());
    }

    /// `len` is capped at capacity after overflow and the buffer is not empty.
    #[test]
    fn ring_buffer_is_not_empty_after_overflow() {
        let rb = WalTelemetryRingBuffer::new(2);
        for event in all_event_variants().iter().take(5) {
            rb.on_event(event);
        }

        assert!(!rb.is_empty());
        assert_eq!(rb.len(), 2);
    }

    /// Derived `Clone`/`PartialEq`: a clone is equal, a one-field change is not.
    #[test]
    fn event_clone_and_equality() {
        let event = WalTelemetryEvent::FrameAppended {
            frame_count: 5,
            bytes_written: 20_480,
            is_commit: true,
            timestamp_ns: 999,
        };
        let cloned = event.clone();
        assert_eq!(event, cloned);

        // Differs only in `is_commit`.
        let different = WalTelemetryEvent::FrameAppended {
            frame_count: 5,
            bytes_written: 20_480,
            is_commit: false,
            timestamp_ns: 999,
        };
        assert_ne!(event, different);
    }

    /// The observer trait can be used as `dyn WalTelemetryObserver`.
    #[test]
    fn observer_trait_is_object_safe() {
        let event = WalTelemetryEvent::WalReset {
            new_checkpoint_seq: 1,
            timestamp_ns: 42,
        };
        let obs: Box<dyn WalTelemetryObserver> = Box::new(NoOpWalObserver);
        obs.on_event(&event);
    }

    /// The serialized form names the variant and includes the field values.
    #[test]
    fn event_serialize_produces_valid_json() {
        let event = WalTelemetryEvent::FrameAppended {
            frame_count: 2,
            bytes_written: 8240,
            is_commit: true,
            timestamp_ns: 42,
        };

        let json = serde_json::to_string(&event).expect("serialize");
        assert!(json.contains("FrameAppended"));
        assert!(json.contains("8240"));
        assert!(json.contains("true"));
    }

    /// The fixture covers ten variants and their `kind_str` values are all distinct.
    #[test]
    fn kind_str_covers_all_ten_variants() {
        let events = all_event_variants();
        assert_eq!(
            events.len(),
            10,
            "all_event_variants must cover 10 variants"
        );

        let unique: std::collections::HashSet<&str> =
            events.iter().map(|e| e.kind_str()).collect();
        assert_eq!(unique.len(), 10, "all kind_str values must be distinct");
    }

    /// The composite snapshot supports `Debug` formatting and serialization.
    #[test]
    fn wal_telemetry_snapshot_debug_and_serialize() {
        // Same lock order as `composite_snapshot_captures_all_globals`.
        let _group_metrics_guard = crate::metrics::GLOBAL_GROUP_COMMIT_METRICS_TEST_LOCK
            .lock()
            .expect("global group commit metrics test lock poisoned");
        let _consolidation_guard = crate::group_commit::GLOBAL_CONSOLIDATION_METRICS_TEST_LOCK
            .lock()
            .expect("lock");
        let _reset = ResetTelemetryGlobals;
        reset_telemetry_globals();

        let snap = wal_telemetry_snapshot();
        let dbg = format!("{snap:?}");
        assert!(dbg.contains("WalTelemetrySnapshot"));

        let json = serde_json::to_string(&snap).expect("serialize snapshot");
        assert!(json.contains("wal"));
        assert!(json.contains("fec_repair"));
    }

    /// A single recorded event comes back unchanged.
    #[test]
    fn ring_buffer_drain_single_event() {
        let event = WalTelemetryEvent::WalReset {
            new_checkpoint_seq: 99,
            timestamp_ns: 1,
        };
        let rb = WalTelemetryRingBuffer::new(8);
        rb.on_event(&event);

        assert_eq!(rb.len(), 1);
        let drained = rb.drain();
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0], event);
    }

    /// With same-variant events, the oldest survivor after overflow is the expected one.
    #[test]
    fn ring_buffer_wraps_at_capacity_with_uniform_events() {
        let rb = WalTelemetryRingBuffer::new(2);
        for seq in 0..5u32 {
            rb.on_event(&WalTelemetryEvent::WalReset {
                new_checkpoint_seq: seq,
                timestamp_ns: u64::from(seq),
            });
        }

        assert_eq!(rb.len(), 2);
        let drained = rb.drain();
        assert_eq!(drained.len(), 2);
        // Five events into two slots: sequence numbers 3 and 4 remain, 3 being the oldest.
        match &drained[0] {
            WalTelemetryEvent::WalReset {
                new_checkpoint_seq, ..
            } => assert_eq!(*new_checkpoint_seq, 3),
            _ => panic!("expected WalReset"),
        }
    }

    /// The no-op observer accepts an all-zero event without panicking.
    #[test]
    fn noop_observer_does_not_panic() {
        let obs = NoOpWalObserver;
        obs.on_event(&WalTelemetryEvent::WalReset {
            new_checkpoint_seq: 0,
            timestamp_ns: 0,
        });
    }

    /// A fresh buffer reports empty, zero length, and an empty drain.
    #[test]
    fn ring_buffer_empty_drain_returns_empty() {
        let rb = WalTelemetryRingBuffer::new(4);
        assert!(rb.is_empty());
        assert_eq!(rb.len(), 0);

        let drained = rb.drain();
        assert!(drained.is_empty());
    }

    /// Derived `Debug`, `Clone` and `PartialEq` behave as expected on an event.
    #[test]
    fn wal_telemetry_event_debug_clone_eq() {
        let a = WalTelemetryEvent::FrameAppended {
            frame_count: 3,
            bytes_written: 12288,
            is_commit: true,
            timestamp_ns: 100,
        };
        let b = a.clone();
        assert_eq!(a, b);

        let c = WalTelemetryEvent::FrameAppended {
            frame_count: 1,
            bytes_written: 4096,
            is_commit: false,
            timestamp_ns: 200,
        };
        assert_ne!(a, c);

        let dbg = format!("{a:?}");
        assert!(dbg.contains("FrameAppended"));
    }
}