pub mod determinism;
pub mod diagnosis;
pub mod minimize;
pub mod replay;
mod stats;
pub use determinism::{DeterminismResult, compare_traces, verify_determinism};
pub use diagnosis::{CausalEvent, DiagnosisReport, FaultCause, ViolationInfo, diagnose};
pub use minimize::{MinimizedTrace, minimize_faults, minimize_ticks};
pub use stats::SimStats;
use serde::{Deserialize, Serialize};
use vortex_core::NodeId;
/// Identifier of a [`TraceEvent`] within a [`SimTrace`].
///
/// Ids are handed out sequentially starting at 0 by [`SimTrace::record`],
/// and numbering restarts at 0 after [`SimTrace::clear`].
pub type EventId = u64;
/// A single entry recorded in a [`SimTrace`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceEvent {
/// Sequential id assigned by [`SimTrace::record`] (0 for the first event).
pub event_id: EventId,
/// Tick supplied to [`SimTrace::record`]; presumably the simulated time of the event.
pub tick: u64,
/// Node the event is attributed to.
pub node_id: NodeId,
/// What happened, with kind-specific details.
pub kind: TraceEventKind,
}
/// The kind of a [`TraceEvent`] together with its kind-specific payload.
///
/// Serialized via serde by [`SimTrace::dump_json`] / [`SimTrace::dump_jsonl`],
/// and rendered as a one-line summary by [`SimTrace::dump_text`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TraceEventKind {
/// A message sent to node `to` (rendered as `MSG_SENT`).
MessageSent {
to: NodeId,
msg_type: String,
size_bytes: usize,
},
/// A message delivered from node `from` (rendered as `MSG_RECV`).
/// [`SimTrace::causal_chain`] treats `from` as causally involved.
MessageDelivered {
from: NodeId,
msg_type: String,
size_bytes: usize,
},
/// A message from `from` to `to` that was dropped, with a textual `reason`.
MessageDropped {
from: NodeId,
to: NodeId,
reason: String,
},
/// A timer identified by `timer_type` fired.
TimerFired { timer_type: String },
/// A state change from `from_state` to `to_state`; `metadata` carries extra
/// text (the tests use values such as `term=1`).
StateTransition {
from_state: String,
to_state: String,
metadata: String,
},
/// A fault was injected (rendered as `FAULT+`).
FaultInjected { fault_type: String, details: String },
/// A previously injected fault was healed (rendered as `FAULT-`).
FaultHealed { fault_type: String, details: String },
/// A storage operation of type `op_type` involving `key_count` keys.
StorageOp { op_type: String, key_count: usize },
/// An application-defined event identified by `tag`.
Custom { tag: String, data: String },
}
/// An in-memory, ordered log of [`TraceEvent`]s recorded during a simulation run.
///
/// Event ids are assigned sequentially by [`SimTrace::record`] and restart
/// from zero after [`SimTrace::clear`].
pub struct SimTrace {
    /// Id the next recorded event will receive.
    next_event_id: EventId,
    /// Recorded events, in recording order.
    events: Vec<TraceEvent>,
}
impl SimTrace {
pub fn new() -> Self {
Self {
events: Vec::new(),
next_event_id: 0,
}
}
pub fn record(&mut self, tick: u64, node_id: NodeId, kind: TraceEventKind) {
self.events.push(TraceEvent {
event_id: self.next_event_id,
tick,
node_id,
kind,
});
self.next_event_id += 1;
}
pub fn len(&self) -> usize {
self.events.len()
}
pub fn is_empty(&self) -> bool {
self.events.is_empty()
}
pub fn events(&self) -> &[TraceEvent] {
&self.events
}
pub fn events_for_node(&self, node_id: NodeId) -> Vec<&TraceEvent> {
self.events
.iter()
.filter(|e| e.node_id == node_id)
.collect()
}
pub fn events_matching<F: Fn(&TraceEventKind) -> bool>(&self, f: F) -> Vec<&TraceEvent> {
self.events.iter().filter(|e| f(&e.kind)).collect()
}
pub fn events_between(&self, start_tick: u64, end_tick: u64) -> Vec<&TraceEvent> {
self.events
.iter()
.filter(|e| e.tick >= start_tick && e.tick <= end_tick)
.collect()
}
pub fn last_n(&self, n: usize) -> &[TraceEvent] {
let start = self.events.len().saturating_sub(n);
&self.events[start..]
}
pub fn causal_chain(&self, event_id: EventId) -> Vec<&TraceEvent> {
let mut chain = Vec::new();
let mut current_id = event_id;
let mut visited_nodes: std::collections::HashSet<NodeId> = std::collections::HashSet::new();
let start = match self.events.iter().find(|e| e.event_id == current_id) {
Some(e) => e,
None => return chain,
};
chain.push(start);
visited_nodes.insert(start.node_id);
let mut current_tick = start.tick;
for event in self.events.iter().rev() {
if event.tick > current_tick || event.event_id >= current_id {
continue;
}
if visited_nodes.contains(&event.node_id) {
chain.push(event);
current_id = event.event_id;
current_tick = event.tick;
if let TraceEventKind::MessageDelivered { from, .. } = &event.kind {
visited_nodes.insert(*from);
}
if chain.len() >= 100 {
break;
}
}
}
chain
}
pub fn dump_text(&self) -> String {
let mut out = String::new();
for event in &self.events {
let kind_str = match &event.kind {
TraceEventKind::MessageSent {
to,
msg_type,
size_bytes,
} => format!("MSG_SENT to={to} type={msg_type} size={size_bytes}"),
TraceEventKind::MessageDelivered {
from,
msg_type,
size_bytes,
} => format!("MSG_RECV from={from} type={msg_type} size={size_bytes}"),
TraceEventKind::MessageDropped { from, to, reason } => {
format!("MSG_DROP {from}->{to} reason={reason}")
}
TraceEventKind::TimerFired { timer_type } => format!("TIMER {timer_type}"),
TraceEventKind::StateTransition {
from_state,
to_state,
metadata,
} => format!("STATE {from_state}->{to_state} {metadata}"),
TraceEventKind::FaultInjected {
fault_type,
details,
} => format!("FAULT+ {fault_type}: {details}"),
TraceEventKind::FaultHealed {
fault_type,
details,
} => format!("FAULT- {fault_type}: {details}"),
TraceEventKind::StorageOp { op_type, key_count } => {
format!("STORAGE {op_type} keys={key_count}")
}
TraceEventKind::Custom { tag, data } => format!("CUSTOM {tag}: {data}"),
};
out.push_str(&format!(
"[t={:06} e={:06} n={}] {}\n",
event.tick, event.event_id, event.node_id, kind_str
));
}
out
}
pub fn dump_json(&self) -> String {
serde_json::to_string_pretty(&self.events).unwrap_or_else(|_| "[]".to_string())
}
pub fn dump_jsonl(&self) -> String {
let mut out = String::new();
for event in &self.events {
if let Ok(json) = serde_json::to_string(event) {
out.push_str(&json);
out.push('\n');
}
}
out
}
pub fn clear(&mut self) {
self.events.clear();
self.next_event_id = 0;
}
}
impl Default for SimTrace {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_record_and_len() {
        let mut trace = SimTrace::new();
        assert!(trace.is_empty());
        trace.record(
            1,
            1,
            TraceEventKind::TimerFired {
                timer_type: "election".into(),
            },
        );
        trace.record(
            2,
            2,
            TraceEventKind::StorageOp {
                op_type: "put".into(),
                key_count: 3,
            },
        );
        assert_eq!(trace.len(), 2);
        assert_eq!(trace.events()[0].event_id, 0);
        assert_eq!(trace.events()[1].event_id, 1);
    }

