#![allow(dead_code)]
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// Flat record of the settings an export plan was resolved to.
///
/// Carried as the payload of [`RunEvent::PlanResolved`]. Most settings are
/// stored as plain strings; this type does not validate their values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanSnapshot {
/// Name of the export this plan belongs to.
pub export_name: String,
/// Query text the plan is based on.
pub base_query: String,
/// Strategy identifier (the tests in this file use `"snapshot"`).
pub strategy: String,
/// Output format identifier (the tests in this file use `"parquet"`).
pub format: String,
/// Compression identifier (the tests in this file use `"zstd"`).
pub compression: String,
/// Destination kind identifier (the tests in this file use `"local"`).
pub destination_type: String,
/// Tuning profile identifier (the tests in this file use `"balanced"`).
pub tuning_profile: String,
/// Batch size setting.
// NOTE(review): presumably a row count per batch — confirm against the planner.
pub batch_size: usize,
/// Flag recorded from the plan; presumably enables validation — TODO confirm.
pub validate: bool,
/// Flag recorded from the plan; presumably enables reconciliation — TODO confirm.
pub reconcile: bool,
/// Flag recorded from the plan; presumably marks a resumed run — TODO confirm.
pub resume: bool,
}
/// One event that can be recorded in a [`RunJournal`].
///
/// The enum derives `Serialize`/`Deserialize`, so variant and field names are
/// part of the serialized representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RunEvent {
/// The resolved plan, stored in full as a [`PlanSnapshot`].
PlanResolved(PlanSnapshot),
/// A warning tied to a named planning rule.
PlanWarning { rule: String, message: String },
/// An output file was written, with its row count, byte size and part index.
FileWritten {
file_name: String,
rows: i64,
bytes: u64,
part_index: usize,
},
/// A chunk identified by `chunk_index` started; keys are kept as strings.
ChunkStarted {
chunk_index: i64,
start_key: String,
end_key: String,
},
/// A chunk finished; `file_name` is optional.
ChunkCompleted {
chunk_index: i64,
rows: i64,
file_name: Option<String>,
},
/// A chunk failed with `error` on the given `attempt`.
// NOTE(review): `attempt` is `i64` here but `u32` in `RetryAttempted` —
// confirm whether the difference is intentional.
ChunkFailed {
chunk_index: i64,
error: String,
attempt: i64,
},
/// A retry was attempted, with its reason and backoff in milliseconds.
RetryAttempted {
attempt: u32,
reason: String,
backoff_ms: u64,
},
/// A data-quality finding; `severity` is a free-form string
/// (the tests in this file use `"FAIL"` and `"WARN"`).
QualityIssue { severity: String, message: String },
/// A schema difference, listed as added, removed and type-changed entries.
// NOTE(review): the meaning of the three strings in each `type_changed`
// tuple is not visible here — presumably (column, old type, new type); confirm.
SchemaChanged {
added: Vec<String>,
removed: Vec<String>,
type_changed: Vec<(String, String, String)>,
},
/// A general warning with a free-form context label.
Warning { context: String, message: String },
/// Outcome of validation as a single pass/fail flag.
ValidationResult { passed: bool },
/// Outcome of reconciliation: both counts and whether they matched.
ReconciliationResult {
source_count: i64,
exported_rows: i64,
matched: bool,
},
/// The run finished with `status`, an optional error message and a duration
/// in milliseconds.
RunCompleted {
status: String,
error_message: Option<String>,
duration_ms: i64,
},
}
/// A [`RunEvent`] paired with the UTC timestamp at which it was recorded.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalEntry {
/// UTC time of recording; `RunJournal::record` sets this from `Utc::now()`.
pub recorded_at: DateTime<Utc>,
/// The recorded event.
pub event: RunEvent,
}
/// Serializable list of timestamped events for a single run of an export.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunJournal {
/// Identifier of the run this journal belongs to.
pub run_id: String,
/// Name of the export this journal belongs to.
pub export_name: String,
/// Recorded entries; `RunJournal::record` pushes new entries at the end.
pub entries: Vec<JournalEntry>,
}
impl RunJournal {
    /// Creates a journal with the given identifiers and no entries.
    pub fn new(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
        let run_id = run_id.into();
        let export_name = export_name.into();
        Self {
            run_id,
            export_name,
            entries: Vec::new(),
        }
    }

    /// Appends `event` to the journal, stamped with the current UTC time.
    pub fn record(&mut self, event: RunEvent) {
        let recorded_at = Utc::now();
        self.entries.push(JournalEntry { recorded_at, event });
    }

