1use super::ipc::{self, ChildEvent};
18use super::{format_bytes, multi_export_mode, strip_chunked_recovery_hint};
19use crate::journal::{PlanSnapshot, RunEvent, RunJournal};
20use crate::manifest::ManifestPart;
21use crate::plan::ResolvedRunPlan;
22
23fn plan_snapshot_from(plan: &ResolvedRunPlan) -> PlanSnapshot {
29 PlanSnapshot {
30 export_name: plan.export_name.clone(),
31 base_query: plan.base_query.clone(),
32 strategy: plan.strategy.mode_label().to_string(),
33 format: plan.format.label().to_string(),
34 compression: plan.compression.label().to_string(),
35 destination_type: plan.destination.destination_type.label().to_string(),
36 tuning_profile: plan.tuning_profile_label.clone(),
37 batch_size: plan.tuning.batch_size,
38 validate: plan.validate,
39 reconcile: plan.reconcile,
40 resume: plan.resume,
41 }
42}
43
/// Extra context attached to a [`RunSummary`] via its `apply_context` field.
///
/// NOTE(review): judging by the type and field names this describes a run that
/// applied a previously computed plan; confirm against the code that populates
/// `RunSummary::apply_context` (not visible here).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ApplyContext {
    /// Identifier of the applied plan — presumably the saved plan's id; TODO confirm.
    pub plan_id: String,
    /// Whether the apply was forced — presumably set by a force flag; TODO confirm.
    pub forced: bool,
    /// Entries bypassed because of `forced` — TODO confirm what each string names.
    pub force_bypassed: Vec<String>,
}
63
/// Accumulated outcome of one export run, rendered to stderr (or sent as an
/// IPC `Finished` event) when the run ends.
#[derive(Debug, Clone)]
pub struct RunSummary {
    /// `<export_name>_<UTC timestamp>` when built by `RunSummary::new`.
    pub run_id: String,
    pub export_name: String,
    /// Status string; this module uses `"running"` (initial), `"success"`,
    /// `"failed"` and `"skipped"`.
    pub status: String,
    /// Rows exported; also compared against `source_count` for reconcile.
    pub total_rows: i64,
    /// Files produced; must not exceed `manifest_parts.len()`
    /// (see `check_post_run_invariants`).
    pub files_produced: usize,
    /// Bytes written; must not exceed the summed `size_bytes` of `manifest_parts`.
    pub bytes_written: u64,
    /// Files committed; must not exceed `manifest_parts.len()`.
    pub files_committed: usize,
    pub duration_ms: i64,
    /// Peak resident memory in MB; `0` is treated as "not sampled" and hidden.
    pub peak_rss_mb: i64,
    pub retries: u32,
    /// Validation result; rendered only when `Some`.
    pub validated: Option<bool>,
    /// Schema change flag; rendered only when `Some`.
    pub schema_changed: Option<bool>,
    /// Quality-check result; rendered only when `Some`.
    pub quality_passed: Option<bool>,
    pub error_message: Option<String>,
    /// Tuning profile label copied from the plan.
    pub tuning_profile: String,
    /// Configured batch size copied from the plan's tuning.
    pub batch_size: usize,
    /// Memory-based batch sizing budget (MiB), when configured.
    pub batch_size_memory_mb: Option<usize>,
    /// Output format label (e.g. `"parquet"`).
    pub format: String,
    /// Extraction strategy label (e.g. `"snapshot"`, `"timewindow"`).
    pub mode: String,
    /// Compression label (e.g. `"zstd"`).
    pub compression: String,
    /// Destination URI for the manifest; `None` for stdout destinations.
    pub destination_uri: Option<String>,
    /// Change in Postgres temp-file bytes over the run; positive values are
    /// rendered as "pg temp spill" (with a warning above 100 MiB).
    pub pg_temp_bytes_delta: Option<i64>,
    /// Why the run was skipped; shown next to a `"skipped"` status. The form
    /// `no new rows since cursor '<col>'` also produces a `cursor:` line.
    pub skip_reason: Option<String>,
    /// Source-side row count used in the reconcile line.
    pub source_count: Option<i64>,
    /// Reconcile result (exported vs source rows); rendered only when `Some`.
    pub reconciled: Option<bool>,
    /// Committed output parts; the source of truth for the invariants checks.
    pub manifest_parts: Vec<ManifestPart>,
    /// Schema fingerprint of the run (not read anywhere in this section).
    pub schema_fingerprint: Option<String>,
    /// Manifest verification outcome (not read anywhere in this section).
    pub manifest_verification: Option<crate::pipeline::ManifestVerification>,
    /// Set when the run applied a plan; see [`ApplyContext`].
    pub apply_context: Option<ApplyContext>,
    /// Structured event log for the run (plan resolved, completion, …).
    pub journal: RunJournal,
}
150
151impl RunSummary {
152 pub(super) fn new(plan: &ResolvedRunPlan) -> Self {
153 let run_id = format!(
154 "{}_{}",
155 plan.export_name,
156 chrono::Utc::now().format("%Y%m%dT%H%M%S%.3f"),
157 );
158 let mut journal = RunJournal::new(&run_id, &plan.export_name);
159 journal.record(RunEvent::PlanResolved(plan_snapshot_from(plan)));
160
161 ipc::emit_event(&ChildEvent::Started {
162 export_name: plan.export_name.clone(),
163 run_id: run_id.clone(),
164 mode: plan.strategy.mode_label().to_string(),
165 tuning_profile: plan.tuning_profile_label.clone(),
166 batch_size: plan.tuning.batch_size,
167 });
168
169 Self {
170 run_id,
171 export_name: plan.export_name.clone(),
172 status: "running".into(),
173 total_rows: 0,
174 files_produced: 0,
175 bytes_written: 0,
176 files_committed: 0,
177 duration_ms: 0,
178 peak_rss_mb: 0,
179 retries: 0,
180 validated: None,
181 schema_changed: None,
182 quality_passed: None,
183 error_message: None,
184 tuning_profile: plan.tuning_profile_label.clone(),
185 batch_size: plan.tuning.batch_size,
186 batch_size_memory_mb: plan.tuning.batch_size_memory_mb,
187 format: plan.format.label().to_string(),
188 mode: plan.strategy.mode_label().to_string(),
189 compression: plan.compression.label().to_string(),
190 destination_uri: (!matches!(
191 plan.destination.destination_type,
192 crate::config::DestinationType::Stdout
193 ))
194 .then(|| crate::pipeline::finalize::destination_uri_for_manifest(&plan.destination)),
195 pg_temp_bytes_delta: None,
196 skip_reason: None,
197 source_count: None,
198 reconciled: None,
199 manifest_parts: Vec::new(),
200 schema_fingerprint: None,
201 manifest_verification: None,
202 apply_context: None,
203 journal,
204 }
205 }
206
207 #[doc(hidden)]
228 #[allow(dead_code)]
229 pub fn stub_for_testing(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
230 let run_id = run_id.into();
231 let export_name = export_name.into();
232 let journal = RunJournal::new(&run_id, &export_name);
233 Self {
234 run_id,
235 export_name,
236 status: "running".into(),
237 total_rows: 0,
238 files_produced: 0,
239 bytes_written: 0,
240 files_committed: 0,
241 duration_ms: 0,
242 peak_rss_mb: 0,
243 retries: 0,
244 validated: None,
245 schema_changed: None,
246 quality_passed: None,
247 error_message: None,
248 tuning_profile: "balanced".into(),
249 batch_size: 1000,
250 batch_size_memory_mb: None,
251 format: "parquet".into(),
252 mode: "snapshot".into(),
253 compression: "zstd".into(),
254 destination_uri: None,
255 pg_temp_bytes_delta: None,
256 skip_reason: None,
257 source_count: None,
258 reconciled: None,
259 manifest_parts: Vec::new(),
260 schema_fingerprint: None,
261 manifest_verification: None,
262 apply_context: None,
263 journal,
264 }
265 }
266
267 #[doc(hidden)]
274 #[allow(dead_code)]
275 pub fn with_status(mut self, status: impl Into<String>) -> Self {
276 let s = status.into();
277 if (s == "success" || s == "failed") && self.journal.final_outcome().is_none() {
278 self.journal.record(RunEvent::RunCompleted {
279 status: s.clone(),
280 error_message: self.error_message.clone(),
281 duration_ms: self.duration_ms,
282 });
283 }
284 self.status = s;
285 self
286 }
287
288 #[doc(hidden)]
292 #[allow(dead_code)]
293 pub fn with_files_committed(mut self, n: usize) -> Self {
294 self.files_committed = n;
295 self
296 }
297
298 #[doc(hidden)]
302 #[allow(dead_code)]
303 pub fn with_manifest_parts(mut self, parts: Vec<crate::manifest::ManifestPart>) -> Self {
304 self.total_rows = parts.iter().map(|p| p.rows).sum();
305 self.bytes_written = parts.iter().map(|p| p.size_bytes).sum();
306 self.files_produced = parts.len();
307 self.files_committed = parts.len();
308 self.manifest_parts = parts;
309 self
310 }
311
312 #[doc(hidden)]
316 #[allow(dead_code)]
317 pub fn with_error(mut self, msg: impl Into<String>) -> Self {
318 self.error_message = Some(msg.into());
319 self
320 }
321
322 #[doc(hidden)]
327 #[allow(dead_code)]
328 pub fn with_plan_snapshot(mut self, snap: PlanSnapshot) -> Self {
329 self.journal.record(RunEvent::PlanResolved(snap));
330 self
331 }
332
333 pub(super) fn print(&self) {
334 if ipc::capturing_events() {
338 ipc::emit_event(&ChildEvent::Finished {
339 export_name: self.export_name.clone(),
340 run_id: self.run_id.clone(),
341 status: self.status.clone(),
342 total_rows: self.total_rows,
343 files_produced: self.files_produced as u64,
344 bytes_written: self.bytes_written,
345 duration_ms: self.duration_ms,
346 peak_rss_mb: self.peak_rss_mb,
347 error_message: self.error_message.clone(),
348 });
349 return;
350 }
351
352 self.print_stderr_block();
353 }
354
355 pub(super) fn print_stderr_block(&self) {
360 let block = if multi_export_mode() {
361 self.render_compact()
362 } else {
363 self.render().trim_end_matches('\n').to_string()
369 };
370
371 use std::io::Write;
372 let mut buf = super::parent_ui::sanitize_terminal(&block);
379 buf.push('\n');
380 let stderr = std::io::stderr();
381 let mut handle = stderr.lock();
382 let _ = handle.write_all(buf.as_bytes());
383 let _ = handle.flush();
384 }
385
386 fn render_compact(&self) -> String {
391 const NAME_COL: usize = 22;
392 const MODE_COL: usize = 8;
393 let icon = match self.status.as_str() {
394 "success" => "✓",
395 "failed" => "✗",
396 _ => "•",
397 };
398 let body = if self.status == "failed" {
399 let err = self
400 .error_message
401 .as_deref()
402 .unwrap_or("(no error message recorded)");
403 let (cause, _) = strip_chunked_recovery_hint(err);
404 compact_error(cause)
408 } else {
409 let rss = if self.peak_rss_mb > 0 {
410 format!(" RSS {} MB", fmt_thousands(self.peak_rss_mb))
411 } else {
412 String::new()
413 };
414 format!(
415 "{} rows {} files {} {}{}",
416 fmt_thousands(self.total_rows),
417 fmt_thousands(self.files_produced as i64),
418 format_bytes(self.bytes_written),
419 fmt_duration_ms(self.duration_ms),
420 rss
421 )
422 };
423 format!(
424 "{} {:<name$} {:<mode$} {}",
425 icon,
426 self.export_name,
427 self.mode,
428 body,
429 name = NAME_COL,
430 mode = MODE_COL,
431 )
432 }
433
434 fn render(&self) -> String {
437 let mut rows: Vec<(&'static str, String)> = Vec::with_capacity(16);
442 rows.push(("run_id", self.run_id.clone()));
443 let status_value = match (&self.status, &self.skip_reason) {
444 (s, Some(reason)) if s == "skipped" => format!("{s} ({reason})"),
445 (s, _) => s.clone(),
446 };
447 rows.push(("status", status_value));
448
449 let tuning_value = match self.batch_size_memory_mb {
450 Some(mem) => format!(
451 "profile={}, batch_size={} (batch_size_memory_mb={}MiB → effective FETCH in logs)",
452 self.tuning_profile,
453 fmt_thousands(self.batch_size as i64),
454 mem
455 ),
456 None => format!(
457 "profile={}, batch_size={}",
458 self.tuning_profile,
459 fmt_thousands(self.batch_size as i64)
460 ),
461 };
462 rows.push(("tuning", tuning_value));
463
464 rows.push(("rows", fmt_thousands(self.total_rows)));
465 rows.push(("files", fmt_thousands(self.files_produced as i64)));
466 if self.files_produced > 0
470 && let Some(uri) = &self.destination_uri
471 {
472 rows.push(("output", uri.clone()));
473 }
474 if let Some(pos) = incremental_position_line(self.skip_reason.as_deref()) {
481 rows.push(("cursor", pos));
482 } else if let Some(window) = time_window_skip_line(&self.mode, self.skip_reason.as_deref())
483 {
484 rows.push(("window", window));
494 }
495 if self.bytes_written > 0 {
496 rows.push(("bytes", format_bytes(self.bytes_written)));
497 }
498 rows.push(("duration", fmt_duration_ms(self.duration_ms)));
499
500 if self.peak_rss_mb > 0 {
501 rows.push((
502 "peak RSS",
503 format!(
504 "{} MB (sampled during run)",
505 fmt_thousands(self.peak_rss_mb)
506 ),
507 ));
508 }
509 if let Some(temp) = self.pg_temp_bytes_delta {
510 if temp > 0 {
514 let temp_mb = temp as f64 / (1024.0 * 1024.0);
515 let label = if temp > 100 * 1024 * 1024 {
516 format!(
517 "{:.1} MB ⚠ shrink tuning.batch_size or set batch_size_memory_mb",
518 temp_mb
519 )
520 } else {
521 format!("{:.1} MB", temp_mb)
522 };
523 rows.push(("pg temp spill", label));
524 }
525 }
526 if self.format == "parquet" && self.compression != "zstd" {
527 rows.push(("compression", self.compression.clone()));
528 }
529 if self.retries > 0 {
530 rows.push(("retries", self.retries.to_string()));
531 }
532 if let Some(v) = self.validated {
533 rows.push(("validated", if v { "pass".into() } else { "FAIL".into() }));
534 }
535 if let Some(sc) = self.schema_changed {
536 rows.push((
537 "schema",
538 if sc {
539 "CHANGED".into()
540 } else {
541 "unchanged".into()
542 },
543 ));
544 }
545 if let Some(q) = self.quality_passed {
546 rows.push(("quality", if q { "pass".into() } else { "FAIL".into() }));
547 }
548 if let Some(reconciled) = self.reconciled {
549 let src = self
550 .source_count
551 .map(fmt_thousands)
552 .unwrap_or_else(|| "?".into());
553 let exported = fmt_thousands(self.total_rows);
554 let value = if reconciled {
555 format!("MATCH ({exported}/{src})")
556 } else {
557 format!("MISMATCH (exported {exported} vs source {src})")
558 };
559 rows.push(("reconcile", value));
560 }
561 if let Some(err) = &self.error_message {
562 rows.push(("error", err.trim_end().to_string()));
568 }
569
570 format_block(&self.export_name, &rows)
571 }
572
573 pub fn check_post_run_invariants(&self) -> Result<(), String> {
589 let parts_bytes: u64 = self.manifest_parts.iter().map(|p| p.size_bytes).sum();
590
591 if self.files_committed > self.manifest_parts.len() {
592 return Err(format!(
593 "summary.files_committed ({}) > manifest_parts.len() ({}) — \
594 a runner bumped files_committed without commit::record_part",
595 self.files_committed,
596 self.manifest_parts.len()
597 ));
598 }
599 if self.files_produced > self.manifest_parts.len() {
600 return Err(format!(
601 "summary.files_produced ({}) > manifest_parts.len() ({}) — \
602 a runner bumped files_produced without commit::record_part",
603 self.files_produced,
604 self.manifest_parts.len()
605 ));
606 }
607 if self.bytes_written > parts_bytes {
608 return Err(format!(
609 "summary.bytes_written ({}) > sum(manifest_parts.size_bytes) ({}) — \
610 a runner bumped bytes_written without commit::record_part",
611 self.bytes_written, parts_bytes
612 ));
613 }
614 if self.status == "success" && self.files_committed > 0 && self.manifest_parts.is_empty() {
615 return Err(format!(
616 "success run with files_committed={} has empty manifest_parts — \
617 cloud manifest (ADR-0012 M1) would ship with no part list \
618 (this is the gap parallel_checkpoint had before commit e9b0796)",
619 self.files_committed
620 ));
621 }
622 if self.status == "success" && self.total_rows > 0 && self.files_committed == 0 {
634 return Err(format!(
635 "summary.total_rows={} but files_committed=0 — rows extracted from \
636 source but no files committed (no output reached the destination)",
637 self.total_rows
638 ));
639 }
640 Ok(())
641 }
642}
643
644fn compact_error(raw: &str) -> String {
656 const MAX_CHARS: usize = 240;
657 if let Some(summary) = summarize_parallel_chunk_errors(raw) {
658 return clamp_chars(&summary, MAX_CHARS);
659 }
660 let collapsed: String = raw
661 .lines()
662 .map(str::trim_end)
663 .filter(|s| !s.is_empty())
664 .collect::<Vec<_>>()
665 .join("; ");
666 clamp_chars(&collapsed, MAX_CHARS)
667}
668
/// Turns a cursor-based skip reason of the exact form
/// `no new rows since cursor '<col>'` into the `cursor:` row text
/// `'<col>' unchanged (no new rows this run)`.
///
/// Returns `None` for any other reason, or when there is no reason at all.
fn incremental_position_line(skip_reason: Option<&str>) -> Option<String> {
    skip_reason
        .and_then(|reason| reason.strip_prefix("no new rows since cursor '"))
        .and_then(|rest| rest.strip_suffix('\''))
        .map(|column| format!("'{column}' unchanged (no new rows this run)"))
}
682
/// Produces the `window:` row text for a skipped run in `"timewindow"` mode,
/// pointing the user at the window settings to check.
///
/// Returns `None` when the mode is anything else or there is no skip reason.
/// The content of the reason itself is not inspected.
fn time_window_skip_line(mode: &str, skip_reason: Option<&str>) -> Option<String> {
    let applies = mode == "timewindow" && skip_reason.is_some();
    applies.then(|| {
        "rolling time window matched no rows — check `time_column`/`days_window`".to_string()
    })
}
706
707fn summarize_parallel_chunk_errors(raw: &str) -> Option<String> {
708 let header_pos = raw.find("parallel checkpoint worker errors:")?;
709 let prefix = raw[..header_pos].trim_end_matches(": ").trim_end();
710 let tail = &raw[header_pos + "parallel checkpoint worker errors:".len()..];
711
712 let chunk_lines: Vec<&str> = tail
713 .lines()
714 .map(str::trim)
715 .filter(|l| l.starts_with("chunk "))
716 .collect();
717 if chunk_lines.is_empty() {
718 return None;
719 }
720 let first_chunk_full = chunk_lines[0];
721 let first_chunk_short = clamp_chars(first_chunk_full, 140);
723 let prefix = if prefix.is_empty() {
724 String::new()
725 } else {
726 format!("{}: ", prefix)
727 };
728 Some(format!(
729 "{}parallel checkpoint workers failed: {} chunk(s) ({}); see stderr for full payloads",
730 prefix,
731 chunk_lines.len(),
732 first_chunk_short
733 ))
734}
735
/// Limits `s` to at most `max_chars` characters (Unicode scalar values).
///
/// Strings that already fit are returned unchanged. Longer ones keep their
/// first `max_chars - 1` characters followed by `…`, so the result is exactly
/// `max_chars` long. A limit of 0 yields an empty string.
fn clamp_chars(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }
    // A character at index `max_chars` exists only if `s` is too long.
    if s.char_indices().nth(max_chars).is_none() {
        return s.to_string();
    }
    // Byte offset where character number `max_chars - 1` starts: cut there.
    let cut = s
        .char_indices()
        .nth(max_chars - 1)
        .map_or(s.len(), |(offset, _)| offset);
    format!("{}…", &s[..cut])
}
748
/// Renders the detailed summary block.
///
/// The output is a leading blank line, then a `── <name> ` header padded with
/// `─` to 60 characters (long names are never truncated), then one
/// ` <label>: <value>` line per row. Labels are left-aligned to the widest
/// label so every value starts in the same column. Multi-line values keep
/// their line breaks, with continuation lines indented to that same column.
fn format_block(name: &str, rows: &[(&str, String)]) -> String {
    const HEADER_WIDTH: usize = 60;
    // Layout pieces of a row's lead-in. The continuation indent is derived
    // from these so wrapped lines always line up under the first value.
    const LEAD: &str = " ";
    const GAP: &str = " ";
    // `{:<width$}` pads by chars, so labels are measured in chars too.
    let label_w = rows
        .iter()
        .map(|(label, _)| label.chars().count())
        .max()
        .unwrap_or(0);
    let padded_w = label_w + 1; // room for the trailing ':'

    let prefix = format!("── {} ", name);
    let dashes = HEADER_WIDTH.saturating_sub(prefix.chars().count());
    let mut out = String::with_capacity(HEADER_WIDTH * (rows.len() + 3));
    out.push('\n');
    out.push_str(&prefix);
    out.push_str(&"─".repeat(dashes));
    out.push('\n');

    let value_indent = " ".repeat(LEAD.len() + padded_w + GAP.len());
    for (label, value) in rows {
        let mut lines = value.split('\n');
        // `split` always yields at least one item, even for an empty value.
        let first = lines.next().unwrap_or("");
        out.push_str(LEAD);
        out.push_str(&format!(
            "{:<width$}",
            format!("{label}:"),
            width = padded_w
        ));
        out.push_str(GAP);
        out.push_str(first);
        out.push('\n');
        for cont in lines {
            out.push_str(&value_indent);
            out.push_str(cont);
            out.push('\n');
        }
    }
    out
}
788
/// Formats a duration in milliseconds for humans.
///
/// * under one second: `"<ms>ms"` (negative values also take this branch);
/// * under a minute: `"<s>.<t>s"`, e.g. `1.5s`;
/// * under an hour: `"<m>m <ss>.<t>s"`, e.g. `1m 08.4s`;
/// * otherwise: `"<h>h <mm>m <ss>.<t>s"`, e.g. `1h 02m 05.3s`.
///
/// The value is rounded to the nearest tenth of a second (halves round up)
/// *before* it is split into units, so carries propagate. For example,
/// 119 960 ms renders as `2m 00.0s`, never `1m 60.0s`.
fn fmt_duration_ms(ms: i64) -> String {
    if ms < 1000 {
        return format!("{}ms", ms);
    }
    // ms >= 1000 here, so no underflow; saturate instead of overflowing at the top.
    let tenths = ms.saturating_add(50) / 100;
    let hours = tenths / 36_000;
    let minutes = (tenths % 36_000) / 600;
    let sec_tenths = tenths % 600;
    let (secs, frac) = (sec_tenths / 10, sec_tenths % 10);
    if hours > 0 {
        format!("{}h {:02}m {:02}.{}s", hours, minutes, secs, frac)
    } else if minutes > 0 {
        format!("{}m {:02}.{}s", minutes, secs, frac)
    } else {
        format!("{}.{}s", secs, frac)
    }
}
805
/// Formats `n` in base 10 with a `,` between each group of three digits,
/// keeping a leading `-` for negative values (e.g. `-1234` → `"-1,234"`).
fn fmt_thousands(n: i64) -> String {
    // `unsigned_abs` sidesteps overflow on i64::MIN.
    let digits = n.unsigned_abs().to_string();
    // The leading group holds 1–3 digits; every later group holds exactly 3.
    let head_len = match digits.len() % 3 {
        0 => 3,
        partial => partial,
    };
    let (head, tail) = digits.split_at(head_len);

    let mut grouped = String::with_capacity(digits.len() + digits.len() / 3 + 1);
    if n < 0 {
        grouped.push('-');
    }
    grouped.push_str(head);
    for (offset, digit) in tail.chars().enumerate() {
        if offset % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(digit);
    }
    grouped
}
826
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal snapshot / parquet / local-destination plan shared by every
    /// render test; only the export name varies.
    fn plan_for(export_name: &str) -> crate::plan::ResolvedRunPlan {
        use crate::plan::{
            CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
            MetaColumns, ResolvedRunPlan,
        };
        use crate::tuning::SourceTuning;
        ResolvedRunPlan {
            export_name: export_name.into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: crate::config::SourceConfig {
                source_type: crate::config::SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    #[test]
    fn fmt_thousands_handles_small_and_large() {
        assert_eq!(fmt_thousands(0), "0");
        assert_eq!(fmt_thousands(7), "7");
        assert_eq!(fmt_thousands(999), "999");
        assert_eq!(fmt_thousands(1_000), "1,000");
        assert_eq!(fmt_thousands(1_000_908), "1,000,908");
        assert_eq!(fmt_thousands(39_990_376), "39,990,376");
        assert_eq!(fmt_thousands(-1_234), "-1,234");
        assert_eq!(fmt_thousands(i64::MAX), "9,223,372,036,854,775,807");
    }

    #[test]
    fn fmt_duration_picks_unit() {
        assert_eq!(fmt_duration_ms(0), "0ms");
        assert_eq!(fmt_duration_ms(800), "800ms");
        assert_eq!(fmt_duration_ms(1_500), "1.5s");
        assert_eq!(fmt_duration_ms(68_400), "1m 08.4s");
        assert_eq!(fmt_duration_ms(3_725_300), "1h 02m 05.3s");
    }

    #[test]
    fn format_block_pads_labels_uniformly() {
        let rows = [
            ("run_id", "abc".to_string()),
            ("rows", "42".to_string()),
            ("compression", "zstd".to_string()),
        ];
        let out = format_block("orders", &rows);

        // Only the row lines contain ':'; the header is "── orders ───…".
        let lines: Vec<&str> = out.lines().filter(|l| l.contains(':')).collect();
        assert_eq!(lines.len(), 3);
        // Every value must start in the same column as the first row's value.
        let value_col = lines[0].rfind("abc").unwrap();
        assert_eq!(lines[1].rfind("42").unwrap(), value_col);
        assert_eq!(lines[2].rfind("zstd").unwrap(), value_col);
    }

    #[test]
    fn format_block_header_has_consistent_width() {
        let block_a = format_block("a", &[("rows", "1".into())]);
        let block_b = format_block("orders_table_xyz", &[("rows", "1".into())]);
        let header_a = block_a.lines().nth(1).unwrap();
        let header_b = block_b.lines().nth(1).unwrap();
        assert_eq!(
            header_a.chars().count(),
            header_b.chars().count(),
            "headers must be the same width regardless of name length: {:?} vs {:?}",
            header_a,
            header_b
        );
    }

    #[test]
    fn render_produces_a_single_string_with_trailing_newline() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "success".into();
        s.total_rows = 1_000_908;
        s.files_produced = 11;
        s.bytes_written = 32 * 1024 * 1024 + 400 * 1024;
        s.duration_ms = 68_400;
        s.peak_rss_mb = 884;

        let block = s.render();
        assert!(
            block.starts_with('\n'),
            "block should start with a blank line"
        );
        assert!(block.ends_with('\n'), "block should end with a newline");
        assert!(block.contains("── orders "));
        assert!(
            block.contains("1,000,908"),
            "rows should be formatted with thousands separator: {}",
            block
        );
        assert!(block.contains("1m 08.4s"), "duration formatting: {}", block);
        assert!(!block.contains('\r'));

        let line = s.render_compact();
        assert!(line.starts_with("✓ "), "success icon present: {:?}", line);
        assert!(line.contains("orders"), "export name present: {:?}", line);
        assert!(line.contains("1,000,908 rows"), "rows present: {:?}", line);
        assert!(line.contains("32.4 MB"), "bytes present: {:?}", line);
        assert!(line.contains("1m 08.4s"), "duration present: {:?}", line);
        assert!(line.contains("RSS 884 MB"), "rss present: {:?}", line);
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    #[test]
    fn compact_error_summarises_parallel_chunk_errors() {
        let raw = "export 'page_views': parallel checkpoint worker errors:\n\
                   chunk 4: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202442_chunk4.parquet?partNumber=1&uploadId=ABPnzm7RqplA, called: http_util::Client::send } => send http request, source: error sending request: client error (SendRequest): dispatch task is gone\n\
                   chunk 5: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202443_chunk5.parquet?partNumber=1&uploadId=ABPnzm6q, called: http_util::Client::send } => send http request, source: dispatch task is gone";
        let out = compact_error(raw);
        assert!(
            out.contains("2 chunk(s)"),
            "should report number of failed chunks: {:?}",
            out
        );
        assert!(
            out.starts_with("export 'page_views': parallel checkpoint workers failed:"),
            "should keep export prefix and use compact phrasing: {:?}",
            out
        );
        assert!(
            out.contains("chunk 4:"),
            "should include the first chunk as an example: {:?}",
            out
        );
        assert!(!out.contains('\n'), "single line output: {:?}", out);
        assert!(
            out.chars().count() <= 240,
            "must be clamped to <=240 chars, got {}: {:?}",
            out.chars().count(),
            out
        );
    }

    #[test]
    fn compact_error_collapses_generic_multiline() {
        let raw = "first line of trouble\nsecond line with detail\n\nthird line\n";
        let out = compact_error(raw);
        assert_eq!(
            out, "first line of trouble; second line with detail; third line",
            "newlines should collapse to '; ' and blanks dropped"
        );
    }

    #[test]
    fn compact_error_clamps_excessively_long_lines() {
        let raw = "x".repeat(1_000);
        let out = compact_error(&raw);
        assert_eq!(out.chars().count(), 240);
        assert!(out.ends_with('…'));
    }

    #[test]
    fn render_compact_strips_chunked_recovery_hint_for_failed() {
        let mut s = RunSummary::new(&plan_for("events"));
        s.status = "failed".into();
        s.error_message = Some(
            "export 'events': --resume but no in-progress chunk checkpoint; \
             run without --resume first or `rivet state reset-chunks --config x.yaml --export events`"
                .to_string(),
        );

        let line = s.render_compact();
        assert!(line.starts_with("✗ "), "failure icon: {:?}", line);
        assert!(line.contains("events"), "name present: {:?}", line);
        assert!(
            line.contains("--resume but no in-progress chunk checkpoint"),
            "cause kept: {:?}",
            line
        );
        assert!(
            !line.contains("rivet state reset-chunks"),
            "recovery hint should be stripped from per-export line: {:?}",
            line
        );
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    #[test]
    fn render_preserves_multiline_error_block() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "failed".into();
        s.error_message = Some(
            "export 'orders': 1 quality check(s) failed:\n \
             - row_count 10 below minimum 999999\n \
             Fix the source data, or adjust the thresholds under `quality:` in your config."
                .to_string(),
        );

        let block = s.render();
        assert!(
            !block.contains("failed:;"),
            "error must not be '; '-flattened in the detailed block: {block}"
        );
        assert!(
            block.contains("- row_count 10 below minimum 999999"),
            "failing check line present: {block}"
        );
        let err_lines: Vec<&str> = block
            .lines()
            .filter(|l| {
                l.contains("quality check(s) failed")
                    || l.contains("row_count 10 below minimum")
                    || l.contains("Fix the source data")
            })
            .collect();
        assert_eq!(
            err_lines.len(),
            3,
            "all three error lines should render on separate lines: {block}"
        );
        for l in &err_lines {
            assert!(l.starts_with(' '), "error line should be indented: {l:?}");
        }
    }

    #[test]
    fn render_surfaces_cursor_position_on_zero_new_incremental() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "skipped".into();
        s.skip_reason = Some("no new rows since cursor 'updated_at'".into());

        let block = s.render();
        let cursor_line = block
            .lines()
            .find(|l| l.trim_start().starts_with("cursor:"))
            .unwrap_or_else(|| panic!("expected a cursor: line in block: {block}"));
        assert!(
            cursor_line.contains("'updated_at'"),
            "cursor line names the column: {cursor_line:?}"
        );
        assert!(
            cursor_line.contains("unchanged"),
            "cursor line reports the position held: {cursor_line:?}"
        );
    }

    #[test]
    fn incremental_position_line_only_for_cursor_skips() {
        assert_eq!(
            incremental_position_line(Some("no new rows since cursor 'ts'")),
            Some("'ts' unchanged (no new rows this run)".into())
        );
        assert_eq!(
            incremental_position_line(Some("source returned 0 rows")),
            None
        );
        assert_eq!(incremental_position_line(None), None);
    }

    #[test]
    fn render_surfaces_window_position_on_zero_row_time_window() {
        let mut s = RunSummary::new(&plan_for("events"));
        s.status = "skipped".into();
        s.mode = "timewindow".into();
        s.skip_reason = Some("source returned 0 rows".into());

        let block = s.render();
        let window_line = block
            .lines()
            .find(|l| l.trim_start().starts_with("window:"))
            .unwrap_or_else(|| panic!("expected a window: line in block: {block}"));
        assert!(
            window_line.contains("matched no rows"),
            "window line reports the empty window: {window_line:?}"
        );
        assert!(
            window_line.contains("time_column") && window_line.contains("days_window"),
            "window line points at the window config to check: {window_line:?}"
        );
        assert!(
            !block.lines().any(|l| l.trim_start().starts_with("cursor:")),
            "no cursor line for a non-cursor strategy: {block}"
        );
    }

    #[test]
    fn time_window_skip_line_only_for_skipped_time_window() {
        assert_eq!(
            time_window_skip_line("timewindow", Some("source returned 0 rows")),
            Some("rolling time window matched no rows — check `time_column`/`days_window`".into())
        );
        assert_eq!(
            time_window_skip_line("incremental", Some("source returned 0 rows")),
            None
        );
        assert_eq!(
            time_window_skip_line("full", Some("source returned 0 rows")),
            None
        );
        assert_eq!(time_window_skip_line("timewindow", None), None);
    }
}