    #[test]
    fn test_events_for_node() {
        let mut trace = SimTrace::new();
        trace.record(
            1,
            1,
            TraceEventKind::TimerFired {
                timer_type: "a".into(),
            },
        );
        trace.record(
            2,
            2,
            TraceEventKind::TimerFired {
                timer_type: "b".into(),
            },
        );
        trace.record(
            3,
            1,
            TraceEventKind::TimerFired {
                timer_type: "c".into(),
            },
        );
        let node1 = trace.events_for_node(1);
        assert_eq!(node1.len(), 2);
    }

    #[test]
    fn test_events_matching() {
        let mut trace = SimTrace::new();
        trace.record(
            1,
            1,
            TraceEventKind::TimerFired {
                timer_type: "x".into(),
            },
        );
        trace.record(
            2,
            1,
            TraceEventKind::StorageOp {
                op_type: "put".into(),
                key_count: 1,
            },
        );
        trace.record(
            3,
            2,
            TraceEventKind::TimerFired {
                timer_type: "y".into(),
            },
        );
        let timers = trace.events_matching(|k| matches!(k, TraceEventKind::TimerFired { .. }));
        assert_eq!(timers.len(), 2);
    }

    #[test]
    fn test_dump_text() {
        let mut trace = SimTrace::new();
        trace.record(
            10,
            1,
            TraceEventKind::FaultInjected {
                fault_type: "partition".into(),
                details: "1<->2".into(),
            },
        );
        let text = trace.dump_text();
        assert!(text.contains("FAULT+ partition"));
        assert!(text.contains("[t=000010"));
    }

    #[test]
    fn test_dump_text_exact_line() {
        let mut trace = SimTrace::new();
        trace.record(
            5,
            3,
            TraceEventKind::MessageDropped {
                from: 3,
                to: 4,
                reason: "partition".into(),
            },
        );
        assert_eq!(
            trace.dump_text(),
            "[t=000005 e=000000 n=3] MSG_DROP 3->4 reason=partition\n"
        );
    }