    /// Returns the snapshot of the earliest `PlanResolved` entry, if any.
    pub fn plan_snapshot(&self) -> Option<&PlanSnapshot> {
        self.entries.iter().find_map(|entry| match &entry.event {
            RunEvent::PlanResolved(snapshot) => Some(snapshot),
            _ => None,
        })
    }

    /// Returns every `FileWritten` entry in recording order.
    pub fn files(&self) -> Vec<&JournalEntry> {
        self.entries_matching(|event| matches!(event, RunEvent::FileWritten { .. }))
    }

    /// Returns every `RetryAttempted` entry in recording order.
    pub fn retries(&self) -> Vec<&JournalEntry> {
        self.entries_matching(|event| matches!(event, RunEvent::RetryAttempted { .. }))
    }

    /// Returns every chunk lifecycle entry (`ChunkStarted`, `ChunkCompleted`,
    /// `ChunkFailed`) in recording order.
    pub fn chunk_events(&self) -> Vec<&JournalEntry> {
        self.entries_matching(|event| {
            matches!(
                event,
                RunEvent::ChunkStarted { .. }
                    | RunEvent::ChunkCompleted { .. }
                    | RunEvent::ChunkFailed { .. }
            )
        })
    }

    /// Returns every `QualityIssue` entry in recording order.
    pub fn quality_issues(&self) -> Vec<&JournalEntry> {
        self.entries_matching(|event| matches!(event, RunEvent::QualityIssue { .. }))
    }

    /// Returns every `SchemaChanged` entry in recording order.
    pub fn schema_changes(&self) -> Vec<&JournalEntry> {
        self.entries_matching(|event| matches!(event, RunEvent::SchemaChanged { .. }))
    }

    /// Returns every `Warning` and `PlanWarning` entry in recording order.
    pub fn warnings(&self) -> Vec<&JournalEntry> {
        self.entries_matching(|event| {
            matches!(
                event,
                RunEvent::Warning { .. } | RunEvent::PlanWarning { .. }
            )
        })
    }

    /// Returns the most recently recorded `RunCompleted` entry, if any.
    pub fn final_outcome(&self) -> Option<&JournalEntry> {
        // Searching from the back yields the latest completion when several exist.
        self.entries
            .iter()
            .rfind(|entry| matches!(entry.event, RunEvent::RunCompleted { .. }))
    }

