1use super::ipc::{self, ChildEvent};
18use super::{format_bytes, multi_export_mode, strip_chunked_recovery_hint};
19use crate::journal::{PlanSnapshot, RunEvent, RunJournal};
20use crate::manifest::ManifestPart;
21use crate::plan::ResolvedRunPlan;
22
23fn plan_snapshot_from(plan: &ResolvedRunPlan) -> PlanSnapshot {
29 PlanSnapshot {
30 export_name: plan.export_name.clone(),
31 base_query: plan.base_query.clone(),
32 strategy: plan.strategy.mode_label().to_string(),
33 format: plan.format.label().to_string(),
34 compression: plan.compression.label().to_string(),
35 destination_type: plan.destination.destination_type.label().to_string(),
36 tuning_profile: plan.tuning_profile_label.clone(),
37 batch_size: plan.tuning.batch_size,
38 validate: plan.validate,
39 reconcile: plan.reconcile,
40 resume: plan.resume,
41 }
42}
43
/// Extra context stored in `RunSummary::apply_context`.
///
/// NOTE(review): judging by the names, this describes a run started by
/// applying a previously computed plan — confirm against the code that fills
/// it in. Derives serde `Serialize`/`Deserialize`, so field names and order
/// are part of its serialized form.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ApplyContext {
    /// Identifier of the applied plan (presumably the id the plan was
    /// stored under — TODO confirm).
    pub plan_id: String,
    /// Whether the apply was forced (NOTE(review): presumably set by a
    /// force flag on the command line — confirm).
    pub forced: bool,
    /// Names of whatever was bypassed because the apply was forced.
    /// NOTE(review): contents inferred from the field name only; confirm.
    pub force_bypassed: Vec<String>,
}
63
/// Outcome and statistics of a single export run.
///
/// Created by `RunSummary::new` with status `"running"` and zeroed counters,
/// filled in as the run progresses, then reported by `RunSummary::print`.
/// `check_post_run_invariants` cross-checks the file/byte counters against
/// `manifest_parts`.
#[derive(Debug, Clone)]
pub struct RunSummary {
    /// `<export_name>_<UTC timestamp>` (see `RunSummary::new`).
    pub run_id: String,
    /// Name of the export this run belongs to.
    pub export_name: String,
    /// Run status. Values seen in this module: `"running"` (initial),
    /// `"success"`, `"failed"`, `"skipped"`.
    pub status: String,
    /// Total rows exported.
    pub total_rows: i64,
    /// Number of output files produced; must not exceed `manifest_parts.len()`.
    pub files_produced: usize,
    /// Bytes written; must not exceed the sum of `manifest_parts` sizes.
    pub bytes_written: u64,
    /// Number of files committed; must not exceed `manifest_parts.len()`.
    pub files_committed: usize,
    /// Wall-clock duration of the run in milliseconds.
    pub duration_ms: i64,
    /// Peak resident memory in MB; `0` is treated as "not sampled" by the renderers.
    pub peak_rss_mb: i64,
    /// Number of retries performed; rendered only when non-zero.
    pub retries: u32,
    /// Validation outcome; `None` when no result was recorded.
    pub validated: Option<bool>,
    /// Whether the schema changed; `None` when no result was recorded.
    pub schema_changed: Option<bool>,
    /// Quality-check outcome; `None` when no result was recorded.
    pub quality_passed: Option<bool>,
    /// Error text for failed runs; may span several lines.
    pub error_message: Option<String>,
    /// Tuning profile label copied from the plan.
    pub tuning_profile: String,
    /// Configured fetch batch size.
    pub batch_size: usize,
    /// Memory-based batch sizing budget in MiB, when configured.
    pub batch_size_memory_mb: Option<usize>,
    /// Output format label (e.g. `"parquet"`).
    pub format: String,
    /// Extraction strategy label (e.g. `"snapshot"`, `"timewindow"`).
    pub mode: String,
    /// Compression label (e.g. `"zstd"`).
    pub compression: String,
    /// Change in Postgres temp-file bytes during the run, when measured;
    /// the detailed summary warns above 100 MiB.
    pub pg_temp_bytes_delta: Option<i64>,
    /// Why the run was skipped, shown next to a `"skipped"` status.
    pub skip_reason: Option<String>,
    /// Source-side row count used for reconciliation, when known.
    pub source_count: Option<i64>,
    /// Whether exported rows matched `source_count`; `None` when not reconciled.
    pub reconciled: Option<bool>,
    /// Parts recorded for the run's manifest.
    pub manifest_parts: Vec<ManifestPart>,
    /// Schema fingerprint of the output (NOTE(review): format not visible here).
    pub schema_fingerprint: Option<String>,
    /// Result of manifest verification, when performed.
    pub manifest_verification: Option<crate::pipeline::ManifestVerification>,
    /// Plan-apply context, when the run came from an applied plan.
    pub apply_context: Option<ApplyContext>,
    /// Event journal for this run.
    pub journal: RunJournal,
}
145
146impl RunSummary {
147 pub(super) fn new(plan: &ResolvedRunPlan) -> Self {
148 let run_id = format!(
149 "{}_{}",
150 plan.export_name,
151 chrono::Utc::now().format("%Y%m%dT%H%M%S%.3f"),
152 );
153 let mut journal = RunJournal::new(&run_id, &plan.export_name);
154 journal.record(RunEvent::PlanResolved(plan_snapshot_from(plan)));
155
156 ipc::emit_event(&ChildEvent::Started {
157 export_name: plan.export_name.clone(),
158 run_id: run_id.clone(),
159 mode: plan.strategy.mode_label().to_string(),
160 tuning_profile: plan.tuning_profile_label.clone(),
161 batch_size: plan.tuning.batch_size,
162 });
163
164 Self {
165 run_id,
166 export_name: plan.export_name.clone(),
167 status: "running".into(),
168 total_rows: 0,
169 files_produced: 0,
170 bytes_written: 0,
171 files_committed: 0,
172 duration_ms: 0,
173 peak_rss_mb: 0,
174 retries: 0,
175 validated: None,
176 schema_changed: None,
177 quality_passed: None,
178 error_message: None,
179 tuning_profile: plan.tuning_profile_label.clone(),
180 batch_size: plan.tuning.batch_size,
181 batch_size_memory_mb: plan.tuning.batch_size_memory_mb,
182 format: plan.format.label().to_string(),
183 mode: plan.strategy.mode_label().to_string(),
184 compression: plan.compression.label().to_string(),
185 pg_temp_bytes_delta: None,
186 skip_reason: None,
187 source_count: None,
188 reconciled: None,
189 manifest_parts: Vec::new(),
190 schema_fingerprint: None,
191 manifest_verification: None,
192 apply_context: None,
193 journal,
194 }
195 }
196
197 #[doc(hidden)]
218 #[allow(dead_code)]
219 pub fn stub_for_testing(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
220 let run_id = run_id.into();
221 let export_name = export_name.into();
222 let journal = RunJournal::new(&run_id, &export_name);
223 Self {
224 run_id,
225 export_name,
226 status: "running".into(),
227 total_rows: 0,
228 files_produced: 0,
229 bytes_written: 0,
230 files_committed: 0,
231 duration_ms: 0,
232 peak_rss_mb: 0,
233 retries: 0,
234 validated: None,
235 schema_changed: None,
236 quality_passed: None,
237 error_message: None,
238 tuning_profile: "balanced".into(),
239 batch_size: 1000,
240 batch_size_memory_mb: None,
241 format: "parquet".into(),
242 mode: "snapshot".into(),
243 compression: "zstd".into(),
244 pg_temp_bytes_delta: None,
245 skip_reason: None,
246 source_count: None,
247 reconciled: None,
248 manifest_parts: Vec::new(),
249 schema_fingerprint: None,
250 manifest_verification: None,
251 apply_context: None,
252 journal,
253 }
254 }
255
256 #[doc(hidden)]
263 #[allow(dead_code)]
264 pub fn with_status(mut self, status: impl Into<String>) -> Self {
265 let s = status.into();
266 if (s == "success" || s == "failed") && self.journal.final_outcome().is_none() {
267 self.journal.record(RunEvent::RunCompleted {
268 status: s.clone(),
269 error_message: self.error_message.clone(),
270 duration_ms: self.duration_ms,
271 });
272 }
273 self.status = s;
274 self
275 }
276
277 #[doc(hidden)]
281 #[allow(dead_code)]
282 pub fn with_files_committed(mut self, n: usize) -> Self {
283 self.files_committed = n;
284 self
285 }
286
287 #[doc(hidden)]
291 #[allow(dead_code)]
292 pub fn with_manifest_parts(mut self, parts: Vec<crate::manifest::ManifestPart>) -> Self {
293 self.total_rows = parts.iter().map(|p| p.rows).sum();
294 self.bytes_written = parts.iter().map(|p| p.size_bytes).sum();
295 self.files_produced = parts.len();
296 self.files_committed = parts.len();
297 self.manifest_parts = parts;
298 self
299 }
300
301 #[doc(hidden)]
305 #[allow(dead_code)]
306 pub fn with_error(mut self, msg: impl Into<String>) -> Self {
307 self.error_message = Some(msg.into());
308 self
309 }
310
311 #[doc(hidden)]
316 #[allow(dead_code)]
317 pub fn with_plan_snapshot(mut self, snap: PlanSnapshot) -> Self {
318 self.journal.record(RunEvent::PlanResolved(snap));
319 self
320 }
321
322 pub(super) fn print(&self) {
323 if ipc::capturing_events() {
327 ipc::emit_event(&ChildEvent::Finished {
328 export_name: self.export_name.clone(),
329 run_id: self.run_id.clone(),
330 status: self.status.clone(),
331 total_rows: self.total_rows,
332 files_produced: self.files_produced as u64,
333 bytes_written: self.bytes_written,
334 duration_ms: self.duration_ms,
335 peak_rss_mb: self.peak_rss_mb,
336 error_message: self.error_message.clone(),
337 });
338 return;
339 }
340
341 self.print_stderr_block();
342 }
343
344 pub(super) fn print_stderr_block(&self) {
349 let block = if multi_export_mode() {
350 self.render_compact()
351 } else {
352 self.render().trim_end_matches('\n').to_string()
358 };
359
360 use std::io::Write;
361 let mut buf = super::parent_ui::sanitize_terminal(&block);
368 buf.push('\n');
369 let stderr = std::io::stderr();
370 let mut handle = stderr.lock();
371 let _ = handle.write_all(buf.as_bytes());
372 let _ = handle.flush();
373 }
374
375 fn render_compact(&self) -> String {
380 const NAME_COL: usize = 22;
381 const MODE_COL: usize = 8;
382 let icon = match self.status.as_str() {
383 "success" => "✓",
384 "failed" => "✗",
385 _ => "•",
386 };
387 let body = if self.status == "failed" {
388 let err = self
389 .error_message
390 .as_deref()
391 .unwrap_or("(no error message recorded)");
392 let (cause, _) = strip_chunked_recovery_hint(err);
393 compact_error(cause)
397 } else {
398 let rss = if self.peak_rss_mb > 0 {
399 format!(" RSS {} MB", fmt_thousands(self.peak_rss_mb))
400 } else {
401 String::new()
402 };
403 format!(
404 "{} rows {} files {} {}{}",
405 fmt_thousands(self.total_rows),
406 fmt_thousands(self.files_produced as i64),
407 format_bytes(self.bytes_written),
408 fmt_duration_ms(self.duration_ms),
409 rss
410 )
411 };
412 format!(
413 "{} {:<name$} {:<mode$} {}",
414 icon,
415 self.export_name,
416 self.mode,
417 body,
418 name = NAME_COL,
419 mode = MODE_COL,
420 )
421 }
422
423 fn render(&self) -> String {
426 let mut rows: Vec<(&'static str, String)> = Vec::with_capacity(16);
431 rows.push(("run_id", self.run_id.clone()));
432 let status_value = match (&self.status, &self.skip_reason) {
433 (s, Some(reason)) if s == "skipped" => format!("{s} ({reason})"),
434 (s, _) => s.clone(),
435 };
436 rows.push(("status", status_value));
437
438 let tuning_value = match self.batch_size_memory_mb {
439 Some(mem) => format!(
440 "profile={}, batch_size={} (batch_size_memory_mb={}MiB → effective FETCH in logs)",
441 self.tuning_profile,
442 fmt_thousands(self.batch_size as i64),
443 mem
444 ),
445 None => format!(
446 "profile={}, batch_size={}",
447 self.tuning_profile,
448 fmt_thousands(self.batch_size as i64)
449 ),
450 };
451 rows.push(("tuning", tuning_value));
452
453 rows.push(("rows", fmt_thousands(self.total_rows)));
454 rows.push(("files", fmt_thousands(self.files_produced as i64)));
455 if let Some(pos) = incremental_position_line(self.skip_reason.as_deref()) {
462 rows.push(("cursor", pos));
463 } else if let Some(window) = time_window_skip_line(&self.mode, self.skip_reason.as_deref())
464 {
465 rows.push(("window", window));
475 }
476 if self.bytes_written > 0 {
477 rows.push(("bytes", format_bytes(self.bytes_written)));
478 }
479 rows.push(("duration", fmt_duration_ms(self.duration_ms)));
480
481 if self.peak_rss_mb > 0 {
482 rows.push((
483 "peak RSS",
484 format!(
485 "{} MB (sampled during run)",
486 fmt_thousands(self.peak_rss_mb)
487 ),
488 ));
489 }
490 if let Some(temp) = self.pg_temp_bytes_delta {
491 if temp > 0 {
495 let temp_mb = temp as f64 / (1024.0 * 1024.0);
496 let label = if temp > 100 * 1024 * 1024 {
497 format!(
498 "{:.1} MB ⚠ shrink tuning.batch_size or set batch_size_memory_mb",
499 temp_mb
500 )
501 } else {
502 format!("{:.1} MB", temp_mb)
503 };
504 rows.push(("pg temp spill", label));
505 }
506 }
507 if self.format == "parquet" && self.compression != "zstd" {
508 rows.push(("compression", self.compression.clone()));
509 }
510 if self.retries > 0 {
511 rows.push(("retries", self.retries.to_string()));
512 }
513 if let Some(v) = self.validated {
514 rows.push(("validated", if v { "pass".into() } else { "FAIL".into() }));
515 }
516 if let Some(sc) = self.schema_changed {
517 rows.push((
518 "schema",
519 if sc {
520 "CHANGED".into()
521 } else {
522 "unchanged".into()
523 },
524 ));
525 }
526 if let Some(q) = self.quality_passed {
527 rows.push(("quality", if q { "pass".into() } else { "FAIL".into() }));
528 }
529 if let Some(reconciled) = self.reconciled {
530 let src = self
531 .source_count
532 .map(fmt_thousands)
533 .unwrap_or_else(|| "?".into());
534 let exported = fmt_thousands(self.total_rows);
535 let value = if reconciled {
536 format!("MATCH ({exported}/{src})")
537 } else {
538 format!("MISMATCH (exported {exported} vs source {src})")
539 };
540 rows.push(("reconcile", value));
541 }
542 if let Some(err) = &self.error_message {
543 rows.push(("error", err.trim_end().to_string()));
549 }
550
551 format_block(&self.export_name, &rows)
552 }
553
554 pub fn check_post_run_invariants(&self) -> Result<(), String> {
570 let parts_bytes: u64 = self.manifest_parts.iter().map(|p| p.size_bytes).sum();
571
572 if self.files_committed > self.manifest_parts.len() {
573 return Err(format!(
574 "summary.files_committed ({}) > manifest_parts.len() ({}) — \
575 a runner bumped files_committed without commit::record_part",
576 self.files_committed,
577 self.manifest_parts.len()
578 ));
579 }
580 if self.files_produced > self.manifest_parts.len() {
581 return Err(format!(
582 "summary.files_produced ({}) > manifest_parts.len() ({}) — \
583 a runner bumped files_produced without commit::record_part",
584 self.files_produced,
585 self.manifest_parts.len()
586 ));
587 }
588 if self.bytes_written > parts_bytes {
589 return Err(format!(
590 "summary.bytes_written ({}) > sum(manifest_parts.size_bytes) ({}) — \
591 a runner bumped bytes_written without commit::record_part",
592 self.bytes_written, parts_bytes
593 ));
594 }
595 if self.status == "success" && self.files_committed > 0 && self.manifest_parts.is_empty() {
596 return Err(format!(
597 "success run with files_committed={} has empty manifest_parts — \
598 cloud manifest (ADR-0012 M1) would ship with no part list \
599 (this is the gap parallel_checkpoint had before commit e9b0796)",
600 self.files_committed
601 ));
602 }
603 if self.status == "success" && self.total_rows > 0 && self.files_committed == 0 {
615 return Err(format!(
616 "summary.total_rows={} but files_committed=0 — rows extracted from \
617 source but no files committed (no output reached the destination)",
618 self.total_rows
619 ));
620 }
621 Ok(())
622 }
623}
624
625fn compact_error(raw: &str) -> String {
637 const MAX_CHARS: usize = 240;
638 if let Some(summary) = summarize_parallel_chunk_errors(raw) {
639 return clamp_chars(&summary, MAX_CHARS);
640 }
641 let collapsed: String = raw
642 .lines()
643 .map(str::trim_end)
644 .filter(|s| !s.is_empty())
645 .collect::<Vec<_>>()
646 .join("; ");
647 clamp_chars(&collapsed, MAX_CHARS)
648}
649
/// Turns a cursor-based skip reason into the `cursor:` row text.
///
/// Only reasons of the exact form `no new rows since cursor '<col>'` match.
/// Any other reason, or no reason at all, yields `None`.
fn incremental_position_line(skip_reason: Option<&str>) -> Option<String> {
    skip_reason
        .and_then(|reason| reason.strip_prefix("no new rows since cursor '"))
        .and_then(|quoted_rest| quoted_rest.strip_suffix('\''))
        .map(|column| format!("'{column}' unchanged (no new rows this run)"))
}
663
/// Hint for the `window:` row when a rolling time-window run was skipped.
///
/// Produces text only when a skip reason is present (any reason) *and* the
/// mode label is exactly `"timewindow"`.
fn time_window_skip_line(mode: &str, skip_reason: Option<&str>) -> Option<String> {
    let empty_window = mode == "timewindow" && skip_reason.is_some();
    empty_window.then(|| {
        "rolling time window matched no rows — check `time_column`/`days_window`".to_string()
    })
}
687
688fn summarize_parallel_chunk_errors(raw: &str) -> Option<String> {
689 let header_pos = raw.find("parallel checkpoint worker errors:")?;
690 let prefix = raw[..header_pos].trim_end_matches(": ").trim_end();
691 let tail = &raw[header_pos + "parallel checkpoint worker errors:".len()..];
692
693 let chunk_lines: Vec<&str> = tail
694 .lines()
695 .map(str::trim)
696 .filter(|l| l.starts_with("chunk "))
697 .collect();
698 if chunk_lines.is_empty() {
699 return None;
700 }
701 let first_chunk_full = chunk_lines[0];
702 let first_chunk_short = clamp_chars(first_chunk_full, 140);
704 let prefix = if prefix.is_empty() {
705 String::new()
706 } else {
707 format!("{}: ", prefix)
708 };
709 Some(format!(
710 "{}parallel checkpoint workers failed: {} chunk(s) ({}); see stderr for full payloads",
711 prefix,
712 chunk_lines.len(),
713 first_chunk_short
714 ))
715}
716
/// Limits `s` to at most `max_chars` Unicode scalar values.
///
/// Strings that already fit are returned unchanged. Longer ones keep their
/// first `max_chars - 1` characters followed by `…`, so the result is exactly
/// `max_chars` long. A limit of zero always yields an empty string.
fn clamp_chars(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }
    // No character at index `max_chars` means the string already fits.
    if s.chars().nth(max_chars).is_none() {
        return s.to_owned();
    }
    // Byte offset where the (max_chars - 1)-th character starts: the cut point.
    let cut = s
        .char_indices()
        .nth(max_chars - 1)
        .map_or(s.len(), |(offset, _)| offset);
    let mut clipped = String::with_capacity(cut + '…'.len_utf8());
    clipped.push_str(&s[..cut]);
    clipped.push('…');
    clipped
}
729
/// Renders the detailed summary block for export `name`.
///
/// Layout:
/// - a leading blank line;
/// - a `── <name> ───…` header, padded with `─` to 60 characters (longer
///   names get no padding);
/// - one `label: value` line per row, with labels (plus their `:`)
///   left-aligned to a shared width.
///
/// Values containing `\n` continue on the following lines, indented so each
/// continuation starts in the same column as the first line's value.
fn format_block(name: &str, rows: &[(&str, String)]) -> String {
    const HEADER_WIDTH: usize = 60;
    // Spacing before the label column and between the label column and the
    // value. The continuation indent is computed from these same constants,
    // so multi-line values (e.g. quality-check errors) stay aligned.
    const LEAD: &str = "  ";
    const GAP: &str = "  ";

    // Count chars, not bytes: `{:<width$}` pads by character count.
    let label_w = rows
        .iter()
        .map(|(l, _)| l.chars().count())
        .max()
        .unwrap_or(0);
    // +1 for the ':' appended to every label.
    let label_col = label_w + 1;

    let prefix = format!("── {} ", name);
    let dashes = HEADER_WIDTH.saturating_sub(prefix.chars().count());
    let mut out = String::with_capacity(HEADER_WIDTH * (rows.len() + 3));
    out.push('\n');
    out.push_str(&prefix);
    out.push_str(&"─".repeat(dashes));
    out.push('\n');

    let value_indent = " ".repeat(LEAD.len() + label_col + GAP.len());
    for (label, value) in rows {
        let mut lines = value.split('\n');
        let first = lines.next().unwrap_or("");
        out.push_str(&format!(
            "{}{:<width$}{}{}\n",
            LEAD,
            format!("{label}:"),
            GAP,
            first,
            width = label_col
        ));
        for cont in lines {
            out.push_str(&value_indent);
            out.push_str(cont);
            out.push('\n');
        }
    }
    out
}
769
/// Formats a duration given in milliseconds for humans.
///
/// - below one second: `"<n>ms"` (negative inputs also take this form);
/// - below one minute: `"<s>.<t>s"`, e.g. `"1.5s"`;
/// - below one hour: `"<m>m <ss>.<t>s"`, e.g. `"1m 08.4s"`;
/// - otherwise: `"<h>h <mm>m <ss>.<t>s"`, e.g. `"1h 02m 05.3s"`.
///
/// The value is rounded (half up) to whole tenths of a second *before* it is
/// split into hours, minutes and seconds. Rounding after the split could
/// print an impossible `"1m 60.0s"` for 119 960 ms; here that prints
/// `"2m 00.0s"`.
fn fmt_duration_ms(ms: i64) -> String {
    if ms < 1000 {
        return format!("{}ms", ms);
    }
    // Integer rounding to tenths of a second. The divide-then-adjust form
    // cannot overflow, even for i64::MAX.
    let tenths = ms / 100 + i64::from(ms % 100 >= 50);
    let (whole_secs, tenth) = (tenths / 10, tenths % 10);
    let h = whole_secs / 3600;
    let m = (whole_secs % 3600) / 60;
    let s = whole_secs % 60;
    if h > 0 {
        format!("{}h {:02}m {:02}.{}s", h, m, s, tenth)
    } else if m > 0 {
        format!("{}m {:02}.{}s", m, s, tenth)
    } else {
        format!("{}.{}s", s, tenth)
    }
}
786
/// Formats an integer with `,` thousands separators, e.g. `-1234` → `"-1,234"`.
///
/// Works on the unsigned magnitude, so `i64::MIN` is handled without overflow.
fn fmt_thousands(n: i64) -> String {
    let digits = n.unsigned_abs().to_string();
    // The leading group holds 1–3 digits; everything after it splits evenly
    // into groups of three. `digits` is never empty, so a remainder of 0
    // implies at least three digits.
    let head_len = match digits.len() % 3 {
        0 => 3,
        rem => rem,
    };
    let (head, tail) = digits.split_at(head_len);

    let mut grouped = String::with_capacity(digits.len() + digits.len() / 3 + 1);
    if n < 0 {
        grouped.push('-');
    }
    grouped.push_str(head);
    for group in tail.as_bytes().chunks(3) {
        grouped.push(',');
        grouped.extend(group.iter().map(|&b| char::from(b)));
    }
    grouped
}
807
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal resolved plan shared by every test that needs a real
    /// `RunSummary::new`. It is a snapshot export of `SELECT 1` from Postgres
    /// to local Parquet under `./out`, with default tuning and no
    /// validation/reconcile/resume.
    fn plan_for(export_name: &str) -> crate::plan::ResolvedRunPlan {
        use crate::plan::{
            CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
            MetaColumns, ResolvedRunPlan,
        };
        use crate::tuning::SourceTuning;
        ResolvedRunPlan {
            export_name: export_name.into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: crate::config::SourceConfig {
                source_type: crate::config::SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    #[test]
    fn fmt_thousands_handles_small_and_large() {
        assert_eq!(fmt_thousands(0), "0");
        assert_eq!(fmt_thousands(7), "7");
        assert_eq!(fmt_thousands(999), "999");
        assert_eq!(fmt_thousands(1_000), "1,000");
        assert_eq!(fmt_thousands(1_000_908), "1,000,908");
        assert_eq!(fmt_thousands(39_990_376), "39,990,376");
        assert_eq!(fmt_thousands(-1_234), "-1,234");
        assert_eq!(fmt_thousands(i64::MAX), "9,223,372,036,854,775,807");
    }

    #[test]
    fn fmt_duration_picks_unit() {
        assert_eq!(fmt_duration_ms(0), "0ms");
        assert_eq!(fmt_duration_ms(800), "800ms");
        assert_eq!(fmt_duration_ms(1_500), "1.5s");
        assert_eq!(fmt_duration_ms(68_400), "1m 08.4s");
        assert_eq!(fmt_duration_ms(3_725_300), "1h 02m 05.3s");
    }

    #[test]
    fn format_block_pads_labels_uniformly() {
        let rows = [
            ("run_id", "abc".to_string()),
            ("rows", "42".to_string()),
            ("compression", "zstd".to_string()),
        ];
        let out = format_block("orders", &rows);

        // Only the label/value lines contain ':'; the blank line and header don't.
        let lines: Vec<&str> = out.lines().filter(|l| l.contains(':')).collect();
        assert_eq!(lines.len(), 3);
        // Every value must start in the same column, whatever the label length.
        let value_col = lines[0].rfind("abc").unwrap();
        assert_eq!(lines[1].rfind("42").unwrap(), value_col);
        assert_eq!(lines[2].rfind("zstd").unwrap(), value_col);
    }

    #[test]
    fn format_block_header_has_consistent_width() {
        let block_a = format_block("a", &[("rows", "1".into())]);
        let block_b = format_block("orders_table_xyz", &[("rows", "1".into())]);
        let header_a = block_a.lines().nth(1).unwrap();
        let header_b = block_b.lines().nth(1).unwrap();
        assert_eq!(
            header_a.chars().count(),
            header_b.chars().count(),
            "headers must be the same width regardless of name length: {:?} vs {:?}",
            header_a,
            header_b
        );
    }

    #[test]
    fn render_produces_a_single_string_with_trailing_newline() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "success".into();
        s.total_rows = 1_000_908;
        s.files_produced = 11;
        s.bytes_written = 32 * 1024 * 1024 + 400 * 1024;
        s.duration_ms = 68_400;
        s.peak_rss_mb = 884;

        let block = s.render();
        assert!(
            block.starts_with('\n'),
            "block should start with a blank line"
        );
        assert!(block.ends_with('\n'), "block should end with a newline");
        assert!(block.contains("── orders "));
        assert!(
            block.contains("1,000,908"),
            "rows should be formatted with thousands separator: {}",
            block
        );
        assert!(block.contains("1m 08.4s"), "duration formatting: {}", block);
        assert!(!block.contains('\r'));

        let line = s.render_compact();
        assert!(line.starts_with("✓ "), "success icon present: {:?}", line);
        assert!(line.contains("orders"), "export name present: {:?}", line);
        assert!(line.contains("1,000,908 rows"), "rows present: {:?}", line);
        assert!(line.contains("32.4 MB"), "bytes present: {:?}", line);
        assert!(line.contains("1m 08.4s"), "duration present: {:?}", line);
        assert!(line.contains("RSS 884 MB"), "rss present: {:?}", line);
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    #[test]
    fn compact_error_summarises_parallel_chunk_errors() {
        let raw = "export 'page_views': parallel checkpoint worker errors:\n\
                   chunk 4: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202442_chunk4.parquet?partNumber=1&uploadId=ABPnzm7RqplA, called: http_util::Client::send } => send http request, source: error sending request: client error (SendRequest): dispatch task is gone\n\
                   chunk 5: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202443_chunk5.parquet?partNumber=1&uploadId=ABPnzm6q, called: http_util::Client::send } => send http request, source: dispatch task is gone";
        let out = compact_error(raw);
        assert!(
            out.contains("2 chunk(s)"),
            "should report number of failed chunks: {:?}",
            out
        );
        assert!(
            out.starts_with("export 'page_views': parallel checkpoint workers failed:"),
            "should keep export prefix and use compact phrasing: {:?}",
            out
        );
        assert!(
            out.contains("chunk 4:"),
            "should include the first chunk as an example: {:?}",
            out
        );
        assert!(!out.contains('\n'), "single line output: {:?}", out);
        assert!(
            out.chars().count() <= 240,
            "must be clamped to <=240 chars, got {}: {:?}",
            out.chars().count(),
            out
        );
    }

    #[test]
    fn compact_error_collapses_generic_multiline() {
        let raw = "first line of trouble\nsecond line with detail\n\nthird line\n";
        let out = compact_error(raw);
        assert_eq!(
            out, "first line of trouble; second line with detail; third line",
            "newlines should collapse to '; ' and blanks dropped"
        );
    }

    #[test]
    fn compact_error_clamps_excessively_long_lines() {
        let raw = "x".repeat(1_000);
        let out = compact_error(&raw);
        assert_eq!(out.chars().count(), 240);
        assert!(out.ends_with('…'));
    }

    #[test]
    fn render_compact_strips_chunked_recovery_hint_for_failed() {
        let mut s = RunSummary::new(&plan_for("events"));
        s.status = "failed".into();
        s.error_message = Some(
            "export 'events': --resume but no in-progress chunk checkpoint; \
             run without --resume first or `rivet state reset-chunks --config x.yaml --export events`"
                .to_string(),
        );

        let line = s.render_compact();
        assert!(line.starts_with("✗ "), "failure icon: {:?}", line);
        assert!(line.contains("events"), "name present: {:?}", line);
        assert!(
            line.contains("--resume but no in-progress chunk checkpoint"),
            "cause kept: {:?}",
            line
        );
        assert!(
            !line.contains("rivet state reset-chunks"),
            "recovery hint should be stripped from per-export line: {:?}",
            line
        );
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    #[test]
    fn render_preserves_multiline_error_block() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "failed".into();
        s.error_message = Some(
            "export 'orders': 1 quality check(s) failed:\n \
             - row_count 10 below minimum 999999\n \
             Fix the source data, or adjust the thresholds under `quality:` in your config."
                .to_string(),
        );

        let block = s.render();
        assert!(
            !block.contains("failed:;"),
            "error must not be '; '-flattened in the detailed block: {block}"
        );
        assert!(
            block.contains("- row_count 10 below minimum 999999"),
            "failing check line present: {block}"
        );
        let err_lines: Vec<&str> = block
            .lines()
            .filter(|l| {
                l.contains("quality check(s) failed")
                    || l.contains("row_count 10 below minimum")
                    || l.contains("Fix the source data")
            })
            .collect();
        assert_eq!(
            err_lines.len(),
            3,
            "all three error lines should render on separate lines: {block}"
        );
        for l in &err_lines {
            assert!(l.starts_with(' '), "error line should be indented: {l:?}");
        }
    }

    #[test]
    fn render_surfaces_cursor_position_on_zero_new_incremental() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "skipped".into();
        s.skip_reason = Some("no new rows since cursor 'updated_at'".into());

        let block = s.render();
        let cursor_line = block
            .lines()
            .find(|l| l.trim_start().starts_with("cursor:"))
            .unwrap_or_else(|| panic!("expected a cursor: line in block: {block}"));
        assert!(
            cursor_line.contains("'updated_at'"),
            "cursor line names the column: {cursor_line:?}"
        );
        assert!(
            cursor_line.contains("unchanged"),
            "cursor line reports the position held: {cursor_line:?}"
        );
    }

    #[test]
    fn incremental_position_line_only_for_cursor_skips() {
        assert_eq!(
            incremental_position_line(Some("no new rows since cursor 'ts'")),
            Some("'ts' unchanged (no new rows this run)".into())
        );
        assert_eq!(
            incremental_position_line(Some("source returned 0 rows")),
            None
        );
        assert_eq!(incremental_position_line(None), None);
    }

    #[test]
    fn render_surfaces_window_position_on_zero_row_time_window() {
        let mut s = RunSummary::new(&plan_for("events"));
        s.status = "skipped".into();
        s.mode = "timewindow".into();
        s.skip_reason = Some("source returned 0 rows".into());

        let block = s.render();
        let window_line = block
            .lines()
            .find(|l| l.trim_start().starts_with("window:"))
            .unwrap_or_else(|| panic!("expected a window: line in block: {block}"));
        assert!(
            window_line.contains("matched no rows"),
            "window line reports the empty window: {window_line:?}"
        );
        assert!(
            window_line.contains("time_column") && window_line.contains("days_window"),
            "window line points at the window config to check: {window_line:?}"
        );
        assert!(
            !block.lines().any(|l| l.trim_start().starts_with("cursor:")),
            "no cursor line for a non-cursor strategy: {block}"
        );
    }

    #[test]
    fn time_window_skip_line_only_for_skipped_time_window() {
        assert_eq!(
            time_window_skip_line("timewindow", Some("source returned 0 rows")),
            Some("rolling time window matched no rows — check `time_column`/`days_window`".into())
        );
        assert_eq!(
            time_window_skip_line("incremental", Some("source returned 0 rows")),
            None
        );
        assert_eq!(
            time_window_skip_line("full", Some("source returned 0 rows")),
            None
        );
        assert_eq!(time_window_skip_line("timewindow", None), None);
    }
}