1use super::ipc::{self, ChildEvent};
18use super::{format_bytes, multi_export_mode, strip_chunked_recovery_hint};
19use crate::journal::{PlanSnapshot, RunEvent, RunJournal};
20use crate::manifest::ManifestPart;
21use crate::plan::ResolvedRunPlan;
22
23fn plan_snapshot_from(plan: &ResolvedRunPlan) -> PlanSnapshot {
29 PlanSnapshot {
30 export_name: plan.export_name.clone(),
31 base_query: plan.base_query.clone(),
32 strategy: plan.strategy.mode_label().to_string(),
33 format: plan.format.label().to_string(),
34 compression: plan.compression.label().to_string(),
35 destination_type: plan.destination.destination_type.label().to_string(),
36 tuning_profile: plan.tuning_profile_label.clone(),
37 batch_size: plan.tuning.batch_size,
38 validate: plan.validate,
39 reconcile: plan.reconcile,
40 resume: plan.resume,
41 }
42}
43
/// Optional apply-related context carried on `RunSummary::apply_context`.
///
/// The type derives serde's `Serialize`/`Deserialize`, so it can be persisted
/// or sent across a process boundary.
///
/// NOTE(review): nothing in this file populates or reads these fields, so the
/// per-field descriptions below are inferred from the names only — confirm
/// against the code that constructs this struct.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ApplyContext {
    /// Identifier of a plan — presumably the plan that was applied; TODO confirm.
    pub plan_id: String,
    /// Presumably `true` when the apply was run in a "force" mode; TODO confirm.
    pub forced: bool,
    /// Presumably the names of checks that were bypassed by forcing; TODO confirm.
    pub force_bypassed: Vec<String>,
}
63
/// Accumulated outcome of one export run, plus the journal of events recorded
/// while it ran.
///
/// `RunSummary::new` seeds the plan-derived fields (tuning, format, mode,
/// compression) and leaves every counter at zero and every optional result at
/// `None`. The `render`/`render_compact` methods turn the summary into the
/// human-readable stderr block.
#[derive(Debug, Clone)]
pub struct RunSummary {
    /// `<export_name>_<UTC timestamp>` when built by `new`.
    pub run_id: String,
    /// Name of the export this summary belongs to.
    pub export_name: String,
    /// Free-form status string. This file uses `"running"` (initial),
    /// `"success"`, `"failed"` and `"skipped"`.
    pub status: String,
    /// Rows exported; shown on the "rows" line and in the reconcile comparison.
    pub total_rows: i64,
    /// Number of files produced; shown on the "files" line.
    pub files_produced: usize,
    /// Total bytes written; the "bytes" line is omitted while this is zero.
    pub bytes_written: u64,
    /// Number of files committed. NOTE(review): not rendered in this file —
    /// presumably files durably landed at the destination; confirm.
    pub files_committed: usize,
    /// Run duration in milliseconds.
    pub duration_ms: i64,
    /// Peak resident set size in MB; rendered only when greater than zero.
    pub peak_rss_mb: i64,
    /// Retry count; rendered only when greater than zero.
    pub retries: u32,
    /// Validation outcome, rendered as `pass`/`FAIL`; `None` hides the line.
    pub validated: Option<bool>,
    /// Schema comparison outcome, rendered as `CHANGED`/`unchanged`.
    pub schema_changed: Option<bool>,
    /// Quality-check outcome, rendered as `pass`/`FAIL`.
    pub quality_passed: Option<bool>,
    /// Error text, if any; compacted to a single line when rendered.
    pub error_message: Option<String>,
    /// Label of the tuning profile taken from the plan.
    pub tuning_profile: String,
    /// Batch size taken from the plan's tuning.
    pub batch_size: usize,
    /// Optional `batch_size_memory_mb` tuning value; when set it is appended
    /// to the "tuning" line.
    pub batch_size_memory_mb: Option<usize>,
    /// Output format label (e.g. `"parquet"`).
    pub format: String,
    /// Extraction strategy label (e.g. `"snapshot"`).
    pub mode: String,
    /// Compression label; shown only for parquet output that is not `"zstd"`.
    pub compression: String,
    /// Byte delta rendered as "pg temp spill" when positive, with a warning
    /// above 100 MiB. NOTE(review): presumably the change in PostgreSQL
    /// temp-file bytes over the run — confirm where it is measured.
    pub pg_temp_bytes_delta: Option<i64>,
    /// Reason shown in parentheses after the status when status is `"skipped"`.
    pub skip_reason: Option<String>,
    /// Source-side row count used on the "reconcile" line (`?` when unknown).
    pub source_count: Option<i64>,
    /// Reconciliation outcome, rendered as `MATCH`/`MISMATCH`.
    pub reconciled: Option<bool>,
    /// Manifest entries for the produced parts; `with_manifest_parts` derives
    /// the row, byte and file counters from these.
    pub manifest_parts: Vec<ManifestPart>,
    /// NOTE(review): not used in this file — presumably a fingerprint of the
    /// exported schema; confirm.
    pub schema_fingerprint: Option<String>,
    /// NOTE(review): not used in this file — presumably the result of
    /// verifying the written manifest; confirm.
    pub manifest_verification: Option<crate::pipeline::ManifestVerification>,
    /// Optional apply context; `None` unless set by a caller outside this file.
    pub apply_context: Option<ApplyContext>,
    /// Event journal for this run; `new` records the resolved plan into it.
    pub journal: RunJournal,
}
145
146impl RunSummary {
147 pub(super) fn new(plan: &ResolvedRunPlan) -> Self {
148 let run_id = format!(
149 "{}_{}",
150 plan.export_name,
151 chrono::Utc::now().format("%Y%m%dT%H%M%S%.3f"),
152 );
153 let mut journal = RunJournal::new(&run_id, &plan.export_name);
154 journal.record(RunEvent::PlanResolved(plan_snapshot_from(plan)));
155
156 ipc::emit_event(&ChildEvent::Started {
157 export_name: plan.export_name.clone(),
158 run_id: run_id.clone(),
159 mode: plan.strategy.mode_label().to_string(),
160 tuning_profile: plan.tuning_profile_label.clone(),
161 batch_size: plan.tuning.batch_size,
162 });
163
164 Self {
165 run_id,
166 export_name: plan.export_name.clone(),
167 status: "running".into(),
168 total_rows: 0,
169 files_produced: 0,
170 bytes_written: 0,
171 files_committed: 0,
172 duration_ms: 0,
173 peak_rss_mb: 0,
174 retries: 0,
175 validated: None,
176 schema_changed: None,
177 quality_passed: None,
178 error_message: None,
179 tuning_profile: plan.tuning_profile_label.clone(),
180 batch_size: plan.tuning.batch_size,
181 batch_size_memory_mb: plan.tuning.batch_size_memory_mb,
182 format: plan.format.label().to_string(),
183 mode: plan.strategy.mode_label().to_string(),
184 compression: plan.compression.label().to_string(),
185 pg_temp_bytes_delta: None,
186 skip_reason: None,
187 source_count: None,
188 reconciled: None,
189 manifest_parts: Vec::new(),
190 schema_fingerprint: None,
191 manifest_verification: None,
192 apply_context: None,
193 journal,
194 }
195 }
196
197 #[doc(hidden)]
218 #[allow(dead_code)]
219 pub fn stub_for_testing(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
220 let run_id = run_id.into();
221 let export_name = export_name.into();
222 let journal = RunJournal::new(&run_id, &export_name);
223 Self {
224 run_id,
225 export_name,
226 status: "running".into(),
227 total_rows: 0,
228 files_produced: 0,
229 bytes_written: 0,
230 files_committed: 0,
231 duration_ms: 0,
232 peak_rss_mb: 0,
233 retries: 0,
234 validated: None,
235 schema_changed: None,
236 quality_passed: None,
237 error_message: None,
238 tuning_profile: "balanced".into(),
239 batch_size: 1000,
240 batch_size_memory_mb: None,
241 format: "parquet".into(),
242 mode: "snapshot".into(),
243 compression: "zstd".into(),
244 pg_temp_bytes_delta: None,
245 skip_reason: None,
246 source_count: None,
247 reconciled: None,
248 manifest_parts: Vec::new(),
249 schema_fingerprint: None,
250 manifest_verification: None,
251 apply_context: None,
252 journal,
253 }
254 }
255
256 #[doc(hidden)]
263 #[allow(dead_code)]
264 pub fn with_status(mut self, status: impl Into<String>) -> Self {
265 let s = status.into();
266 if (s == "success" || s == "failed") && self.journal.final_outcome().is_none() {
267 self.journal.record(RunEvent::RunCompleted {
268 status: s.clone(),
269 error_message: self.error_message.clone(),
270 duration_ms: self.duration_ms,
271 });
272 }
273 self.status = s;
274 self
275 }
276
277 #[doc(hidden)]
281 #[allow(dead_code)]
282 pub fn with_files_committed(mut self, n: usize) -> Self {
283 self.files_committed = n;
284 self
285 }
286
287 #[doc(hidden)]
291 #[allow(dead_code)]
292 pub fn with_manifest_parts(mut self, parts: Vec<crate::manifest::ManifestPart>) -> Self {
293 self.total_rows = parts.iter().map(|p| p.rows).sum();
294 self.bytes_written = parts.iter().map(|p| p.size_bytes).sum();
295 self.files_produced = parts.len();
296 self.files_committed = parts.len();
297 self.manifest_parts = parts;
298 self
299 }
300
301 #[doc(hidden)]
305 #[allow(dead_code)]
306 pub fn with_error(mut self, msg: impl Into<String>) -> Self {
307 self.error_message = Some(msg.into());
308 self
309 }
310
311 #[doc(hidden)]
316 #[allow(dead_code)]
317 pub fn with_plan_snapshot(mut self, snap: PlanSnapshot) -> Self {
318 self.journal.record(RunEvent::PlanResolved(snap));
319 self
320 }
321
322 pub(super) fn print(&self) {
323 if ipc::capturing_events() {
327 ipc::emit_event(&ChildEvent::Finished {
328 export_name: self.export_name.clone(),
329 run_id: self.run_id.clone(),
330 status: self.status.clone(),
331 total_rows: self.total_rows,
332 files_produced: self.files_produced as u64,
333 bytes_written: self.bytes_written,
334 duration_ms: self.duration_ms,
335 peak_rss_mb: self.peak_rss_mb,
336 error_message: self.error_message.clone(),
337 });
338 return;
339 }
340
341 self.print_stderr_block();
342 }
343
344 pub(super) fn print_stderr_block(&self) {
349 let block = if multi_export_mode() {
350 self.render_compact()
351 } else {
352 self.render().trim_end_matches('\n').to_string()
358 };
359
360 use std::io::Write;
361 let mut buf = block;
362 buf.push('\n');
363 let stderr = std::io::stderr();
364 let mut handle = stderr.lock();
365 let _ = handle.write_all(buf.as_bytes());
366 let _ = handle.flush();
367 }
368
369 fn render_compact(&self) -> String {
374 const NAME_COL: usize = 22;
375 const MODE_COL: usize = 8;
376 let icon = match self.status.as_str() {
377 "success" => "✓",
378 "failed" => "✗",
379 _ => "•",
380 };
381 let body = if self.status == "failed" {
382 let err = self
383 .error_message
384 .as_deref()
385 .unwrap_or("(no error message recorded)");
386 let (cause, _) = strip_chunked_recovery_hint(err);
387 compact_error(cause)
391 } else {
392 let rss = if self.peak_rss_mb > 0 {
393 format!(" RSS {} MB", fmt_thousands(self.peak_rss_mb))
394 } else {
395 String::new()
396 };
397 format!(
398 "{} rows {} files {} {}{}",
399 fmt_thousands(self.total_rows),
400 fmt_thousands(self.files_produced as i64),
401 format_bytes(self.bytes_written),
402 fmt_duration_ms(self.duration_ms),
403 rss
404 )
405 };
406 format!(
407 "{} {:<name$} {:<mode$} {}",
408 icon,
409 self.export_name,
410 self.mode,
411 body,
412 name = NAME_COL,
413 mode = MODE_COL,
414 )
415 }
416
417 fn render(&self) -> String {
420 let mut rows: Vec<(&'static str, String)> = Vec::with_capacity(16);
425 rows.push(("run_id", self.run_id.clone()));
426 let status_value = match (&self.status, &self.skip_reason) {
427 (s, Some(reason)) if s == "skipped" => format!("{s} ({reason})"),
428 (s, _) => s.clone(),
429 };
430 rows.push(("status", status_value));
431
432 let tuning_value = match self.batch_size_memory_mb {
433 Some(mem) => format!(
434 "profile={}, batch_size={} (batch_size_memory_mb={}MiB → effective FETCH in logs)",
435 self.tuning_profile,
436 fmt_thousands(self.batch_size as i64),
437 mem
438 ),
439 None => format!(
440 "profile={}, batch_size={}",
441 self.tuning_profile,
442 fmt_thousands(self.batch_size as i64)
443 ),
444 };
445 rows.push(("tuning", tuning_value));
446
447 rows.push(("rows", fmt_thousands(self.total_rows)));
448 rows.push(("files", fmt_thousands(self.files_produced as i64)));
449 if self.bytes_written > 0 {
450 rows.push(("bytes", format_bytes(self.bytes_written)));
451 }
452 rows.push(("duration", fmt_duration_ms(self.duration_ms)));
453
454 if self.peak_rss_mb > 0 {
455 rows.push((
456 "peak RSS",
457 format!(
458 "{} MB (sampled during run)",
459 fmt_thousands(self.peak_rss_mb)
460 ),
461 ));
462 }
463 if let Some(temp) = self.pg_temp_bytes_delta {
464 if temp > 0 {
468 let temp_mb = temp as f64 / (1024.0 * 1024.0);
469 let label = if temp > 100 * 1024 * 1024 {
470 format!(
471 "{:.1} MB ⚠ shrink tuning.batch_size or set batch_size_memory_mb",
472 temp_mb
473 )
474 } else {
475 format!("{:.1} MB", temp_mb)
476 };
477 rows.push(("pg temp spill", label));
478 }
479 }
480 if self.format == "parquet" && self.compression != "zstd" {
481 rows.push(("compression", self.compression.clone()));
482 }
483 if self.retries > 0 {
484 rows.push(("retries", self.retries.to_string()));
485 }
486 if let Some(v) = self.validated {
487 rows.push(("validated", if v { "pass".into() } else { "FAIL".into() }));
488 }
489 if let Some(sc) = self.schema_changed {
490 rows.push((
491 "schema",
492 if sc {
493 "CHANGED".into()
494 } else {
495 "unchanged".into()
496 },
497 ));
498 }
499 if let Some(q) = self.quality_passed {
500 rows.push(("quality", if q { "pass".into() } else { "FAIL".into() }));
501 }
502 if let Some(reconciled) = self.reconciled {
503 let src = self
504 .source_count
505 .map(fmt_thousands)
506 .unwrap_or_else(|| "?".into());
507 let exported = fmt_thousands(self.total_rows);
508 let value = if reconciled {
509 format!("MATCH ({exported}/{src})")
510 } else {
511 format!("MISMATCH (exported {exported} vs source {src})")
512 };
513 rows.push(("reconcile", value));
514 }
515 if let Some(err) = &self.error_message {
516 rows.push(("error", compact_error(err)));
522 }
523
524 format_block(&self.export_name, &rows)
525 }
526}
527
528fn compact_error(raw: &str) -> String {
540 const MAX_CHARS: usize = 240;
541 if let Some(summary) = summarize_parallel_chunk_errors(raw) {
542 return clamp_chars(&summary, MAX_CHARS);
543 }
544 let collapsed: String = raw
545 .lines()
546 .map(str::trim_end)
547 .filter(|s| !s.is_empty())
548 .collect::<Vec<_>>()
549 .join("; ");
550 clamp_chars(&collapsed, MAX_CHARS)
551}
552
553fn summarize_parallel_chunk_errors(raw: &str) -> Option<String> {
554 let header_pos = raw.find("parallel checkpoint worker errors:")?;
555 let prefix = raw[..header_pos].trim_end_matches(": ").trim_end();
556 let tail = &raw[header_pos + "parallel checkpoint worker errors:".len()..];
557
558 let chunk_lines: Vec<&str> = tail
559 .lines()
560 .map(str::trim)
561 .filter(|l| l.starts_with("chunk "))
562 .collect();
563 if chunk_lines.is_empty() {
564 return None;
565 }
566 let first_chunk_full = chunk_lines[0];
567 let first_chunk_short = clamp_chars(first_chunk_full, 140);
569 let prefix = if prefix.is_empty() {
570 String::new()
571 } else {
572 format!("{}: ", prefix)
573 };
574 Some(format!(
575 "{}parallel checkpoint workers failed: {} chunk(s) ({}); see stderr for full payloads",
576 prefix,
577 chunk_lines.len(),
578 first_chunk_short
579 ))
580}
581
/// Limits `s` to at most `max_chars` Unicode scalar values.
///
/// A string that already fits is returned unchanged. A longer one keeps its
/// first `max_chars - 1` characters and ends with `…`, so the result is
/// exactly `max_chars` characters long. `max_chars == 0` yields an empty
/// string.
fn clamp_chars(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }

    // A character at index `max_chars` exists only if `s` is too long.
    match s.char_indices().nth(max_chars) {
        None => s.to_owned(),
        Some(_) => {
            // Byte offset where the last kept character ends.
            let cut = s
                .char_indices()
                .nth(max_chars - 1)
                .map_or(s.len(), |(offset, _)| offset);
            let mut clamped = String::with_capacity(cut + '…'.len_utf8());
            clamped.push_str(&s[..cut]);
            clamped.push('…');
            clamped
        }
    }
}
594
/// Lays out a titled block of `label: value` rows.
///
/// The output starts with a blank line, then a header `── <name> ` padded
/// with `─` to 60 characters (names too long for that get no padding), then
/// one line per row. Labels are followed by `:` and padded so every value
/// starts in the same column. Each line, including the last, ends with `\n`.
///
/// Widths are measured in characters, not bytes, for both the header and the
/// labels, so a non-ASCII label does not widen the gap before the values.
fn format_block(name: &str, rows: &[(&str, String)]) -> String {
    use std::fmt::Write as _;

    const HEADER_WIDTH: usize = 60;

    let label_w = rows
        .iter()
        .map(|(label, _)| label.chars().count())
        .max()
        .unwrap_or(0);

    let prefix = format!("── {} ", name);
    let dashes = HEADER_WIDTH.saturating_sub(prefix.chars().count());

    let mut out = String::with_capacity(HEADER_WIDTH * (rows.len() + 3));
    out.push('\n');
    out.push_str(&prefix);
    out.extend(std::iter::repeat('─').take(dashes));
    out.push('\n');

    for (label, value) in rows {
        // Spaces needed after "label:" to reach the shared value column.
        let pad = label_w - label.chars().count();
        // Formatting into a `String` cannot fail, so the result is ignored.
        let _ = writeln!(out, " {label}:{:pad$} {value}", "");
    }
    out
}
623
/// Formats a millisecond duration for display.
///
/// * below one second (including negative values): `"<ms>ms"`
/// * below one minute: `"S.Ts"`
/// * below one hour: `"Mm SS.Ts"`
/// * otherwise: `"Hh MMm SS.Ts"`
///
/// The value is rounded (half up) to tenths of a second once, before it is
/// split into hours, minutes and seconds. Rounding the seconds component on
/// its own would let it show as `60.0` (e.g. 119 960 ms as `"1m 60.0s"`);
/// rounding first carries into the larger unit and gives `"2m 00.0s"`.
fn fmt_duration_ms(ms: i64) -> String {
    if ms < 1000 {
        return format!("{}ms", ms);
    }

    // Written as a quotient plus carry rather than `(ms + 50) / 100` so it
    // cannot overflow near `i64::MAX`.
    let tenths = ms / 100 + i64::from(ms % 100 >= 50);

    const TENTHS_PER_MINUTE: i64 = 600;
    const TENTHS_PER_HOUR: i64 = 60 * TENTHS_PER_MINUTE;

    let hours = tenths / TENTHS_PER_HOUR;
    let minutes = (tenths % TENTHS_PER_HOUR) / TENTHS_PER_MINUTE;
    let sec_tenths = tenths % TENTHS_PER_MINUTE;
    let (secs, frac) = (sec_tenths / 10, sec_tenths % 10);

    if hours > 0 {
        format!("{}h {:02}m {:02}.{}s", hours, minutes, secs, frac)
    } else if minutes > 0 {
        format!("{}m {:02}.{}s", minutes, secs, frac)
    } else {
        format!("{}.{}s", secs, frac)
    }
}
640
/// Formats an integer with `,` between groups of three digits, e.g.
/// `1000908` becomes `"1,000,908"`. Negative values keep a leading `-`.
fn fmt_thousands(n: i64) -> String {
    // Work on the magnitude so `i64::MIN` is handled without overflow.
    let digits = n.unsigned_abs().to_string();

    let mut out = String::with_capacity(digits.len() + digits.len() / 3 + 1);
    if n.is_negative() {
        out.push('-');
    }

    // `rchunks` groups from the right, so after reversing only the leading
    // group can be shorter than three digits.
    for (idx, group) in digits.as_bytes().rchunks(3).rev().enumerate() {
        if idx != 0 {
            out.push(',');
        }
        out.extend(group.iter().map(|&digit| char::from(digit)));
    }
    out
}
661
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plan::{
        CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
        MetaColumns, ResolvedRunPlan,
    };
    use crate::tuning::SourceTuning;

    /// Minimal snapshot/parquet plan writing to a local path, shared by the
    /// rendering tests so each test only states what it changes.
    fn sample_plan(export_name: &str) -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: export_name.into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: crate::config::SourceConfig {
                source_type: crate::config::SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    #[test]
    fn fmt_thousands_handles_small_and_large() {
        assert_eq!(fmt_thousands(0), "0");
        assert_eq!(fmt_thousands(7), "7");
        assert_eq!(fmt_thousands(999), "999");
        assert_eq!(fmt_thousands(1_000), "1,000");
        assert_eq!(fmt_thousands(1_000_908), "1,000,908");
        assert_eq!(fmt_thousands(39_990_376), "39,990,376");
        assert_eq!(fmt_thousands(-1_234), "-1,234");
        assert_eq!(fmt_thousands(i64::MAX), "9,223,372,036,854,775,807");
    }

    #[test]
    fn fmt_duration_picks_unit() {
        assert_eq!(fmt_duration_ms(0), "0ms");
        assert_eq!(fmt_duration_ms(800), "800ms");
        assert_eq!(fmt_duration_ms(1_500), "1.5s");
        assert_eq!(fmt_duration_ms(68_400), "1m 08.4s");
        assert_eq!(fmt_duration_ms(3_725_300), "1h 02m 05.3s");
    }

    #[test]
    fn format_block_pads_labels_uniformly() {
        let rows = vec![
            ("run_id", "abc".to_string()),
            ("rows", "42".to_string()),
            ("compression", "zstd".to_string()),
        ];
        let out = format_block("orders", &rows);

        // Only the row lines contain a colon; the header does not.
        let lines: Vec<&str> = out.lines().filter(|l| l.contains(':')).collect();
        assert_eq!(lines.len(), 3);

        // Every value must start in the same column as the first one.
        let value_col = lines[0].rfind("abc").unwrap();
        assert_eq!(lines[1].rfind("42").unwrap(), value_col);
        assert_eq!(lines[2].rfind("zstd").unwrap(), value_col);
    }

    #[test]
    fn format_block_header_has_consistent_width() {
        let block_a = format_block("a", &[("rows", "1".into())]);
        let block_b = format_block("orders_table_xyz", &[("rows", "1".into())]);
        let header_a = block_a.lines().nth(1).unwrap();
        let header_b = block_b.lines().nth(1).unwrap();
        assert_eq!(
            header_a.chars().count(),
            header_b.chars().count(),
            "headers must be the same width regardless of name length: {:?} vs {:?}",
            header_a,
            header_b
        );
    }

    #[test]
    fn render_produces_a_single_string_with_trailing_newline() {
        let plan = sample_plan("orders");
        let mut s = RunSummary::new(&plan);
        s.status = "success".into();
        s.total_rows = 1_000_908;
        s.files_produced = 11;
        s.bytes_written = 32 * 1024 * 1024 + 400 * 1024;
        s.duration_ms = 68_400;
        s.peak_rss_mb = 884;

        let block = s.render();
        assert!(
            block.starts_with('\n'),
            "block should start with a blank line"
        );
        assert!(block.ends_with('\n'), "block should end with a newline");
        assert!(block.contains("── orders "));
        assert!(
            block.contains("1,000,908"),
            "rows should be formatted with thousands separator: {}",
            block
        );
        assert!(block.contains("1m 08.4s"), "duration formatting: {}", block);
        assert!(!block.contains('\r'));

        let line = s.render_compact();
        assert!(line.starts_with("✓ "), "success icon present: {:?}", line);
        assert!(line.contains("orders"), "export name present: {:?}", line);
        assert!(line.contains("1,000,908 rows"), "rows present: {:?}", line);
        assert!(line.contains("32.4 MB"), "bytes present: {:?}", line);
        assert!(line.contains("1m 08.4s"), "duration present: {:?}", line);
        assert!(line.contains("RSS 884 MB"), "rss present: {:?}", line);
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    #[test]
    fn compact_error_summarises_parallel_chunk_errors() {
        let raw = "export 'page_views': parallel checkpoint worker errors:\n\
            chunk 4: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202442_chunk4.parquet?partNumber=1&uploadId=ABPnzm7RqplA, called: http_util::Client::send } => send http request, source: error sending request: client error (SendRequest): dispatch task is gone\n\
            chunk 5: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202443_chunk5.parquet?partNumber=1&uploadId=ABPnzm6q, called: http_util::Client::send } => send http request, source: dispatch task is gone";
        let out = compact_error(raw);
        assert!(
            out.contains("2 chunk(s)"),
            "should report number of failed chunks: {:?}",
            out
        );
        assert!(
            out.starts_with("export 'page_views': parallel checkpoint workers failed:"),
            "should keep export prefix and use compact phrasing: {:?}",
            out
        );
        assert!(
            out.contains("chunk 4:"),
            "should include the first chunk as an example: {:?}",
            out
        );
        assert!(!out.contains('\n'), "single line output: {:?}", out);
        assert!(
            out.chars().count() <= 240,
            "must be clamped to <=240 chars, got {}: {:?}",
            out.chars().count(),
            out
        );
    }

    #[test]
    fn compact_error_collapses_generic_multiline() {
        let raw = "first line of trouble\nsecond line with detail\n\nthird line\n";
        let out = compact_error(raw);
        assert_eq!(
            out, "first line of trouble; second line with detail; third line",
            "newlines should collapse to '; ' and blanks dropped"
        );
    }

    #[test]
    fn compact_error_clamps_excessively_long_lines() {
        let raw = "x".repeat(1_000);
        let out = compact_error(&raw);
        assert_eq!(out.chars().count(), 240);
        assert!(out.ends_with('…'));
    }

    #[test]
    fn render_compact_strips_chunked_recovery_hint_for_failed() {
        let plan = sample_plan("events");
        let mut s = RunSummary::new(&plan);
        s.status = "failed".into();
        s.error_message = Some(
            "export 'events': --resume but no in-progress chunk checkpoint; \
             run without --resume first or `rivet state reset-chunks --config x.yaml --export events`"
                .to_string(),
        );

        let line = s.render_compact();
        assert!(line.starts_with("✗ "), "failure icon: {:?}", line);
        assert!(line.contains("events"), "name present: {:?}", line);
        assert!(
            line.contains("--resume but no in-progress chunk checkpoint"),
            "cause kept: {:?}",
            line
        );
        assert!(
            !line.contains("rivet state reset-chunks"),
            "recovery hint should be stripped from per-export line: {:?}",
            line
        );
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }
}