1#![allow(dead_code)]
5use chrono::{DateTime, Utc};
30use serde::{Deserialize, Serialize};
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct PlanSnapshot {
39 pub export_name: String,
40 pub base_query: String,
41 pub strategy: String,
42 pub format: String,
43 pub compression: String,
44 pub destination_type: String,
45 pub tuning_profile: String,
46 pub batch_size: usize,
47 pub validate: bool,
48 pub reconcile: bool,
49 pub resume: bool,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
63pub enum RunEvent {
64 PlanResolved(PlanSnapshot),
67 PlanWarning { rule: String, message: String },
69
70 FileWritten {
73 file_name: String,
74 rows: i64,
75 bytes: u64,
76 part_index: usize,
77 },
78 ChunkStarted {
80 chunk_index: i64,
81 start_key: String,
82 end_key: String,
83 },
84 ChunkCompleted {
86 chunk_index: i64,
87 rows: i64,
88 file_name: Option<String>,
89 },
90 ChunkFailed {
92 chunk_index: i64,
93 error: String,
94 attempt: i64,
95 },
96 RetryAttempted {
98 attempt: u32,
99 reason: String,
100 backoff_ms: u64,
101 },
102
103 QualityIssue { severity: String, message: String },
106 SchemaChanged {
108 added: Vec<String>,
110 removed: Vec<String>,
112 type_changed: Vec<(String, String, String)>,
114 },
115 Warning { context: String, message: String },
117 ParallelismAdjusted {
121 from: usize,
122 to: usize,
123 reason: String,
124 },
125
126 ValidationResult { passed: bool },
129 ReconciliationResult {
131 source_count: i64,
132 exported_rows: i64,
133 matched: bool,
134 },
135
136 RunCompleted {
139 status: String,
140 error_message: Option<String>,
141 duration_ms: i64,
142 },
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct JournalEntry {
150 pub recorded_at: DateTime<Utc>,
151 pub event: RunEvent,
152}
153
154#[derive(Debug, Clone, Default, Serialize, Deserialize)]
161pub struct RunJournal {
162 pub run_id: String,
163 pub export_name: String,
164 pub entries: Vec<JournalEntry>,
166}
167
168impl RunJournal {
169 pub fn new(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
170 Self {
171 run_id: run_id.into(),
172 export_name: export_name.into(),
173 entries: Vec::new(),
174 }
175 }
176
177 pub fn record(&mut self, event: RunEvent) {
179 self.entries.push(JournalEntry {
180 recorded_at: Utc::now(),
181 event,
182 });
183 }
184
185 pub fn plan_snapshot(&self) -> Option<&PlanSnapshot> {
189 self.entries.iter().find_map(|e| {
190 if let RunEvent::PlanResolved(s) = &e.event {
191 Some(s)
192 } else {
193 None
194 }
195 })
196 }
197
198 pub fn files(&self) -> Vec<&JournalEntry> {
202 self.entries
203 .iter()
204 .filter(|e| matches!(e.event, RunEvent::FileWritten { .. }))
205 .collect()
206 }
207
208 pub fn retries(&self) -> Vec<&JournalEntry> {
210 self.entries
211 .iter()
212 .filter(|e| matches!(e.event, RunEvent::RetryAttempted { .. }))
213 .collect()
214 }
215
216 pub fn chunk_events(&self) -> Vec<&JournalEntry> {
218 self.entries
219 .iter()
220 .filter(|e| {
221 matches!(
222 e.event,
223 RunEvent::ChunkStarted { .. }
224 | RunEvent::ChunkCompleted { .. }
225 | RunEvent::ChunkFailed { .. }
226 )
227 })
228 .collect()
229 }
230
231 pub fn longest_chunk_ms(&self) -> Option<i64> {
243 let mut started: std::collections::HashMap<i64, DateTime<Utc>> =
244 std::collections::HashMap::new();
245 let mut max_ms: Option<i64> = None;
246 for e in &self.entries {
247 match &e.event {
248 RunEvent::ChunkStarted { chunk_index, .. } => {
249 started.insert(*chunk_index, e.recorded_at);
250 }
251 RunEvent::ChunkCompleted { chunk_index, .. } => {
252 if let Some(start) = started.get(chunk_index) {
253 let ms = (e.recorded_at - *start).num_milliseconds();
254 if ms >= 0 {
255 max_ms = Some(max_ms.map_or(ms, |m| m.max(ms)));
256 }
257 }
258 }
259 _ => {}
260 }
261 }
262 max_ms
263 }
264
265 pub fn quality_issues(&self) -> Vec<&JournalEntry> {
269 self.entries
270 .iter()
271 .filter(|e| matches!(e.event, RunEvent::QualityIssue { .. }))
272 .collect()
273 }
274
275 pub fn schema_changes(&self) -> Vec<&JournalEntry> {
277 self.entries
278 .iter()
279 .filter(|e| matches!(e.event, RunEvent::SchemaChanged { .. }))
280 .collect()
281 }
282
283 pub fn warnings(&self) -> Vec<&JournalEntry> {
285 self.entries
286 .iter()
287 .filter(|e| {
288 matches!(
289 e.event,
290 RunEvent::Warning { .. } | RunEvent::PlanWarning { .. }
291 )
292 })
293 .collect()
294 }
295
296 pub fn final_outcome(&self) -> Option<&JournalEntry> {
300 self.entries
301 .iter()
302 .rev()
303 .find(|e| matches!(e.event, RunEvent::RunCompleted { .. }))
304 }
305}
306
#[cfg(test)]
impl RunJournal {
    /// Test helper: appends a `ChunkStarted` entry stamped "now" and a
    /// matching `ChunkCompleted` entry stamped `dur_ms` milliseconds later,
    /// both for `chunk_index`.
    ///
    /// The start uses keys `"0"`/`"1"`; the completion reports one row and no
    /// file name.
    pub(crate) fn push_test_chunk_span(&mut self, chunk_index: i64, dur_ms: i64) {
        let started_at = Utc::now();
        let completed_at = started_at + chrono::Duration::milliseconds(dur_ms);

        let start_entry = JournalEntry {
            recorded_at: started_at,
            event: RunEvent::ChunkStarted {
                chunk_index,
                start_key: "0".into(),
                end_key: "1".into(),
            },
        };
        let completion_entry = JournalEntry {
            recorded_at: completed_at,
            event: RunEvent::ChunkCompleted {
                chunk_index,
                rows: 1,
                file_name: None,
            },
        };

        self.entries.push(start_entry);
        self.entries.push(completion_entry);
    }
}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// Empty journal with the identifiers the assertions below expect.
    fn journal() -> RunJournal {
        RunJournal::new("run-1", "orders")
    }

    /// Journal produced by recording `events` in order on a fresh journal.
    fn journal_with(events: Vec<RunEvent>) -> RunJournal {
        let mut j = journal();
        for event in events {
            j.record(event);
        }
        j
    }

    /// Shorthand for a `RunEvent::Warning`.
    fn warning(context: &str, message: &str) -> RunEvent {
        RunEvent::Warning {
            context: context.to_string(),
            message: message.to_string(),
        }
    }

    /// Plan snapshot fixture.
    fn snap() -> PlanSnapshot {
        PlanSnapshot {
            export_name: String::from("orders"),
            base_query: String::from("SELECT 1"),
            strategy: String::from("snapshot"),
            format: String::from("parquet"),
            compression: String::from("zstd"),
            destination_type: String::from("local"),
            tuning_profile: String::from("balanced"),
            batch_size: 1000,
            validate: false,
            reconcile: false,
            resume: false,
        }
    }

    // --- construction / record -------------------------------------------

    #[test]
    fn new_journal_is_empty() {
        let j = journal();
        assert_eq!(j.run_id, "run-1");
        assert_eq!(j.export_name, "orders");
        assert!(j.entries.is_empty());
    }

    #[test]
    fn record_appends_entry() {
        let j = journal_with(vec![warning("test", "w")]);
        assert_eq!(j.entries.len(), 1);
    }

    #[test]
    fn record_multiple_entries_in_order() {
        let j = journal_with(vec![warning("a", "1"), warning("b", "2")]);
        assert_eq!(j.entries.len(), 2);
    }

    // --- plan_snapshot ----------------------------------------------------

    #[test]
    fn plan_snapshot_none_when_empty() {
        let j = journal();
        assert!(j.plan_snapshot().is_none());
    }

    #[test]
    fn plan_snapshot_returns_first_resolved() {
        let j = journal_with(vec![RunEvent::PlanResolved(snap())]);
        let s = j.plan_snapshot().unwrap();
        assert_eq!(s.export_name, "orders");
        assert_eq!(s.batch_size, 1000);
    }

    // --- files ------------------------------------------------------------

    #[test]
    fn files_empty_when_no_file_written() {
        let j = journal_with(vec![warning("x", "y")]);
        assert!(j.files().is_empty());
    }

    #[test]
    fn files_returns_file_written_entries() {
        let j = journal_with(vec![
            RunEvent::FileWritten {
                file_name: "f.parquet".into(),
                rows: 100,
                bytes: 4096,
                part_index: 0,
            },
            warning("x", "y"),
            RunEvent::FileWritten {
                file_name: "g.parquet".into(),
                rows: 50,
                bytes: 2048,
                part_index: 1,
            },
        ]);
        assert_eq!(j.files().len(), 2);
    }

    // --- retries ----------------------------------------------------------

    #[test]
    fn retries_empty_when_none_recorded() {
        let j = journal();
        assert!(j.retries().is_empty());
    }

    #[test]
    fn retries_returns_retry_attempted_entries() {
        let j = journal_with(vec![
            RunEvent::RetryAttempted {
                attempt: 1,
                reason: "timeout".into(),
                backoff_ms: 500,
            },
            RunEvent::RetryAttempted {
                attempt: 2,
                reason: "timeout".into(),
                backoff_ms: 1000,
            },
        ]);
        assert_eq!(j.retries().len(), 2);
    }

    // --- chunk events -----------------------------------------------------

    #[test]
    fn chunk_events_collects_all_three_variant_types() {
        let j = journal_with(vec![
            RunEvent::ChunkStarted {
                chunk_index: 0,
                start_key: "0".into(),
                end_key: "100".into(),
            },
            RunEvent::ChunkCompleted {
                chunk_index: 0,
                rows: 100,
                file_name: None,
            },
            RunEvent::ChunkFailed {
                chunk_index: 1,
                error: "err".into(),
                attempt: 1,
            },
            warning("x", "y"),
        ]);
        assert_eq!(j.chunk_events().len(), 3);
    }

    #[test]
    fn longest_chunk_ms_pairs_started_and_completed_by_index() {
        use chrono::Duration;

        let base = Utc::now();
        let started = |i: i64| RunEvent::ChunkStarted {
            chunk_index: i,
            start_key: "0".into(),
            end_key: "1".into(),
        };
        let done = |i: i64| RunEvent::ChunkCompleted {
            chunk_index: i,
            rows: 1,
            file_name: None,
        };

        // (offset from `base` in ms, event): chunk 0 spans 0..200 and chunk 1
        // spans 50..850, so chunk 1 is the longest at 800 ms.
        let timeline: Vec<(i64, RunEvent)> = vec![
            (0, started(0)),
            (50, started(1)),
            (200, done(0)),
            (850, done(1)),
        ];

        let mut j = journal();
        for (off_ms, event) in timeline {
            j.entries.push(JournalEntry {
                recorded_at: base + Duration::milliseconds(off_ms),
                event,
            });
        }

        assert_eq!(j.longest_chunk_ms(), Some(800));
    }

    #[test]
    fn longest_chunk_ms_none_without_paired_start() {
        let j = journal_with(vec![RunEvent::ChunkCompleted {
            chunk_index: 0,
            rows: 1,
            file_name: None,
        }]);
        assert!(j.longest_chunk_ms().is_none());
        assert!(
            journal().longest_chunk_ms().is_none(),
            "empty journal → None"
        );
    }

    // --- quality / schema / warnings --------------------------------------

    #[test]
    fn quality_issues_filters_correctly() {
        let j = journal_with(vec![
            RunEvent::QualityIssue {
                severity: "FAIL".into(),
                message: "null check".into(),
            },
            warning("x", "y"),
        ]);
        assert_eq!(j.quality_issues().len(), 1);
    }

    #[test]
    fn schema_changes_filters_correctly() {
        let j = journal_with(vec![RunEvent::SchemaChanged {
            added: vec!["new_col (Int64)".into()],
            removed: vec![],
            type_changed: vec![],
        }]);
        assert_eq!(j.schema_changes().len(), 1);
    }

    #[test]
    fn warnings_includes_both_warning_and_plan_warning() {
        let j = journal_with(vec![
            warning("ctx", "w1"),
            RunEvent::PlanWarning {
                rule: "r".into(),
                message: "w2".into(),
            },
            RunEvent::QualityIssue {
                severity: "WARN".into(),
                message: "q".into(),
            },
        ]);
        assert_eq!(j.warnings().len(), 2);
    }

    // --- final_outcome ----------------------------------------------------

    #[test]
    fn final_outcome_none_when_not_completed() {
        let j = journal_with(vec![warning("x", "y")]);
        assert!(j.final_outcome().is_none());
    }

    #[test]
    fn final_outcome_returns_last_run_completed() {
        let j = journal_with(vec![
            RunEvent::RunCompleted {
                status: "success".into(),
                error_message: None,
                duration_ms: 1234,
            },
            warning("x", "y"),
            RunEvent::RunCompleted {
                status: "failed".into(),
                error_message: Some("err".into()),
                duration_ms: 5678,
            },
        ]);
        let outcome = j.final_outcome().unwrap();
        match &outcome.event {
            RunEvent::RunCompleted { status, .. } => assert_eq!(status, "failed"),
            _ => panic!("expected RunCompleted"),
        }
    }
}