1#![allow(dead_code)]
5use chrono::{DateTime, Utc};
30use serde::{Deserialize, Serialize};
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct PlanSnapshot {
39 pub export_name: String,
40 pub base_query: String,
41 pub strategy: String,
42 pub format: String,
43 pub compression: String,
44 pub destination_type: String,
45 pub tuning_profile: String,
46 pub batch_size: usize,
47 pub validate: bool,
48 pub reconcile: bool,
49 pub resume: bool,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
63pub enum RunEvent {
64 PlanResolved(PlanSnapshot),
67 PlanWarning { rule: String, message: String },
69
70 FileWritten {
73 file_name: String,
74 rows: i64,
75 bytes: u64,
76 part_index: usize,
77 },
78 ChunkStarted {
80 chunk_index: i64,
81 start_key: String,
82 end_key: String,
83 },
84 ChunkCompleted {
86 chunk_index: i64,
87 rows: i64,
88 file_name: Option<String>,
89 },
90 ChunkFailed {
92 chunk_index: i64,
93 error: String,
94 attempt: i64,
95 },
96 RetryAttempted {
98 attempt: u32,
99 reason: String,
100 backoff_ms: u64,
101 },
102
103 QualityIssue { severity: String, message: String },
106 SchemaChanged {
108 added: Vec<String>,
110 removed: Vec<String>,
112 type_changed: Vec<(String, String, String)>,
114 },
115 Warning { context: String, message: String },
117 ParallelismAdjusted {
121 from: usize,
122 to: usize,
123 reason: String,
124 },
125
126 ValidationResult { passed: bool },
129 ReconciliationResult {
131 source_count: i64,
132 exported_rows: i64,
133 matched: bool,
134 },
135
136 RunCompleted {
139 status: String,
140 error_message: Option<String>,
141 duration_ms: i64,
142 },
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct JournalEntry {
150 pub recorded_at: DateTime<Utc>,
151 pub event: RunEvent,
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct RunJournal {
162 pub run_id: String,
163 pub export_name: String,
164 pub entries: Vec<JournalEntry>,
166}
167
168impl RunJournal {
169 pub fn new(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
170 Self {
171 run_id: run_id.into(),
172 export_name: export_name.into(),
173 entries: Vec::new(),
174 }
175 }
176
177 pub fn record(&mut self, event: RunEvent) {
179 self.entries.push(JournalEntry {
180 recorded_at: Utc::now(),
181 event,
182 });
183 }
184
185 pub fn plan_snapshot(&self) -> Option<&PlanSnapshot> {
189 self.entries.iter().find_map(|e| {
190 if let RunEvent::PlanResolved(s) = &e.event {
191 Some(s)
192 } else {
193 None
194 }
195 })
196 }
197
198 pub fn files(&self) -> Vec<&JournalEntry> {
202 self.entries
203 .iter()
204 .filter(|e| matches!(e.event, RunEvent::FileWritten { .. }))
205 .collect()
206 }
207
208 pub fn retries(&self) -> Vec<&JournalEntry> {
210 self.entries
211 .iter()
212 .filter(|e| matches!(e.event, RunEvent::RetryAttempted { .. }))
213 .collect()
214 }
215
216 pub fn chunk_events(&self) -> Vec<&JournalEntry> {
218 self.entries
219 .iter()
220 .filter(|e| {
221 matches!(
222 e.event,
223 RunEvent::ChunkStarted { .. }
224 | RunEvent::ChunkCompleted { .. }
225 | RunEvent::ChunkFailed { .. }
226 )
227 })
228 .collect()
229 }
230
231 pub fn quality_issues(&self) -> Vec<&JournalEntry> {
235 self.entries
236 .iter()
237 .filter(|e| matches!(e.event, RunEvent::QualityIssue { .. }))
238 .collect()
239 }
240
241 pub fn schema_changes(&self) -> Vec<&JournalEntry> {
243 self.entries
244 .iter()
245 .filter(|e| matches!(e.event, RunEvent::SchemaChanged { .. }))
246 .collect()
247 }
248
249 pub fn warnings(&self) -> Vec<&JournalEntry> {
251 self.entries
252 .iter()
253 .filter(|e| {
254 matches!(
255 e.event,
256 RunEvent::Warning { .. } | RunEvent::PlanWarning { .. }
257 )
258 })
259 .collect()
260 }
261
262 pub fn final_outcome(&self) -> Option<&JournalEntry> {
266 self.entries
267 .iter()
268 .rev()
269 .find(|e| matches!(e.event, RunEvent::RunCompleted { .. }))
270 }
271}
272
273#[cfg(test)]
274mod tests {
275 use super::*;
276
277 fn journal() -> RunJournal {
278 RunJournal::new("run-1", "orders")
279 }
280
281 fn snap() -> PlanSnapshot {
282 PlanSnapshot {
283 export_name: "orders".into(),
284 base_query: "SELECT 1".into(),
285 strategy: "snapshot".into(),
286 format: "parquet".into(),
287 compression: "zstd".into(),
288 destination_type: "local".into(),
289 tuning_profile: "balanced".into(),
290 batch_size: 1000,
291 validate: false,
292 reconcile: false,
293 resume: false,
294 }
295 }
296
297 #[test]
300 fn new_journal_is_empty() {
301 let j = journal();
302 assert_eq!(j.run_id, "run-1");
303 assert_eq!(j.export_name, "orders");
304 assert!(j.entries.is_empty());
305 }
306
307 #[test]
310 fn record_appends_entry() {
311 let mut j = journal();
312 j.record(RunEvent::Warning {
313 context: "test".into(),
314 message: "w".into(),
315 });
316 assert_eq!(j.entries.len(), 1);
317 }
318
319 #[test]
320 fn record_multiple_entries_in_order() {
321 let mut j = journal();
322 j.record(RunEvent::Warning {
323 context: "a".into(),
324 message: "1".into(),
325 });
326 j.record(RunEvent::Warning {
327 context: "b".into(),
328 message: "2".into(),
329 });
330 assert_eq!(j.entries.len(), 2);
331 }
332
333 #[test]
336 fn plan_snapshot_none_when_empty() {
337 assert!(journal().plan_snapshot().is_none());
338 }
339
340 #[test]
341 fn plan_snapshot_returns_first_resolved() {
342 let mut j = journal();
343 j.record(RunEvent::PlanResolved(snap()));
344 let s = j.plan_snapshot().unwrap();
345 assert_eq!(s.export_name, "orders");
346 assert_eq!(s.batch_size, 1000);
347 }
348
349 #[test]
352 fn files_empty_when_no_file_written() {
353 let mut j = journal();
354 j.record(RunEvent::Warning {
355 context: "x".into(),
356 message: "y".into(),
357 });
358 assert!(j.files().is_empty());
359 }
360
361 #[test]
362 fn files_returns_file_written_entries() {
363 let mut j = journal();
364 j.record(RunEvent::FileWritten {
365 file_name: "f.parquet".into(),
366 rows: 100,
367 bytes: 4096,
368 part_index: 0,
369 });
370 j.record(RunEvent::Warning {
371 context: "x".into(),
372 message: "y".into(),
373 });
374 j.record(RunEvent::FileWritten {
375 file_name: "g.parquet".into(),
376 rows: 50,
377 bytes: 2048,
378 part_index: 1,
379 });
380 assert_eq!(j.files().len(), 2);
381 }
382
383 #[test]
386 fn retries_empty_when_none_recorded() {
387 assert!(journal().retries().is_empty());
388 }
389
390 #[test]
391 fn retries_returns_retry_attempted_entries() {
392 let mut j = journal();
393 j.record(RunEvent::RetryAttempted {
394 attempt: 1,
395 reason: "timeout".into(),
396 backoff_ms: 500,
397 });
398 j.record(RunEvent::RetryAttempted {
399 attempt: 2,
400 reason: "timeout".into(),
401 backoff_ms: 1000,
402 });
403 assert_eq!(j.retries().len(), 2);
404 }
405
406 #[test]
409 fn chunk_events_collects_all_three_variant_types() {
410 let mut j = journal();
411 j.record(RunEvent::ChunkStarted {
412 chunk_index: 0,
413 start_key: "0".into(),
414 end_key: "100".into(),
415 });
416 j.record(RunEvent::ChunkCompleted {
417 chunk_index: 0,
418 rows: 100,
419 file_name: None,
420 });
421 j.record(RunEvent::ChunkFailed {
422 chunk_index: 1,
423 error: "err".into(),
424 attempt: 1,
425 });
426 j.record(RunEvent::Warning {
427 context: "x".into(),
428 message: "y".into(),
429 });
430 assert_eq!(j.chunk_events().len(), 3);
431 }
432
433 #[test]
436 fn quality_issues_filters_correctly() {
437 let mut j = journal();
438 j.record(RunEvent::QualityIssue {
439 severity: "FAIL".into(),
440 message: "null check".into(),
441 });
442 j.record(RunEvent::Warning {
443 context: "x".into(),
444 message: "y".into(),
445 });
446 assert_eq!(j.quality_issues().len(), 1);
447 }
448
449 #[test]
452 fn schema_changes_filters_correctly() {
453 let mut j = journal();
454 j.record(RunEvent::SchemaChanged {
455 added: vec!["new_col (Int64)".into()],
456 removed: vec![],
457 type_changed: vec![],
458 });
459 assert_eq!(j.schema_changes().len(), 1);
460 }
461
462 #[test]
465 fn warnings_includes_both_warning_and_plan_warning() {
466 let mut j = journal();
467 j.record(RunEvent::Warning {
468 context: "ctx".into(),
469 message: "w1".into(),
470 });
471 j.record(RunEvent::PlanWarning {
472 rule: "r".into(),
473 message: "w2".into(),
474 });
475 j.record(RunEvent::QualityIssue {
476 severity: "WARN".into(),
477 message: "q".into(),
478 });
479 assert_eq!(j.warnings().len(), 2);
480 }
481
482 #[test]
485 fn final_outcome_none_when_not_completed() {
486 let mut j = journal();
487 j.record(RunEvent::Warning {
488 context: "x".into(),
489 message: "y".into(),
490 });
491 assert!(j.final_outcome().is_none());
492 }
493
494 #[test]
495 fn final_outcome_returns_last_run_completed() {
496 let mut j = journal();
497 j.record(RunEvent::RunCompleted {
498 status: "success".into(),
499 error_message: None,
500 duration_ms: 1234,
501 });
502 j.record(RunEvent::Warning {
503 context: "x".into(),
504 message: "y".into(),
505 });
506 j.record(RunEvent::RunCompleted {
507 status: "failed".into(),
508 error_message: Some("err".into()),
509 duration_ms: 5678,
510 });
511 let outcome = j.final_outcome().unwrap();
512 if let RunEvent::RunCompleted { status, .. } = &outcome.event {
513 assert_eq!(status, "failed");
514 } else {
515 panic!("expected RunCompleted");
516 }
517 }
518}