1#![allow(dead_code)]
5use chrono::{DateTime, Utc};
30use serde::{Deserialize, Serialize};
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct PlanSnapshot {
39 pub export_name: String,
40 pub base_query: String,
41 pub strategy: String,
42 pub format: String,
43 pub compression: String,
44 pub destination_type: String,
45 pub tuning_profile: String,
46 pub batch_size: usize,
47 pub validate: bool,
48 pub reconcile: bool,
49 pub resume: bool,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
63pub enum RunEvent {
64 PlanResolved(PlanSnapshot),
67 PlanWarning { rule: String, message: String },
69
70 FileWritten {
73 file_name: String,
74 rows: i64,
75 bytes: u64,
76 part_index: usize,
77 },
78 ChunkStarted {
80 chunk_index: i64,
81 start_key: String,
82 end_key: String,
83 },
84 ChunkCompleted {
86 chunk_index: i64,
87 rows: i64,
88 file_name: Option<String>,
89 },
90 ChunkFailed {
92 chunk_index: i64,
93 error: String,
94 attempt: i64,
95 },
96 RetryAttempted {
98 attempt: u32,
99 reason: String,
100 backoff_ms: u64,
101 },
102
103 QualityIssue { severity: String, message: String },
106 SchemaChanged {
108 added: Vec<String>,
110 removed: Vec<String>,
112 type_changed: Vec<(String, String, String)>,
114 },
115 Warning { context: String, message: String },
117
118 ValidationResult { passed: bool },
121 ReconciliationResult {
123 source_count: i64,
124 exported_rows: i64,
125 matched: bool,
126 },
127
128 RunCompleted {
131 status: String,
132 error_message: Option<String>,
133 duration_ms: i64,
134 },
135}
136
137#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct JournalEntry {
142 pub recorded_at: DateTime<Utc>,
143 pub event: RunEvent,
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
153pub struct RunJournal {
154 pub run_id: String,
155 pub export_name: String,
156 pub entries: Vec<JournalEntry>,
158}
159
160impl RunJournal {
161 pub fn new(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
162 Self {
163 run_id: run_id.into(),
164 export_name: export_name.into(),
165 entries: Vec::new(),
166 }
167 }
168
169 pub fn record(&mut self, event: RunEvent) {
171 self.entries.push(JournalEntry {
172 recorded_at: Utc::now(),
173 event,
174 });
175 }
176
177 pub fn plan_snapshot(&self) -> Option<&PlanSnapshot> {
181 self.entries.iter().find_map(|e| {
182 if let RunEvent::PlanResolved(s) = &e.event {
183 Some(s)
184 } else {
185 None
186 }
187 })
188 }
189
190 pub fn files(&self) -> Vec<&JournalEntry> {
194 self.entries
195 .iter()
196 .filter(|e| matches!(e.event, RunEvent::FileWritten { .. }))
197 .collect()
198 }
199
200 pub fn retries(&self) -> Vec<&JournalEntry> {
202 self.entries
203 .iter()
204 .filter(|e| matches!(e.event, RunEvent::RetryAttempted { .. }))
205 .collect()
206 }
207
208 pub fn chunk_events(&self) -> Vec<&JournalEntry> {
210 self.entries
211 .iter()
212 .filter(|e| {
213 matches!(
214 e.event,
215 RunEvent::ChunkStarted { .. }
216 | RunEvent::ChunkCompleted { .. }
217 | RunEvent::ChunkFailed { .. }
218 )
219 })
220 .collect()
221 }
222
223 pub fn quality_issues(&self) -> Vec<&JournalEntry> {
227 self.entries
228 .iter()
229 .filter(|e| matches!(e.event, RunEvent::QualityIssue { .. }))
230 .collect()
231 }
232
233 pub fn schema_changes(&self) -> Vec<&JournalEntry> {
235 self.entries
236 .iter()
237 .filter(|e| matches!(e.event, RunEvent::SchemaChanged { .. }))
238 .collect()
239 }
240
241 pub fn warnings(&self) -> Vec<&JournalEntry> {
243 self.entries
244 .iter()
245 .filter(|e| {
246 matches!(
247 e.event,
248 RunEvent::Warning { .. } | RunEvent::PlanWarning { .. }
249 )
250 })
251 .collect()
252 }
253
254 pub fn final_outcome(&self) -> Option<&JournalEntry> {
258 self.entries
259 .iter()
260 .rev()
261 .find(|e| matches!(e.event, RunEvent::RunCompleted { .. }))
262 }
263}
264
#[cfg(test)]
mod tests {
    //! Unit tests for `RunJournal`: construction, `record`, and each query method.

    use super::*;

    /// Fresh journal with the ids the tests below expect.
    fn journal() -> RunJournal {
        RunJournal::new("run-1", "orders")
    }

    /// Plan snapshot fixture; `batch_size` is the only field the tests vary.
    fn snap_with_batch(batch_size: usize) -> PlanSnapshot {
        PlanSnapshot {
            export_name: "orders".into(),
            base_query: "SELECT 1".into(),
            strategy: "snapshot".into(),
            format: "parquet".into(),
            compression: "zstd".into(),
            destination_type: "local".into(),
            tuning_profile: "balanced".into(),
            batch_size,
            validate: false,
            reconcile: false,
            resume: false,
        }
    }

    /// Default plan snapshot fixture (`batch_size` of 1000).
    fn snap() -> PlanSnapshot {
        snap_with_batch(1000)
    }

    /// Builds a `Warning` event.
    fn warning(context: &str, message: &str) -> RunEvent {
        RunEvent::Warning {
            context: context.into(),
            message: message.into(),
        }
    }

    /// Builds a `FileWritten` event.
    fn file_written(file_name: &str, rows: i64, bytes: u64, part_index: usize) -> RunEvent {
        RunEvent::FileWritten {
            file_name: file_name.into(),
            rows,
            bytes,
            part_index,
        }
    }

    /// Builds a `RetryAttempted` event with reason `"timeout"`.
    fn retry(attempt: u32, backoff_ms: u64) -> RunEvent {
        RunEvent::RetryAttempted {
            attempt,
            reason: "timeout".into(),
            backoff_ms,
        }
    }

    /// Context strings of all `Warning` entries, in journal order.
    fn warning_contexts(j: &RunJournal) -> Vec<&str> {
        j.entries
            .iter()
            .filter_map(|e| match &e.event {
                RunEvent::Warning { context, .. } => Some(context.as_str()),
                _ => None,
            })
            .collect()
    }

    #[test]
    fn new_journal_is_empty() {
        let j = journal();
        assert_eq!(j.run_id, "run-1");
        assert_eq!(j.export_name, "orders");
        assert!(j.entries.is_empty());
    }

    #[test]
    fn record_appends_entry() {
        let mut j = journal();
        j.record(warning("test", "w"));
        assert_eq!(j.entries.len(), 1);
        // The stored event must be the one that was passed in.
        assert!(matches!(
            &j.entries[0].event,
            RunEvent::Warning { context, message } if context == "test" && message == "w"
        ));
    }

    #[test]
    fn record_multiple_entries_in_order() {
        let mut j = journal();
        j.record(warning("a", "1"));
        j.record(warning("b", "2"));
        assert_eq!(j.entries.len(), 2);
        // Check the order itself, not just the count.
        assert_eq!(warning_contexts(&j), ["a", "b"]);
    }

    #[test]
    fn plan_snapshot_none_when_empty() {
        assert!(journal().plan_snapshot().is_none());
    }

    #[test]
    fn plan_snapshot_returns_first_resolved() {
        let mut j = journal();
        j.record(warning("x", "y"));
        j.record(RunEvent::PlanResolved(snap()));
        // A second, distinguishable snapshot proves "first" rather than "any".
        j.record(RunEvent::PlanResolved(snap_with_batch(5000)));
        let s = j
            .plan_snapshot()
            .expect("a PlanResolved event was recorded");
        assert_eq!(s.export_name, "orders");
        assert_eq!(s.batch_size, 1000);
    }

    #[test]
    fn files_empty_when_no_file_written() {
        let mut j = journal();
        j.record(warning("x", "y"));
        assert!(j.files().is_empty());
    }

    #[test]
    fn files_returns_file_written_entries() {
        let mut j = journal();
        j.record(file_written("f.parquet", 100, 4096, 0));
        j.record(warning("x", "y"));
        j.record(file_written("g.parquet", 50, 2048, 1));

        let names: Vec<&str> = j
            .files()
            .into_iter()
            .map(|e| match &e.event {
                RunEvent::FileWritten { file_name, .. } => file_name.as_str(),
                _ => panic!("files() returned a non-FileWritten entry"),
            })
            .collect();
        assert_eq!(names, ["f.parquet", "g.parquet"]);
    }

    #[test]
    fn retries_empty_when_none_recorded() {
        assert!(journal().retries().is_empty());
    }

    #[test]
    fn retries_returns_retry_attempted_entries() {
        let mut j = journal();
        j.record(retry(1, 500));
        j.record(retry(2, 1000));

        let attempts: Vec<u32> = j
            .retries()
            .into_iter()
            .map(|e| match &e.event {
                RunEvent::RetryAttempted { attempt, .. } => *attempt,
                _ => panic!("retries() returned a non-RetryAttempted entry"),
            })
            .collect();
        assert_eq!(attempts, [1, 2]);
    }

    #[test]
    fn chunk_events_collects_all_three_variant_types() {
        let mut j = journal();
        j.record(RunEvent::ChunkStarted {
            chunk_index: 0,
            start_key: "0".into(),
            end_key: "100".into(),
        });
        j.record(RunEvent::ChunkCompleted {
            chunk_index: 0,
            rows: 100,
            file_name: None,
        });
        j.record(RunEvent::ChunkFailed {
            chunk_index: 1,
            error: "err".into(),
            attempt: 1,
        });
        j.record(warning("x", "y"));

        let chunks = j.chunk_events();
        assert_eq!(chunks.len(), 3);
        assert!(matches!(chunks[0].event, RunEvent::ChunkStarted { .. }));
        assert!(matches!(chunks[1].event, RunEvent::ChunkCompleted { .. }));
        assert!(matches!(chunks[2].event, RunEvent::ChunkFailed { .. }));
    }

    #[test]
    fn quality_issues_filters_correctly() {
        let mut j = journal();
        j.record(RunEvent::QualityIssue {
            severity: "FAIL".into(),
            message: "null check".into(),
        });
        j.record(warning("x", "y"));
        assert_eq!(j.quality_issues().len(), 1);
    }

    #[test]
    fn schema_changes_filters_correctly() {
        let mut j = journal();
        j.record(RunEvent::SchemaChanged {
            added: vec!["new_col (Int64)".into()],
            removed: vec![],
            type_changed: vec![],
        });
        j.record(warning("x", "y"));
        assert_eq!(j.schema_changes().len(), 1);
    }

    #[test]
    fn warnings_includes_both_warning_and_plan_warning() {
        let mut j = journal();
        j.record(warning("ctx", "w1"));
        j.record(RunEvent::PlanWarning {
            rule: "r".into(),
            message: "w2".into(),
        });
        j.record(RunEvent::QualityIssue {
            severity: "WARN".into(),
            message: "q".into(),
        });

        let found = j.warnings();
        assert_eq!(found.len(), 2);
        // Both kinds must be present; the quality issue must not be.
        assert!(matches!(found[0].event, RunEvent::Warning { .. }));
        assert!(matches!(found[1].event, RunEvent::PlanWarning { .. }));
    }

    #[test]
    fn final_outcome_none_when_not_completed() {
        let mut j = journal();
        j.record(warning("x", "y"));
        assert!(j.final_outcome().is_none());
    }

    #[test]
    fn final_outcome_returns_last_run_completed() {
        let mut j = journal();
        j.record(RunEvent::RunCompleted {
            status: "success".into(),
            error_message: None,
            duration_ms: 1234,
        });
        j.record(warning("x", "y"));
        j.record(RunEvent::RunCompleted {
            status: "failed".into(),
            error_message: Some("err".into()),
            duration_ms: 5678,
        });

        let outcome = j
            .final_outcome()
            .expect("a RunCompleted event was recorded");
        match &outcome.event {
            RunEvent::RunCompleted { status, .. } => assert_eq!(status, "failed"),
            _ => panic!("expected RunCompleted"),
        }
    }
}