    /// Collects, in recording order, the entries whose event satisfies `keep`.
    fn entries_matching(&self, keep: impl Fn(&RunEvent) -> bool) -> Vec<&JournalEntry> {
        self.entries
            .iter()
            .filter(|entry| keep(&entry.event))
            .collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty journal with the identifiers every test expects.
    fn journal() -> RunJournal {
        RunJournal::new("run-1", "orders")
    }

    /// Builds a representative plan snapshot fixture.
    fn snap() -> PlanSnapshot {
        PlanSnapshot {
            export_name: "orders".into(),
            base_query: "SELECT 1".into(),
            strategy: "snapshot".into(),
            format: "parquet".into(),
            compression: "zstd".into(),
            destination_type: "local".into(),
            tuning_profile: "balanced".into(),
            batch_size: 1000,
            validate: false,
            reconcile: false,
            resume: false,
        }
    }

    /// Shorthand for a generic `Warning` event, used as filler in many tests.
    fn warning(context: &str, message: &str) -> RunEvent {
        RunEvent::Warning {
            context: context.into(),
            message: message.into(),
        }
    }

    #[test]
    fn new_journal_is_empty() {
        let j = journal();
        assert_eq!(j.run_id, "run-1");
        assert_eq!(j.export_name, "orders");
        assert!(j.entries.is_empty());
    }

    #[test]
    fn record_appends_entry() {
        let mut j = journal();
        j.record(warning("test", "w"));
        assert_eq!(j.entries.len(), 1);
    }

    #[test]
    fn record_multiple_entries_in_order() {
        let mut j = journal();
        j.record(warning("a", "1"));
        j.record(warning("b", "2"));
        assert_eq!(j.entries.len(), 2);

        // Check the actual sequence, not just the count, so the test fails if
        // `record` ever stops appending at the end.
        let contexts: Vec<&str> = j
            .entries
            .iter()
            .map(|entry| match &entry.event {
                RunEvent::Warning { context, .. } => context.as_str(),
                other => panic!("expected Warning, got {:?}", other),
            })
            .collect();
        assert_eq!(contexts, ["a", "b"]);
    }

    #[test]
    fn plan_snapshot_none_when_empty() {
        assert!(journal().plan_snapshot().is_none());
    }

    #[test]
    fn plan_snapshot_returns_first_resolved() {
        let mut j = journal();
        j.record(RunEvent::PlanResolved(snap()));
        // A second, distinguishable snapshot proves the earliest one wins.
        j.record(RunEvent::PlanResolved(PlanSnapshot {
            export_name: "later".into(),
            batch_size: 2000,
            ..snap()
        }));
        let s = j.plan_snapshot().unwrap();
        assert_eq!(s.export_name, "orders");
        assert_eq!(s.batch_size, 1000);
    }

    #[test]
    fn files_empty_when_no_file_written() {
        let mut j = journal();
        j.record(warning("x", "y"));
        assert!(j.files().is_empty());
    }

    #[test]
    fn files_returns_file_written_entries() {
        let mut j = journal();
        j.record(RunEvent::FileWritten {
            file_name: "f.parquet".into(),
            rows: 100,
            bytes: 4096,
            part_index: 0,
        });
        j.record(warning("x", "y"));
        j.record(RunEvent::FileWritten {
            file_name: "g.parquet".into(),
            rows: 50,
            bytes: 2048,
            part_index: 1,
        });
        assert_eq!(j.files().len(), 2);
    }

    #[test]
    fn retries_empty_when_none_recorded() {
        assert!(journal().retries().is_empty());
    }

    #[test]
    fn retries_returns_retry_attempted_entries() {
        let mut j = journal();
        j.record(RunEvent::RetryAttempted {
            attempt: 1,
            reason: "timeout".into(),
            backoff_ms: 500,
        });
        j.record(RunEvent::RetryAttempted {
            attempt: 2,
            reason: "timeout".into(),
            backoff_ms: 1000,
        });
        assert_eq!(j.retries().len(), 2);
    }

    #[test]
    fn chunk_events_collects_all_three_variant_types() {
        let mut j = journal();
        j.record(RunEvent::ChunkStarted {
            chunk_index: 0,
            start_key: "0".into(),
            end_key: "100".into(),
        });
        j.record(RunEvent::ChunkCompleted {
            chunk_index: 0,
            rows: 100,
            file_name: None,
        });
        j.record(RunEvent::ChunkFailed {
            chunk_index: 1,
            error: "err".into(),
            attempt: 1,
        });
        j.record(warning("x", "y"));
        assert_eq!(j.chunk_events().len(), 3);
    }

    #[test]
    fn quality_issues_filters_correctly() {
        let mut j = journal();
        j.record(RunEvent::QualityIssue {
            severity: "FAIL".into(),
            message: "null check".into(),
        });
        j.record(warning("x", "y"));
        assert_eq!(j.quality_issues().len(), 1);
    }

    #[test]
    fn schema_changes_filters_correctly() {
        let mut j = journal();
        j.record(RunEvent::SchemaChanged {
            added: vec!["new_col (Int64)".into()],
            removed: vec![],
            type_changed: vec![],
        });
        assert_eq!(j.schema_changes().len(), 1);
    }

    #[test]
    fn warnings_includes_both_warning_and_plan_warning() {
        let mut j = journal();
        j.record(warning("ctx", "w1"));
        j.record(RunEvent::PlanWarning {
            rule: "r".into(),
            message: "w2".into(),
        });
        j.record(RunEvent::QualityIssue {
            severity: "WARN".into(),
            message: "q".into(),
        });
        assert_eq!(j.warnings().len(), 2);
    }

    #[test]
    fn final_outcome_none_when_not_completed() {
        let mut j = journal();
        j.record(warning("x", "y"));
        assert!(j.final_outcome().is_none());
    }

    #[test]
    fn final_outcome_returns_last_run_completed() {
        let mut j = journal();
        j.record(RunEvent::RunCompleted {
            status: "success".into(),
            error_message: None,
            duration_ms: 1234,
        });
        j.record(warning("x", "y"));
        j.record(RunEvent::RunCompleted {
            status: "failed".into(),
            error_message: Some("err".into()),
            duration_ms: 5678,
        });
        let outcome = j.final_outcome().unwrap();
        if let RunEvent::RunCompleted { status, .. } = &outcome.event {
            assert_eq!(status, "failed");
        } else {
            panic!("expected RunCompleted");
        }
    }
}