use std::sync::{Arc, Mutex};
use crate::core::{Error, Event, Result};
/// Snapshot of a crash simulation run, as assembled by
/// `CrashSimulationState::finalize` and consumed by
/// `CrashSimulationValidator::validate`.
#[derive(Debug, Clone)]
pub struct CrashSimulationResult {
/// Total number of events collected across all recorded cycles.
pub total_events: u64,
/// Number of events recorded by each cycle, in the order the cycles were recorded.
pub events_per_cycle: Vec<u64>,
/// Number of cycles that were recorded.
pub total_cycles: u64,
/// Crash points the simulation was configured with.
pub crash_points: Vec<u64>,
}
/// Mutable bookkeeping for a crash simulation.
///
/// Every field is an `Arc<Mutex<_>>`, so cloning this struct produces another
/// handle onto the *same* underlying data rather than an independent copy.
/// `Default` yields an empty state: no events, no cycles, no crash points.
#[derive(Debug, Clone, Default)]
pub struct CrashSimulationState {
/// Every event handed to `record_cycle`, appended in call order.
pub collected_events: Arc<Mutex<Vec<Event>>>,
/// One entry per `record_cycle` call holding the size of that cycle's batch.
pub events_per_cycle: Arc<Mutex<Vec<u64>>>,
/// Number of cycles recorded so far; `should_crash_at` also uses it as the
/// index into `crash_points`.
pub current_cycle: Arc<Mutex<u64>>,
/// Event counts at which `should_crash_at` reports a crash; entry `i` is
/// consulted while `current_cycle` equals `i`.
pub crash_points: Arc<Mutex<Vec<u64>>>,
}
impl CrashSimulationState {
pub fn new(crash_points: Vec<u64>) -> Self {
Self {
collected_events: Arc::new(Mutex::new(Vec::new())),
events_per_cycle: Arc::new(Mutex::new(Vec::new())),
current_cycle: Arc::new(Mutex::new(0)),
crash_points: Arc::new(Mutex::new(crash_points)),
}
}
pub fn record_cycle(&self, events: Vec<Event>) -> Result<()> {
let count = events.len() as u64;
if let Ok(mut collected) = self.collected_events.lock() {
collected.extend(events);
}
if let Ok(mut per_cycle) = self.events_per_cycle.lock() {
per_cycle.push(count);
}
if let Ok(mut cycle) = self.current_cycle.lock() {
*cycle += 1;
}
Ok(())
}
pub fn should_crash_at(&self, event_count: u64) -> Result<bool> {
if let Ok(crashes) = self.crash_points.lock() {
if let Ok(cycle) = self.current_cycle.lock() {
let cycle_idx = *cycle as usize;
if cycle_idx < crashes.len() && crashes[cycle_idx] == event_count {
return Ok(true);
}
}
}
Ok(false)
}
pub fn get_collected_events(&self) -> Result<Vec<Event>> {
self.collected_events
.lock()
.map(|e| e.clone())
.map_err(|e| Error::StateError(format!("Failed to lock collected events: {}", e)))
}
pub fn get_events_per_cycle(&self) -> Result<Vec<u64>> {
self.events_per_cycle
.lock()
.map(|e| e.clone())
.map_err(|e| Error::StateError(format!("Failed to lock events per cycle: {}", e)))
}
pub fn get_total_cycles(&self) -> Result<u64> {
self.current_cycle
.lock()
.map(|c| *c)
.map_err(|e| Error::StateError(format!("Failed to lock current cycle: {}", e)))
}
pub fn finalize(&self) -> Result<CrashSimulationResult> {
let total_events = self
.collected_events
.lock()
.map(|e| e.len() as u64)
.unwrap_or(0);
let events_per_cycle = self.get_events_per_cycle()?;
let total_cycles = self.get_total_cycles()?;
let crash_points = self
.crash_points
.lock()
.map(|c| c.clone())
.unwrap_or_default();
Ok(CrashSimulationResult {
total_events,
events_per_cycle,
total_cycles,
crash_points,
})
}
}
/// Stateless namespace for checking a `CrashSimulationResult` against the
/// expected event total; the check itself is the associated function `validate`.
pub struct CrashSimulationValidator;
impl CrashSimulationValidator {
pub fn validate(
result: &CrashSimulationResult,
expected_total_events: u64,
max_duplicate_rate: f64,
) -> Result<ValidationReport> {
let actual_total = result.total_events;
if actual_total < expected_total_events {
return Err(Error::SourceError(format!(
"Data loss detected: expected {}, got {}",
expected_total_events, actual_total
)));
}
let duplicates = actual_total - expected_total_events;
let duplicate_rate = if expected_total_events > 0 {
duplicates as f64 / expected_total_events as f64
} else {
0.0
};
if duplicate_rate > max_duplicate_rate {
return Err(Error::SourceError(format!(
"Excessive duplication: {:.2}% (allowed: {:.2}%)",
duplicate_rate * 100.0,
max_duplicate_rate * 100.0
)));
}
Ok(ValidationReport {
total_events_collected: actual_total,
expected_total_events,
duplicate_count: duplicates,
duplicate_rate,
total_cycles: result.total_cycles,
cycles_with_crashes: result.crash_points.len() as u64,
passed: true,
})
}
}
/// Outcome of a successful `CrashSimulationValidator::validate` call.
#[derive(Debug, Clone)]
pub struct ValidationReport {
/// Number of events the simulation actually collected.
pub total_events_collected: u64,
/// Number of events the caller expected.
pub expected_total_events: u64,
/// Surplus of collected over expected events.
pub duplicate_count: u64,
/// `duplicate_count` relative to `expected_total_events`, as a fraction
/// (not a percentage).
pub duplicate_rate: f64,
/// Number of cycles the simulation recorded.
pub total_cycles: u64,
/// Set by the validator to the number of crash points in the validated result.
pub cycles_with_crashes: u64,
/// `true` in every report the validator returns; failed validations are
/// reported as `Err` rather than as a report with `passed == false`.
pub passed: bool,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal insert event whose payload, offset and timestamps are
    /// all derived from `id`.
    fn create_test_event(id: u64) -> Event {
        let source = crate::core::SourceMetadata {
            source_name: "test".into(),
            offset: id.to_string(),
            timestamp: id,
        };
        let payload = serde_json::json!({"id": id});
        Event {
            before: None,
            after: Some(payload),
            op: crate::core::Operation::Insert,
            source,
            ts: id,
            schema: None,
            table: "test_table".into(),
            primary_key: None,
            snapshot: None,
            transaction: None,
            envelope_version: crate::EVENT_ENVELOPE_VERSION,
            before_is_key_only: false,
        }
    }

