1use super::ipc::{self, ChildEvent};
18use super::{format_bytes, multi_export_mode, strip_chunked_recovery_hint};
19use crate::journal::{PlanSnapshot, RunEvent, RunJournal};
20use crate::manifest::ManifestPart;
21use crate::plan::ResolvedRunPlan;
22
23fn plan_snapshot_from(plan: &ResolvedRunPlan) -> PlanSnapshot {
29 PlanSnapshot {
30 export_name: plan.export_name.clone(),
31 base_query: plan.base_query.clone(),
32 strategy: plan.strategy.mode_label().to_string(),
33 format: plan.format.label().to_string(),
34 compression: plan.compression.label().to_string(),
35 destination_type: plan.destination.destination_type.label().to_string(),
36 tuning_profile: plan.tuning_profile_label.clone(),
37 batch_size: plan.tuning.batch_size,
38 validate: plan.validate,
39 reconcile: plan.reconcile,
40 resume: plan.resume,
41 }
42}
43
/// Optional context attached to a run summary through
/// `RunSummary::apply_context`; serializable via serde.
///
/// NOTE(review): nothing in this file constructs or reads this type, so the
/// field notes below are inferred from the names only — confirm against the
/// code that populates it.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ApplyContext {
    /// Identifier of a plan — presumably the plan this run applied; TODO confirm.
    pub plan_id: String,
    /// Presumably set when the run was started with a force option; TODO confirm.
    pub forced: bool,
    /// Presumably the names of the checks that forcing skipped; TODO confirm.
    pub force_bypassed: Vec<String>,
}
63
/// Outcome and metrics of a single export run.
///
/// Created by `RunSummary::new` in the `"running"` state with zeroed counters,
/// filled in by the caller while the export executes, and finally reported by
/// `print` (IPC event, compact line, or full block).
#[derive(Debug, Clone)]
pub struct RunSummary {
    /// Run identifier; `new` builds it as `<export_name>_<UTC timestamp>`.
    pub run_id: String,
    /// Name of the export; used as the header of the rendered block.
    pub export_name: String,
    /// Run state. Starts as `"running"`; `"success"` and `"failed"` are the
    /// values the renderers and `with_status` treat specially.
    pub status: String,
    /// Number of rows reported in the `rows` line.
    pub total_rows: i64,
    /// Number of files reported in the `files` line.
    pub files_produced: usize,
    /// Bytes reported in the `bytes` line; the full block omits the line when 0.
    pub bytes_written: u64,
    /// NOTE(review): not rendered in this file; `with_manifest_parts` sets it
    /// to the number of manifest parts.
    pub files_committed: usize,
    /// Run duration in milliseconds, rendered through `fmt_duration_ms`.
    pub duration_ms: i64,
    /// Peak RSS in MB; only rendered when greater than 0.
    pub peak_rss_mb: i64,
    /// Retry count; only rendered when greater than 0.
    pub retries: u32,
    /// Validation result: `Some(true)` renders `pass`, `Some(false)` renders
    /// `FAIL`, `None` omits the row.
    pub validated: Option<bool>,
    /// Schema comparison result: `Some(true)` renders `CHANGED`,
    /// `Some(false)` renders `unchanged`, `None` omits the row.
    pub schema_changed: Option<bool>,
    /// Quality result, rendered like `validated`.
    pub quality_passed: Option<bool>,
    /// Error text; rendered in compacted, length-limited form.
    pub error_message: Option<String>,
    /// Tuning profile label copied from the plan.
    pub tuning_profile: String,
    /// Batch size copied from the plan's tuning settings.
    pub batch_size: usize,
    /// Optional memory-based batch setting copied from the plan's tuning
    /// settings; when present it is appended to the `tuning` row.
    pub batch_size_memory_mb: Option<usize>,
    /// Output format label copied from the plan (e.g. `"parquet"`).
    pub format: String,
    /// Strategy mode label copied from the plan; shown in the compact line.
    pub mode: String,
    /// Compression label copied from the plan; the full block shows it only
    /// for parquet output that is not `"zstd"`.
    pub compression: String,
    /// Byte count rendered as the `pg temp spill` row (in MB) when positive;
    /// values above 100 MiB add a tuning warning. Presumably the change in
    /// Postgres temp-file bytes across the run — TODO confirm.
    pub pg_temp_bytes_delta: Option<i64>,
    /// Source-side row count shown in the `reconcile` row (`?` when absent).
    pub source_count: Option<i64>,
    /// Reconciliation result: `Some(true)` renders `MATCH`, `Some(false)`
    /// renders `MISMATCH`, `None` omits the row.
    pub reconciled: Option<bool>,
    /// Manifest entries for this run; empty until populated.
    pub manifest_parts: Vec<ManifestPart>,
    /// NOTE(review): not read in this file; initialised to `None`.
    pub schema_fingerprint: Option<String>,
    /// NOTE(review): not read in this file; initialised to `None`.
    pub manifest_verification: Option<crate::pipeline::ManifestVerification>,
    /// NOTE(review): not read in this file; initialised to `None`.
    pub apply_context: Option<ApplyContext>,
    /// Journal of run events; `new` seeds it with a `PlanResolved` event.
    pub journal: RunJournal,
}
139
140impl RunSummary {
141 pub(super) fn new(plan: &ResolvedRunPlan) -> Self {
142 let run_id = format!(
143 "{}_{}",
144 plan.export_name,
145 chrono::Utc::now().format("%Y%m%dT%H%M%S%.3f"),
146 );
147 let mut journal = RunJournal::new(&run_id, &plan.export_name);
148 journal.record(RunEvent::PlanResolved(plan_snapshot_from(plan)));
149
150 ipc::emit_event(&ChildEvent::Started {
151 export_name: plan.export_name.clone(),
152 run_id: run_id.clone(),
153 mode: plan.strategy.mode_label().to_string(),
154 tuning_profile: plan.tuning_profile_label.clone(),
155 batch_size: plan.tuning.batch_size,
156 });
157
158 Self {
159 run_id,
160 export_name: plan.export_name.clone(),
161 status: "running".into(),
162 total_rows: 0,
163 files_produced: 0,
164 bytes_written: 0,
165 files_committed: 0,
166 duration_ms: 0,
167 peak_rss_mb: 0,
168 retries: 0,
169 validated: None,
170 schema_changed: None,
171 quality_passed: None,
172 error_message: None,
173 tuning_profile: plan.tuning_profile_label.clone(),
174 batch_size: plan.tuning.batch_size,
175 batch_size_memory_mb: plan.tuning.batch_size_memory_mb,
176 format: plan.format.label().to_string(),
177 mode: plan.strategy.mode_label().to_string(),
178 compression: plan.compression.label().to_string(),
179 pg_temp_bytes_delta: None,
180 source_count: None,
181 reconciled: None,
182 manifest_parts: Vec::new(),
183 schema_fingerprint: None,
184 manifest_verification: None,
185 apply_context: None,
186 journal,
187 }
188 }
189
190 #[doc(hidden)]
211 #[allow(dead_code)]
212 pub fn stub_for_testing(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
213 let run_id = run_id.into();
214 let export_name = export_name.into();
215 let journal = RunJournal::new(&run_id, &export_name);
216 Self {
217 run_id,
218 export_name,
219 status: "running".into(),
220 total_rows: 0,
221 files_produced: 0,
222 bytes_written: 0,
223 files_committed: 0,
224 duration_ms: 0,
225 peak_rss_mb: 0,
226 retries: 0,
227 validated: None,
228 schema_changed: None,
229 quality_passed: None,
230 error_message: None,
231 tuning_profile: "balanced".into(),
232 batch_size: 1000,
233 batch_size_memory_mb: None,
234 format: "parquet".into(),
235 mode: "snapshot".into(),
236 compression: "zstd".into(),
237 pg_temp_bytes_delta: None,
238 source_count: None,
239 reconciled: None,
240 manifest_parts: Vec::new(),
241 schema_fingerprint: None,
242 manifest_verification: None,
243 apply_context: None,
244 journal,
245 }
246 }
247
248 #[doc(hidden)]
255 #[allow(dead_code)]
256 pub fn with_status(mut self, status: impl Into<String>) -> Self {
257 let s = status.into();
258 if (s == "success" || s == "failed") && self.journal.final_outcome().is_none() {
259 self.journal.record(RunEvent::RunCompleted {
260 status: s.clone(),
261 error_message: self.error_message.clone(),
262 duration_ms: self.duration_ms,
263 });
264 }
265 self.status = s;
266 self
267 }
268
269 #[doc(hidden)]
273 #[allow(dead_code)]
274 pub fn with_files_committed(mut self, n: usize) -> Self {
275 self.files_committed = n;
276 self
277 }
278
279 #[doc(hidden)]
283 #[allow(dead_code)]
284 pub fn with_manifest_parts(mut self, parts: Vec<crate::manifest::ManifestPart>) -> Self {
285 self.total_rows = parts.iter().map(|p| p.rows).sum();
286 self.bytes_written = parts.iter().map(|p| p.size_bytes).sum();
287 self.files_produced = parts.len();
288 self.files_committed = parts.len();
289 self.manifest_parts = parts;
290 self
291 }
292
293 #[doc(hidden)]
297 #[allow(dead_code)]
298 pub fn with_error(mut self, msg: impl Into<String>) -> Self {
299 self.error_message = Some(msg.into());
300 self
301 }
302
303 #[doc(hidden)]
308 #[allow(dead_code)]
309 pub fn with_plan_snapshot(mut self, snap: PlanSnapshot) -> Self {
310 self.journal.record(RunEvent::PlanResolved(snap));
311 self
312 }
313
314 pub(super) fn print(&self) {
315 if ipc::capturing_events() {
319 ipc::emit_event(&ChildEvent::Finished {
320 export_name: self.export_name.clone(),
321 run_id: self.run_id.clone(),
322 status: self.status.clone(),
323 total_rows: self.total_rows,
324 files_produced: self.files_produced as u64,
325 bytes_written: self.bytes_written,
326 duration_ms: self.duration_ms,
327 peak_rss_mb: self.peak_rss_mb,
328 error_message: self.error_message.clone(),
329 });
330 return;
331 }
332
333 let block = if multi_export_mode() {
338 self.render_compact()
339 } else {
340 self.render().trim_end_matches('\n').to_string()
346 };
347
348 use std::io::Write;
349 let mut buf = block;
350 buf.push('\n');
351 let stderr = std::io::stderr();
352 let mut handle = stderr.lock();
353 let _ = handle.write_all(buf.as_bytes());
354 let _ = handle.flush();
355 }
356
357 fn render_compact(&self) -> String {
362 const NAME_COL: usize = 22;
363 const MODE_COL: usize = 8;
364 let icon = match self.status.as_str() {
365 "success" => "✓",
366 "failed" => "✗",
367 _ => "•",
368 };
369 let body = if self.status == "failed" {
370 let err = self
371 .error_message
372 .as_deref()
373 .unwrap_or("(no error message recorded)");
374 let (cause, _) = strip_chunked_recovery_hint(err);
375 compact_error(cause)
379 } else {
380 let rss = if self.peak_rss_mb > 0 {
381 format!(" RSS {} MB", fmt_thousands(self.peak_rss_mb))
382 } else {
383 String::new()
384 };
385 format!(
386 "{} rows {} files {} {}{}",
387 fmt_thousands(self.total_rows),
388 fmt_thousands(self.files_produced as i64),
389 format_bytes(self.bytes_written),
390 fmt_duration_ms(self.duration_ms),
391 rss
392 )
393 };
394 format!(
395 "{} {:<name$} {:<mode$} {}",
396 icon,
397 self.export_name,
398 self.mode,
399 body,
400 name = NAME_COL,
401 mode = MODE_COL,
402 )
403 }
404
405 fn render(&self) -> String {
408 let mut rows: Vec<(&'static str, String)> = Vec::with_capacity(16);
413 rows.push(("run_id", self.run_id.clone()));
414 rows.push(("status", self.status.clone()));
415
416 let tuning_value = match self.batch_size_memory_mb {
417 Some(mem) => format!(
418 "profile={}, batch_size={} (batch_size_memory_mb={}MiB → effective FETCH in logs)",
419 self.tuning_profile,
420 fmt_thousands(self.batch_size as i64),
421 mem
422 ),
423 None => format!(
424 "profile={}, batch_size={}",
425 self.tuning_profile,
426 fmt_thousands(self.batch_size as i64)
427 ),
428 };
429 rows.push(("tuning", tuning_value));
430
431 rows.push(("rows", fmt_thousands(self.total_rows)));
432 rows.push(("files", fmt_thousands(self.files_produced as i64)));
433 if self.bytes_written > 0 {
434 rows.push(("bytes", format_bytes(self.bytes_written)));
435 }
436 rows.push(("duration", fmt_duration_ms(self.duration_ms)));
437
438 if self.peak_rss_mb > 0 {
439 rows.push((
440 "peak RSS",
441 format!(
442 "{} MB (sampled during run)",
443 fmt_thousands(self.peak_rss_mb)
444 ),
445 ));
446 }
447 if let Some(temp) = self.pg_temp_bytes_delta {
448 if temp > 0 {
452 let temp_mb = temp as f64 / (1024.0 * 1024.0);
453 let label = if temp > 100 * 1024 * 1024 {
454 format!(
455 "{:.1} MB ⚠ shrink tuning.batch_size or set batch_size_memory_mb",
456 temp_mb
457 )
458 } else {
459 format!("{:.1} MB", temp_mb)
460 };
461 rows.push(("pg temp spill", label));
462 }
463 }
464 if self.format == "parquet" && self.compression != "zstd" {
465 rows.push(("compression", self.compression.clone()));
466 }
467 if self.retries > 0 {
468 rows.push(("retries", self.retries.to_string()));
469 }
470 if let Some(v) = self.validated {
471 rows.push(("validated", if v { "pass".into() } else { "FAIL".into() }));
472 }
473 if let Some(sc) = self.schema_changed {
474 rows.push((
475 "schema",
476 if sc {
477 "CHANGED".into()
478 } else {
479 "unchanged".into()
480 },
481 ));
482 }
483 if let Some(q) = self.quality_passed {
484 rows.push(("quality", if q { "pass".into() } else { "FAIL".into() }));
485 }
486 if let Some(reconciled) = self.reconciled {
487 let src = self
488 .source_count
489 .map(fmt_thousands)
490 .unwrap_or_else(|| "?".into());
491 let exported = fmt_thousands(self.total_rows);
492 let value = if reconciled {
493 format!("MATCH ({exported}/{src})")
494 } else {
495 format!("MISMATCH (exported {exported} vs source {src})")
496 };
497 rows.push(("reconcile", value));
498 }
499 if let Some(err) = &self.error_message {
500 rows.push(("error", compact_error(err)));
506 }
507
508 format_block(&self.export_name, &rows)
509 }
510}
511
512fn compact_error(raw: &str) -> String {
524 const MAX_CHARS: usize = 240;
525 if let Some(summary) = summarize_parallel_chunk_errors(raw) {
526 return clamp_chars(&summary, MAX_CHARS);
527 }
528 let collapsed: String = raw
529 .lines()
530 .map(str::trim_end)
531 .filter(|s| !s.is_empty())
532 .collect::<Vec<_>>()
533 .join("; ");
534 clamp_chars(&collapsed, MAX_CHARS)
535}
536
537fn summarize_parallel_chunk_errors(raw: &str) -> Option<String> {
538 let header_pos = raw.find("parallel checkpoint worker errors:")?;
539 let prefix = raw[..header_pos].trim_end_matches(": ").trim_end();
540 let tail = &raw[header_pos + "parallel checkpoint worker errors:".len()..];
541
542 let chunk_lines: Vec<&str> = tail
543 .lines()
544 .map(str::trim)
545 .filter(|l| l.starts_with("chunk "))
546 .collect();
547 if chunk_lines.is_empty() {
548 return None;
549 }
550 let first_chunk_full = chunk_lines[0];
551 let first_chunk_short = clamp_chars(first_chunk_full, 140);
553 let prefix = if prefix.is_empty() {
554 String::new()
555 } else {
556 format!("{}: ", prefix)
557 };
558 Some(format!(
559 "{}parallel checkpoint workers failed: {} chunk(s) ({}); see stderr for full payloads",
560 prefix,
561 chunk_lines.len(),
562 first_chunk_short
563 ))
564}
565
/// Limits `s` to at most `max_chars` characters (not bytes).
///
/// Strings that already fit are returned unchanged. Longer strings keep their
/// first `max_chars - 1` characters and end with `…`. A limit of 0 yields an
/// empty string.
fn clamp_chars(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }

    let mut positions = s.char_indices();
    // Byte offset of the last character that would still fit.
    let Some((cut, _)) = positions.nth(max_chars - 1) else {
        // Fewer than `max_chars` characters.
        return s.to_string();
    };
    if positions.next().is_none() {
        // Exactly `max_chars` characters.
        return s.to_string();
    }

    // Too long: replace everything from the last fitting slot with an ellipsis.
    let mut clamped = String::with_capacity(cut + '…'.len_utf8());
    clamped.push_str(&s[..cut]);
    clamped.push('…');
    clamped
}
578
/// Lays out a summary block: a blank line, a header rule carrying `name`,
/// then one `label: value` line per row with the values aligned in a column.
///
/// The header is padded with `─` up to 60 characters (longer names are not
/// truncated). The label column is sized by the longest label measured in
/// characters, which matches how the formatter pads and how the header width
/// is measured, so non-ASCII labels do not widen the column.
fn format_block(name: &str, rows: &[(&str, String)]) -> String {
    const HEADER_WIDTH: usize = 60;

    // Measure in chars, not bytes: `{:<width$}` pads by character count.
    let label_w = rows
        .iter()
        .map(|(label, _)| label.chars().count())
        .max()
        .unwrap_or(0);

    let prefix = format!("── {} ", name);
    let dashes = HEADER_WIDTH.saturating_sub(prefix.chars().count());

    let mut out = String::with_capacity(HEADER_WIDTH * (rows.len() + 3));
    out.push('\n');
    out.push_str(&prefix);
    out.extend(std::iter::repeat('─').take(dashes));
    out.push('\n');
    for (label, value) in rows {
        // The colon is part of the padded cell, hence `label_w + 1`.
        out.push_str(&format!(
            " {:<width$} {}\n",
            format!("{label}:"),
            value,
            width = label_w + 1
        ));
    }
    out
}
607
/// Formats a millisecond duration for display.
///
/// * below one second (including negative values): `"<n>ms"`
/// * below one minute: `"S.Ts"`
/// * below one hour: `"Mm SS.Ts"`
/// * otherwise: `"Hh MMm SS.Ts"`
///
/// The value is rounded to tenths of a second once, before it is split into
/// hours, minutes and seconds. Rounding only the seconds part afterwards lets
/// it reach 60.0 without carrying (e.g. `"1m 60.0s"` for 119 960 ms).
fn fmt_duration_ms(ms: i64) -> String {
    if ms < 1000 {
        return format!("{}ms", ms);
    }

    // Round half up to tenths; written so it cannot overflow near `i64::MAX`.
    let tenths = ms / 100 + i64::from(ms % 100 >= 50);
    let total_secs = tenths / 10;
    let frac = tenths % 10;

    let h = total_secs / 3600;
    let m = (total_secs % 3600) / 60;
    let s = total_secs % 60;

    if h > 0 {
        format!("{}h {:02}m {:02}.{}s", h, m, s, frac)
    } else if m > 0 {
        format!("{}m {:02}.{}s", m, s, frac)
    } else {
        format!("{}.{}s", s, frac)
    }
}
624
/// Formats an integer with `,` as the thousands separator, e.g. `1,000,908`.
///
/// Negative values keep a leading `-`; the magnitude is taken with
/// `unsigned_abs`, so `i64::MIN` is handled as well.
fn fmt_thousands(n: i64) -> String {
    let digits = n.unsigned_abs().to_string();

    // Group from the right in threes, then restore left-to-right order.
    let groups: Vec<&str> = digits
        .as_bytes()
        .rchunks(3)
        .rev()
        .map(|group| std::str::from_utf8(group).expect("decimal digits are ASCII"))
        .collect();
    let grouped = groups.join(",");

    if n < 0 {
        format!("-{grouped}")
    } else {
        grouped
    }
}
645
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal local-parquet snapshot plan shared by the rendering tests;
    /// only the export name differs between them.
    fn sample_plan(export_name: &str) -> crate::plan::ResolvedRunPlan {
        use crate::plan::{
            CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
            MetaColumns, ResolvedRunPlan,
        };
        use crate::tuning::SourceTuning;

        ResolvedRunPlan {
            export_name: export_name.into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: crate::config::SourceConfig {
                source_type: crate::config::SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    #[test]
    fn fmt_thousands_handles_small_and_large() {
        assert_eq!(fmt_thousands(0), "0");
        assert_eq!(fmt_thousands(7), "7");
        assert_eq!(fmt_thousands(999), "999");
        assert_eq!(fmt_thousands(1_000), "1,000");
        assert_eq!(fmt_thousands(1_000_908), "1,000,908");
        assert_eq!(fmt_thousands(39_990_376), "39,990,376");
        assert_eq!(fmt_thousands(-1_234), "-1,234");
        assert_eq!(fmt_thousands(i64::MAX), "9,223,372,036,854,775,807");
    }

    #[test]
    fn fmt_duration_picks_unit() {
        assert_eq!(fmt_duration_ms(0), "0ms");
        assert_eq!(fmt_duration_ms(800), "800ms");
        assert_eq!(fmt_duration_ms(1_500), "1.5s");
        assert_eq!(fmt_duration_ms(68_400), "1m 08.4s");
        assert_eq!(fmt_duration_ms(3_725_300), "1h 02m 05.3s");
    }

    #[test]
    fn format_block_pads_labels_uniformly() {
        let rows = vec![
            ("run_id", "abc".to_string()),
            ("rows", "42".to_string()),
            ("compression", "zstd".to_string()),
        ];
        let out = format_block("orders", &rows);

        // Only the row lines contain a colon; the header rule does not.
        let lines: Vec<&str> = out.lines().filter(|l| l.contains(':')).collect();
        assert_eq!(lines.len(), 3);

        // Every value must start in the same column as the first one.
        let value_col = lines[0].rfind("abc").unwrap();
        assert_eq!(lines[1].rfind("42").unwrap(), value_col);
        assert_eq!(lines[2].rfind("zstd").unwrap(), value_col);
    }

    #[test]
    fn format_block_header_has_consistent_width() {
        let block_a = format_block("a", &[("rows", "1".into())]);
        let block_b = format_block("orders_table_xyz", &[("rows", "1".into())]);
        // Line 0 is the leading blank line; line 1 is the header rule.
        let header_a = block_a.lines().nth(1).unwrap();
        let header_b = block_b.lines().nth(1).unwrap();
        assert_eq!(
            header_a.chars().count(),
            header_b.chars().count(),
            "headers must be the same width regardless of name length: {:?} vs {:?}",
            header_a,
            header_b
        );
    }

    #[test]
    fn render_produces_a_single_string_with_trailing_newline() {
        let plan = sample_plan("orders");
        let mut s = RunSummary::new(&plan);
        s.status = "success".into();
        s.total_rows = 1_000_908;
        s.files_produced = 11;
        s.bytes_written = 32 * 1024 * 1024 + 400 * 1024;
        s.duration_ms = 68_400;
        s.peak_rss_mb = 884;

        // Full block form.
        let block = s.render();
        assert!(
            block.starts_with('\n'),
            "block should start with a blank line"
        );
        assert!(block.ends_with('\n'), "block should end with a newline");
        assert!(block.contains("── orders "));
        assert!(
            block.contains("1,000,908"),
            "rows should be formatted with thousands separator: {}",
            block
        );
        assert!(block.contains("1m 08.4s"), "duration formatting: {}", block);
        assert!(!block.contains('\r'));

        // Compact one-line form of the same summary.
        let line = s.render_compact();
        assert!(line.starts_with("✓ "), "success icon present: {:?}", line);
        assert!(line.contains("orders"), "export name present: {:?}", line);
        assert!(line.contains("1,000,908 rows"), "rows present: {:?}", line);
        assert!(line.contains("32.4 MB"), "bytes present: {:?}", line);
        assert!(line.contains("1m 08.4s"), "duration present: {:?}", line);
        assert!(line.contains("RSS 884 MB"), "rss present: {:?}", line);
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    #[test]
    fn compact_error_summarises_parallel_chunk_errors() {
        let raw = "export 'page_views': parallel checkpoint worker errors:\n\
                   chunk 4: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202442_chunk4.parquet?partNumber=1&uploadId=ABPnzm7RqplA, called: http_util::Client::send } => send http request, source: error sending request: client error (SendRequest): dispatch task is gone\n\
                   chunk 5: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202443_chunk5.parquet?partNumber=1&uploadId=ABPnzm6q, called: http_util::Client::send } => send http request, source: dispatch task is gone";
        let out = compact_error(raw);
        assert!(
            out.contains("2 chunk(s)"),
            "should report number of failed chunks: {:?}",
            out
        );
        assert!(
            out.starts_with("export 'page_views': parallel checkpoint workers failed:"),
            "should keep export prefix and use compact phrasing: {:?}",
            out
        );
        assert!(
            out.contains("chunk 4:"),
            "should include the first chunk as an example: {:?}",
            out
        );
        assert!(!out.contains('\n'), "single line output: {:?}", out);
        assert!(
            out.chars().count() <= 240,
            "must be clamped to <=240 chars, got {}: {:?}",
            out.chars().count(),
            out
        );
    }

    #[test]
    fn compact_error_collapses_generic_multiline() {
        let raw = "first line of trouble\nsecond line with detail\n\nthird line\n";
        let out = compact_error(raw);
        assert_eq!(
            out, "first line of trouble; second line with detail; third line",
            "newlines should collapse to '; ' and blanks dropped"
        );
    }

    #[test]
    fn compact_error_clamps_excessively_long_lines() {
        let raw = "x".repeat(1_000);
        let out = compact_error(&raw);
        assert_eq!(out.chars().count(), 240);
        assert!(out.ends_with('…'));
    }

    #[test]
    fn render_compact_strips_chunked_recovery_hint_for_failed() {
        let plan = sample_plan("events");
        let mut s = RunSummary::new(&plan);
        s.status = "failed".into();
        s.error_message = Some(
            "export 'events': --resume but no in-progress chunk checkpoint; \
             run without --resume first or `rivet state reset-chunks --config x.yaml --export events`"
                .to_string(),
        );

        let line = s.render_compact();
        assert!(line.starts_with("✗ "), "failure icon: {:?}", line);
        assert!(line.contains("events"), "name present: {:?}", line);
        assert!(
            line.contains("--resume but no in-progress chunk checkpoint"),
            "cause kept: {:?}",
            line
        );
        assert!(
            !line.contains("rivet state reset-chunks"),
            "recovery hint should be stripped from per-export line: {:?}",
            line
        );
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }
}