    #[test]
    fn test_json_roundtrip() {
        let mut trace = SimTrace::new();
        trace.record(
            1,
            1,
            TraceEventKind::MessageSent {
                to: 2,
                msg_type: "AppendEntries".into(),
                size_bytes: 128,
            },
        );
        let json = trace.dump_json();
        let parsed: Vec<TraceEvent> = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].tick, 1);
    }

    #[test]
    fn test_jsonl_one_event_per_line() {
        let mut trace = SimTrace::new();
        for i in 0..2 {
            trace.record(
                i,
                1,
                TraceEventKind::Custom {
                    tag: "k".into(),
                    data: format!("v{i}"),
                },
            );
        }
        let jsonl = trace.dump_jsonl();
        let lines: Vec<&str> = jsonl.lines().collect();
        assert_eq!(lines.len(), 2);
        for (i, line) in lines.iter().enumerate() {
            let parsed: TraceEvent = serde_json::from_str(line).unwrap();
            assert_eq!(parsed.event_id, i as EventId);
        }
    }

    #[test]
    fn test_deterministic_traces() {
        fn build() -> String {
            let mut trace = SimTrace::new();
            trace.record(
                1,
                1,
                TraceEventKind::StateTransition {
                    from_state: "Follower".into(),
                    to_state: "Leader".into(),
                    metadata: "term=1".into(),
                },
            );
            trace.dump_json()
        }
        assert_eq!(build(), build());
    }

    #[test]
    fn test_events_between() {
        let mut trace = SimTrace::new();
        for i in 0..10 {
            trace.record(
                i * 10,
                1,
                TraceEventKind::TimerFired {
                    timer_type: format!("t{i}"),
                },
            );
        }
        let between = trace.events_between(20, 50);
        assert_eq!(between.len(), 4);
        // An inverted range matches nothing.
        assert!(trace.events_between(50, 20).is_empty());
    }

    #[test]
    fn test_last_n() {
        let mut trace = SimTrace::new();
        for i in 0..10 {
            trace.record(
                i,
                1,
                TraceEventKind::TimerFired {
                    timer_type: format!("t{i}"),
                },
            );
        }
        let last3 = trace.last_n(3);
        assert_eq!(last3.len(), 3);
        assert_eq!(last3[0].tick, 7);
        // Asking for more than exists returns everything.
        assert_eq!(trace.last_n(20).len(), 10);
    }

    #[test]
    fn test_causal_chain_follows_deliveries() {
        let mut trace = SimTrace::new();
        trace.record(
            1,
            1,
            TraceEventKind::MessageSent {
                to: 2,
                msg_type: "Vote".into(),
                size_bytes: 8,
            },
        );
        trace.record(
            2,
            3,
            TraceEventKind::TimerFired {
                timer_type: "noise".into(),
            },
        );
        trace.record(
            3,
            2,
            TraceEventKind::MessageDelivered {
                from: 1,
                msg_type: "Vote".into(),
                size_bytes: 8,
            },
        );
        trace.record(
            4,
            2,
            TraceEventKind::StateTransition {
                from_state: "Candidate".into(),
                to_state: "Leader".into(),
                metadata: "term=2".into(),
            },
        );
        let ids: Vec<EventId> = trace.causal_chain(3).iter().map(|e| e.event_id).collect();
        // Node 3's unrelated timer is excluded; node 1's send is reached via the delivery.
        assert_eq!(ids, vec![3, 2, 0]);
    }

    #[test]
    fn test_causal_chain_unknown_event() {
        let mut trace = SimTrace::new();
        trace.record(
            1,
            1,
            TraceEventKind::TimerFired {
                timer_type: "x".into(),
            },
        );
        assert!(trace.causal_chain(42).is_empty());
    }

    #[test]
    fn test_causal_chain_is_capped() {
        let mut trace = SimTrace::new();
        for i in 0..150 {
            trace.record(
                i,
                1,
                TraceEventKind::TimerFired {
                    timer_type: format!("t{i}"),
                },
            );
        }
        let chain = trace.causal_chain(149);
        assert_eq!(chain.len(), 100);
        assert_eq!(chain[0].event_id, 149);
        assert_eq!(chain[99].event_id, 50);
    }

    #[test]
    fn test_clear() {
        let mut trace = SimTrace::new();
        trace.record(
            1,
            1,
            TraceEventKind::TimerFired {
                timer_type: "x".into(),
            },
        );
        trace.clear();
        assert!(trace.is_empty());
        // Id numbering restarts after a clear.
        trace.record(
            2,
            1,
            TraceEventKind::TimerFired {
                timer_type: "y".into(),
            },
        );
        assert_eq!(trace.events()[0].event_id, 0);
    }
}