#![allow(dead_code)]
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// Snapshot of the resolved export plan, carried by `RunEvent::PlanResolved`.
///
/// Every descriptive setting is stored as a plain string or primitive; nothing
/// in this file validates or interprets the values. The example values quoted
/// below are the ones used by this file's unit tests.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanSnapshot {
/// Name of the export (tests use `"orders"`).
pub export_name: String,
/// Query text the plan is based on (tests use `"SELECT 1"`).
pub base_query: String,
/// Strategy label (tests use `"snapshot"`); the set of valid values is not shown here.
pub strategy: String,
/// Output format label (tests use `"parquet"`).
pub format: String,
/// Compression label (tests use `"zstd"`).
pub compression: String,
/// Destination kind label (tests use `"local"`).
pub destination_type: String,
/// Tuning profile label (tests use `"balanced"`).
pub tuning_profile: String,
/// Batch size setting — presumably rows per batch; TODO confirm units with the planner.
pub batch_size: usize,
/// Validation flag — presumably "validation enabled for this run"; confirm against the caller.
pub validate: bool,
/// Reconciliation flag — presumably "reconciliation enabled for this run"; confirm against the caller.
pub reconcile: bool,
/// Resume flag — presumably "run resumes earlier work"; confirm against the caller.
pub resume: bool,
}
/// One event recorded in a `RunJournal`.
///
/// Variants carry plain data only. Within this file the payloads are not
/// interpreted beyond the variant filters and the start/complete pairing
/// implemented by the `RunJournal` accessor methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RunEvent {
/// The plan was resolved. `RunJournal::plan_snapshot` returns the payload of
/// the first such event in the journal.
PlanResolved(PlanSnapshot),
/// A warning associated with a named `rule`. Returned by
/// `RunJournal::warnings` together with `Warning`.
PlanWarning { rule: String, message: String },
/// An output file was written. Returned by `RunJournal::files`.
FileWritten {
/// File name (tests use values such as `"f.parquet"`).
file_name: String,
/// Row count reported for the file (signed `i64`).
rows: i64,
/// Byte count reported for the file.
bytes: u64,
/// Part index — presumably zero-based (tests use 0 and 1); TODO confirm.
part_index: usize,
},
/// A chunk started. `RunJournal::longest_chunk_ms` uses this entry's
/// timestamp as the start time for `chunk_index`.
ChunkStarted {
/// Identifier used to pair this event with `ChunkCompleted`.
chunk_index: i64,
/// Start of the chunk's key range, as a string; inclusivity is not shown here.
start_key: String,
/// End of the chunk's key range, as a string; inclusivity is not shown here.
end_key: String,
},
/// A chunk completed. `RunJournal::longest_chunk_ms` measures the elapsed
/// time from the most recently recorded `ChunkStarted` with the same index.
ChunkCompleted {
/// Identifier used to pair this event with `ChunkStarted`.
chunk_index: i64,
/// Row count reported for the chunk.
rows: i64,
/// File name associated with the chunk, if any.
file_name: Option<String>,
},
/// A chunk failed. Included in `RunJournal::chunk_events`, but ignored by
/// `RunJournal::longest_chunk_ms`.
ChunkFailed {
/// Index of the chunk that failed.
chunk_index: i64,
/// Error description.
error: String,
/// Attempt number.
/// NOTE(review): this is `i64` while `RetryAttempted::attempt` is `u32`.
attempt: i64,
},
/// A retry was attempted. Returned by `RunJournal::retries`.
RetryAttempted {
/// Attempt number (tests use 1 and 2).
attempt: u32,
/// Reason for the retry (tests use `"timeout"`).
reason: String,
/// Backoff in milliseconds, per the field name.
backoff_ms: u64,
},
/// A data-quality finding. Returned by `RunJournal::quality_issues`; it is
/// NOT included in `RunJournal::warnings`. Tests use the severities `"FAIL"`
/// and `"WARN"`.
QualityIssue { severity: String, message: String },
/// A schema difference was detected. Returned by `RunJournal::schema_changes`.
SchemaChanged {
/// Added columns (tests use `"new_col (Int64)"`).
added: Vec<String>,
/// Removed columns.
removed: Vec<String>,
/// Changed columns — presumably `(column, old type, new type)`; TODO confirm tuple order.
type_changed: Vec<(String, String, String)>,
},
/// A general warning with a free-form `context`. Returned by
/// `RunJournal::warnings` together with `PlanWarning`.
Warning { context: String, message: String },
/// The parallelism level was changed from `from` to `to` for `reason`.
ParallelismAdjusted {
from: usize,
to: usize,
reason: String,
},
/// Outcome of validation.
ValidationResult { passed: bool },
/// Outcome of reconciliation.
ReconciliationResult {
/// Count reported for the source.
source_count: i64,
/// Count of exported rows.
exported_rows: i64,
/// Reconciliation verdict — presumably whether the two counts agree; this file does not compute it.
matched: bool,
},
/// The run finished. `RunJournal::final_outcome` returns the last such entry.
RunCompleted {
/// Status label (tests use `"success"` and `"failed"`).
status: String,
/// Error text, if any.
error_message: Option<String>,
/// Run duration in milliseconds, per the field name.
duration_ms: i64,
},
}
/// A single journal record: an event plus the time it was recorded.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalEntry {
/// UTC timestamp of the record; `RunJournal::record` sets it to `Utc::now()`.
pub recorded_at: DateTime<Utc>,
/// The event that was recorded.
pub event: RunEvent,
}
/// Append-only log of the events of one export run.
///
/// `RunJournal::record` pushes entries at the end of `entries`, and the
/// accessor methods scan that vector in order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunJournal {
/// Identifier of the run (tests use `"run-1"`).
pub run_id: String,
/// Name of the export this run belongs to (tests use `"orders"`).
pub export_name: String,
/// Recorded entries, in the order they were appended.
pub entries: Vec<JournalEntry>,
}
impl RunJournal {
    /// Creates a journal for the given run and export with no entries yet.
    pub fn new(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
        let run_id = run_id.into();
        let export_name = export_name.into();
        Self {
            run_id,
            export_name,
            entries: Vec::new(),
        }
    }