    /// Crash plan shared by every test in this module.
    fn crash_plan() -> Vec<u64> {
        vec![100, 250, 500]
    }

    /// Builds a four-cycle result that uses the shared crash plan.
    fn four_cycle_result(total_events: u64, events_per_cycle: Vec<u64>) -> CrashSimulationResult {
        CrashSimulationResult {
            total_events,
            events_per_cycle,
            total_cycles: 4,
            crash_points: crash_plan(),
        }
    }

    #[test]
    fn test_crash_simulation_state_creation() {
        let fresh = CrashSimulationState::new(crash_plan());

        assert_eq!(fresh.get_total_cycles().unwrap(), 0);
        assert!(fresh.get_collected_events().unwrap().is_empty());
    }

    #[test]
    fn test_crash_simulation_record_cycle() {
        let tracker = CrashSimulationState::new(crash_plan());
        let batch: Vec<Event> = (1..=3).map(create_test_event).collect();

        tracker.record_cycle(batch).unwrap();

        assert_eq!(tracker.get_total_cycles().unwrap(), 1);
        assert_eq!(tracker.get_collected_events().unwrap().len(), 3);
        assert_eq!(tracker.get_events_per_cycle().unwrap(), vec![3]);
    }

    #[test]
    fn test_crash_simulation_should_crash_at() {
        let tracker = CrashSimulationState::new(crash_plan());

        // Cycle 0 crashes at exactly 100 events.
        assert!(!tracker.should_crash_at(99).unwrap());
        assert!(tracker.should_crash_at(100).unwrap());

        // After one recorded cycle the second crash point applies instead.
        tracker.record_cycle(Vec::new()).unwrap();
        assert!(!tracker.should_crash_at(100).unwrap());
        assert!(tracker.should_crash_at(250).unwrap());
    }

    #[test]
    fn test_crash_simulation_validator_no_loss() {
        let outcome = four_cycle_result(1000, vec![100, 200, 300, 400]);

        let report = CrashSimulationValidator::validate(&outcome, 1000, 0.05).unwrap();

        assert!(report.passed);
        assert_eq!(report.duplicate_count, 0);
    }

    #[test]
    fn test_crash_simulation_validator_with_acceptable_duplicates() {
        let outcome = four_cycle_result(1050, vec![105, 245, 350, 350]);

        let report = CrashSimulationValidator::validate(&outcome, 1000, 0.1).unwrap();

        assert!(report.passed);
        assert_eq!(report.duplicate_count, 50);
        assert!(report.duplicate_rate <= 0.1);
    }

    #[test]
    fn test_crash_simulation_validator_data_loss_detected() {
        let outcome = four_cycle_result(950, vec![100, 200, 300, 300]);

        assert!(CrashSimulationValidator::validate(&outcome, 1000, 0.05).is_err());
    }

    #[test]
    fn test_crash_simulation_validator_excessive_duplicates() {
        let outcome = four_cycle_result(1200, vec![120, 280, 400, 400]);

        assert!(CrashSimulationValidator::validate(&outcome, 1000, 0.05).is_err());
    }
}