    /// Appends `event` to the end of the journal, stamped with the current
    /// UTC time.
    pub fn record(&mut self, event: RunEvent) {
        let recorded_at = Utc::now();
        self.entries.push(JournalEntry { recorded_at, event });
    }

    /// Returns the payload of the first `PlanResolved` event, or `None` when
    /// no such event has been recorded.
    pub fn plan_snapshot(&self) -> Option<&PlanSnapshot> {
        for entry in &self.entries {
            if let RunEvent::PlanResolved(snapshot) = &entry.event {
                return Some(snapshot);
            }
        }
        None
    }

    /// Collects, in journal order, every entry whose event satisfies `keep`.
    fn entries_matching<F>(&self, keep: F) -> Vec<&JournalEntry>
    where
        F: Fn(&RunEvent) -> bool,
    {
        self.entries
            .iter()
            .filter(|entry| keep(&entry.event))
            .collect()
    }

    /// Returns all `FileWritten` entries in journal order.
    pub fn files(&self) -> Vec<&JournalEntry> {
        self.entries_matching(|event| matches!(event, RunEvent::FileWritten { .. }))
    }

    /// Returns all `RetryAttempted` entries in journal order.
    pub fn retries(&self) -> Vec<&JournalEntry> {
        self.entries_matching(|event| matches!(event, RunEvent::RetryAttempted { .. }))
    }

    /// Returns all `ChunkStarted`, `ChunkCompleted` and `ChunkFailed` entries
    /// in journal order.
    pub fn chunk_events(&self) -> Vec<&JournalEntry> {
        self.entries_matching(|event| {
            matches!(
                event,
                RunEvent::ChunkStarted { .. }
                    | RunEvent::ChunkCompleted { .. }
                    | RunEvent::ChunkFailed { .. }
            )
        })
    }

    /// Returns the longest elapsed time, in milliseconds, between a
    /// `ChunkStarted` entry and a later `ChunkCompleted` entry with the same
    /// `chunk_index`.
    ///
    /// A completion is measured against the most recently recorded start for
    /// its index. Completions without a recorded start, and spans that come
    /// out negative, are ignored. Returns `None` when no span qualifies.
    pub fn longest_chunk_ms(&self) -> Option<i64> {
        use std::collections::HashMap;

        let mut start_times: HashMap<i64, DateTime<Utc>> = HashMap::new();
        let mut longest: Option<i64> = None;

        for entry in &self.entries {
            match &entry.event {
                RunEvent::ChunkStarted { chunk_index, .. } => {
                    // A repeated start for the same index replaces the earlier one.
                    start_times.insert(*chunk_index, entry.recorded_at);
                }
                RunEvent::ChunkCompleted { chunk_index, .. } => {
                    let elapsed = start_times
                        .get(chunk_index)
                        .map(|began| (entry.recorded_at - *began).num_milliseconds())
                        .filter(|ms| *ms >= 0);
                    // `None` orders below every `Some(_)`, so taking the max
                    // keeps the running maximum and ignores unusable spans.
                    longest = longest.max(elapsed);
                }
                _ => {}
            }
        }

        longest
    }

    /// Returns all `QualityIssue` entries in journal order.
    pub fn quality_issues(&self) -> Vec<&JournalEntry> {
        self.entries_matching(|event| matches!(event, RunEvent::QualityIssue { .. }))
    }

    /// Returns all `SchemaChanged` entries in journal order.
    pub fn schema_changes(&self) -> Vec<&JournalEntry> {
        self.entries_matching(|event| matches!(event, RunEvent::SchemaChanged { .. }))
    }

    /// Returns all `Warning` and `PlanWarning` entries in journal order.
    pub fn warnings(&self) -> Vec<&JournalEntry> {
        self.entries_matching(|event| {
            matches!(
                event,
                RunEvent::Warning { .. } | RunEvent::PlanWarning { .. }
            )
        })
    }

    /// Returns the last `RunCompleted` entry, or `None` when the journal has
    /// none.
    pub fn final_outcome(&self) -> Option<&JournalEntry> {
        self.entries
            .iter()
            .rfind(|entry| matches!(entry.event, RunEvent::RunCompleted { .. }))
    }
}
#[cfg(test)]
impl RunJournal {
    /// Test-only helper: appends a `ChunkStarted` entry stamped with the
    /// current UTC time, followed by a `ChunkCompleted` entry for the same
    /// `chunk_index` stamped `dur_ms` milliseconds after it.
    pub(crate) fn push_test_chunk_span(&mut self, chunk_index: i64, dur_ms: i64) {
        let started_at = Utc::now();

        let opening = JournalEntry {
            recorded_at: started_at,
            event: RunEvent::ChunkStarted {
                chunk_index,
                start_key: String::from("0"),
                end_key: String::from("1"),
            },
        };
        self.entries.push(opening);

        let closing = JournalEntry {
            recorded_at: started_at + chrono::Duration::milliseconds(dur_ms),
            event: RunEvent::ChunkCompleted {
                chunk_index,
                rows: 1,
                file_name: None,
            },
        };
        self.entries.push(closing);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh, empty journal used by every test.
    fn journal() -> RunJournal {
        RunJournal::new("run-1", "orders")
    }

    /// Representative plan snapshot.
    fn snap() -> PlanSnapshot {
        PlanSnapshot {
            export_name: "orders".into(),
            base_query: "SELECT 1".into(),
            strategy: "snapshot".into(),
            format: "parquet".into(),
            compression: "zstd".into(),
            destination_type: "local".into(),
            tuning_profile: "balanced".into(),
            batch_size: 1000,
            validate: false,
            reconcile: false,
            resume: false,
        }
    }

    /// Builds a `Warning` event.
    fn warning(context: &str, message: &str) -> RunEvent {
        RunEvent::Warning {
            context: context.into(),
            message: message.into(),
        }
    }

    /// Builds a `ChunkStarted` event with a fixed key range.
    fn chunk_started(chunk_index: i64) -> RunEvent {
        RunEvent::ChunkStarted {
            chunk_index,
            start_key: "0".into(),
            end_key: "1".into(),
        }
    }

    /// Builds a `ChunkCompleted` event with one row and no file.
    fn chunk_completed(chunk_index: i64) -> RunEvent {
        RunEvent::ChunkCompleted {
            chunk_index,
            rows: 1,
            file_name: None,
        }
    }

    #[test]
    fn new_journal_is_empty() {
        let j = journal();
        assert_eq!(j.run_id, "run-1");
        assert_eq!(j.export_name, "orders");
        assert!(j.entries.is_empty());
    }

    #[test]
    fn record_appends_entry() {
        let mut j = journal();
        j.record(warning("test", "w"));
        assert_eq!(j.entries.len(), 1);
    }

    #[test]
    fn record_multiple_entries_in_order() {
        let mut j = journal();
        j.record(warning("a", "1"));
        j.record(warning("b", "2"));
        assert_eq!(j.entries.len(), 2);

        // Verify the order itself, not just the count.
        let contexts: Vec<&str> = j
            .entries
            .iter()
            .map(|e| match &e.event {
                RunEvent::Warning { context, .. } => context.as_str(),
                other => panic!("unexpected event: {:?}", other),
            })
            .collect();
        assert_eq!(contexts, ["a", "b"]);
    }

    #[test]
    fn plan_snapshot_none_when_empty() {
        assert!(journal().plan_snapshot().is_none());
    }

    #[test]
    fn plan_snapshot_returns_first_resolved() {
        let mut j = journal();
        j.record(RunEvent::PlanResolved(snap()));
        // A later, distinguishable snapshot must not shadow the first one.
        j.record(RunEvent::PlanResolved(PlanSnapshot {
            batch_size: 2000,
            ..snap()
        }));

        let s = j.plan_snapshot().unwrap();
        assert_eq!(s.export_name, "orders");
        assert_eq!(s.batch_size, 1000);
    }

    #[test]
    fn files_empty_when_no_file_written() {
        let mut j = journal();
        j.record(warning("x", "y"));
        assert!(j.files().is_empty());
    }

    #[test]
    fn files_returns_file_written_entries() {
        let mut j = journal();
        j.record(RunEvent::FileWritten {
            file_name: "f.parquet".into(),
            rows: 100,
            bytes: 4096,
            part_index: 0,
        });
        j.record(warning("x", "y"));
        j.record(RunEvent::FileWritten {
            file_name: "g.parquet".into(),
            rows: 50,
            bytes: 2048,
            part_index: 1,
        });

        let files = j.files();
        assert_eq!(files.len(), 2);

        let names: Vec<&str> = files
            .into_iter()
            .map(|e| match &e.event {
                RunEvent::FileWritten { file_name, .. } => file_name.as_str(),
                other => panic!("unexpected event: {:?}", other),
            })
            .collect();
        assert_eq!(names, ["f.parquet", "g.parquet"]);
    }

    #[test]
    fn retries_empty_when_none_recorded() {
        assert!(journal().retries().is_empty());
    }

    #[test]
    fn retries_returns_retry_attempted_entries() {
        let mut j = journal();
        j.record(RunEvent::RetryAttempted {
            attempt: 1,
            reason: "timeout".into(),
            backoff_ms: 500,
        });
        j.record(RunEvent::RetryAttempted {
            attempt: 2,
            reason: "timeout".into(),
            backoff_ms: 1000,
        });
        assert_eq!(j.retries().len(), 2);
    }

    #[test]
    fn chunk_events_collects_all_three_variant_types() {
        let mut j = journal();
        j.record(RunEvent::ChunkStarted {
            chunk_index: 0,
            start_key: "0".into(),
            end_key: "100".into(),
        });
        j.record(RunEvent::ChunkCompleted {
            chunk_index: 0,
            rows: 100,
            file_name: None,
        });
        j.record(RunEvent::ChunkFailed {
            chunk_index: 1,
            error: "err".into(),
            attempt: 1,
        });
        j.record(warning("x", "y"));
        assert_eq!(j.chunk_events().len(), 3);
    }

    #[test]
    fn longest_chunk_ms_pairs_started_and_completed_by_index() {
        use chrono::Duration;

        let base = Utc::now();
        let mut j = journal();
        // Push entries with explicit timestamps so the spans are deterministic.
        let push = |j: &mut RunJournal, off_ms: i64, event: RunEvent| {
            j.entries.push(JournalEntry {
                recorded_at: base + Duration::milliseconds(off_ms),
                event,
            });
        };

        // Chunk 0 spans 0..200 ms, chunk 1 spans 50..850 ms (interleaved).
        push(&mut j, 0, chunk_started(0));
        push(&mut j, 50, chunk_started(1));
        push(&mut j, 200, chunk_completed(0));
        push(&mut j, 850, chunk_completed(1));

        assert_eq!(j.longest_chunk_ms(), Some(800));
    }

    #[test]
    fn longest_chunk_ms_ignores_negative_spans() {
        let mut j = journal();
        // Completion stamped before its start: the span must be skipped.
        j.push_test_chunk_span(0, -5);
        assert!(j.longest_chunk_ms().is_none());

        j.push_test_chunk_span(1, 120);
        assert_eq!(j.longest_chunk_ms(), Some(120));
    }

    #[test]
    fn longest_chunk_ms_none_without_paired_start() {
        let mut j = journal();
        j.record(RunEvent::ChunkCompleted {
            chunk_index: 0,
            rows: 1,
            file_name: None,
        });
        assert!(j.longest_chunk_ms().is_none());
        assert!(
            journal().longest_chunk_ms().is_none(),
            "empty journal → None"
        );
    }

    #[test]
    fn quality_issues_filters_correctly() {
        let mut j = journal();
        j.record(RunEvent::QualityIssue {
            severity: "FAIL".into(),
            message: "null check".into(),
        });
        j.record(warning("x", "y"));
        assert_eq!(j.quality_issues().len(), 1);
    }

    #[test]
    fn schema_changes_filters_correctly() {
        let mut j = journal();
        j.record(RunEvent::SchemaChanged {
            added: vec!["new_col (Int64)".into()],
            removed: vec![],
            type_changed: vec![],
        });
        assert_eq!(j.schema_changes().len(), 1);
    }

    #[test]
    fn warnings_includes_both_warning_and_plan_warning() {
        let mut j = journal();
        j.record(warning("ctx", "w1"));
        j.record(RunEvent::PlanWarning {
            rule: "r".into(),
            message: "w2".into(),
        });
        j.record(RunEvent::QualityIssue {
            severity: "WARN".into(),
            message: "q".into(),
        });
        assert_eq!(j.warnings().len(), 2);
    }

    #[test]
    fn final_outcome_none_when_not_completed() {
        let mut j = journal();
        j.record(warning("x", "y"));
        assert!(j.final_outcome().is_none());
    }

    #[test]
    fn final_outcome_returns_last_run_completed() {
        let mut j = journal();
        j.record(RunEvent::RunCompleted {
            status: "success".into(),
            error_message: None,
            duration_ms: 1234,
        });
        j.record(warning("x", "y"));
        j.record(RunEvent::RunCompleted {
            status: "failed".into(),
            error_message: Some("err".into()),
            duration_ms: 5678,
        });

        let outcome = j.final_outcome().unwrap();
        if let RunEvent::RunCompleted { status, .. } = &outcome.event {
            assert_eq!(status, "failed");
        } else {
            panic!("expected RunCompleted");
        }
    }
}