1use super::ipc::{self, ChildEvent};
18use super::{format_bytes, multi_export_mode, strip_chunked_recovery_hint};
19use crate::journal::{PlanSnapshot, RunEvent, RunJournal};
20use crate::manifest::ManifestPart;
21use crate::plan::ResolvedRunPlan;
22
23fn plan_snapshot_from(plan: &ResolvedRunPlan) -> PlanSnapshot {
29 PlanSnapshot {
30 export_name: plan.export_name.clone(),
31 base_query: plan.base_query.clone(),
32 strategy: plan.strategy.mode_label().to_string(),
33 format: plan.format.label().to_string(),
34 compression: plan.compression.label().to_string(),
35 destination_type: plan.destination.destination_type.label().to_string(),
36 tuning_profile: plan.tuning_profile_label.clone(),
37 batch_size: plan.tuning.batch_size,
38 validate: plan.validate,
39 reconcile: plan.reconcile,
40 resume: plan.resume,
41 }
42}
43
44#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
52pub struct ApplyContext {
53 pub plan_id: String,
55 pub forced: bool,
57 pub force_bypassed: Vec<String>,
62}
63
64#[derive(Debug, Clone)]
70pub struct RunSummary {
71 pub run_id: String,
72 pub export_name: String,
73 pub status: String,
74 pub total_rows: i64,
75 pub files_produced: usize,
76 pub bytes_written: u64,
77 pub files_committed: usize,
80 pub duration_ms: i64,
81 pub peak_rss_mb: i64,
82 pub retries: u32,
83 pub validated: Option<bool>,
84 pub schema_changed: Option<bool>,
85 pub quality_passed: Option<bool>,
86 pub error_message: Option<String>,
87 pub tuning_profile: String,
89 pub batch_size: usize,
91 pub batch_size_memory_mb: Option<usize>,
93 pub format: String,
94 pub mode: String,
95 pub compression: String,
96 pub pg_temp_bytes_delta: Option<i64>,
103 pub skip_reason: Option<String>,
109 pub source_count: Option<i64>,
111 pub reconciled: Option<bool>,
113 pub manifest_parts: Vec<ManifestPart>,
118 pub schema_fingerprint: Option<String>,
132 pub manifest_verification: Option<crate::pipeline::ManifestVerification>,
138 pub apply_context: Option<ApplyContext>,
142 pub journal: RunJournal,
144}
145
146impl RunSummary {
147 pub(super) fn new(plan: &ResolvedRunPlan) -> Self {
148 let run_id = format!(
149 "{}_{}",
150 plan.export_name,
151 chrono::Utc::now().format("%Y%m%dT%H%M%S%.3f"),
152 );
153 let mut journal = RunJournal::new(&run_id, &plan.export_name);
154 journal.record(RunEvent::PlanResolved(plan_snapshot_from(plan)));
155
156 ipc::emit_event(&ChildEvent::Started {
157 export_name: plan.export_name.clone(),
158 run_id: run_id.clone(),
159 mode: plan.strategy.mode_label().to_string(),
160 tuning_profile: plan.tuning_profile_label.clone(),
161 batch_size: plan.tuning.batch_size,
162 });
163
164 Self {
165 run_id,
166 export_name: plan.export_name.clone(),
167 status: "running".into(),
168 total_rows: 0,
169 files_produced: 0,
170 bytes_written: 0,
171 files_committed: 0,
172 duration_ms: 0,
173 peak_rss_mb: 0,
174 retries: 0,
175 validated: None,
176 schema_changed: None,
177 quality_passed: None,
178 error_message: None,
179 tuning_profile: plan.tuning_profile_label.clone(),
180 batch_size: plan.tuning.batch_size,
181 batch_size_memory_mb: plan.tuning.batch_size_memory_mb,
182 format: plan.format.label().to_string(),
183 mode: plan.strategy.mode_label().to_string(),
184 compression: plan.compression.label().to_string(),
185 pg_temp_bytes_delta: None,
186 skip_reason: None,
187 source_count: None,
188 reconciled: None,
189 manifest_parts: Vec::new(),
190 schema_fingerprint: None,
191 manifest_verification: None,
192 apply_context: None,
193 journal,
194 }
195 }
196
197 #[doc(hidden)]
218 #[allow(dead_code)]
219 pub fn stub_for_testing(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
220 let run_id = run_id.into();
221 let export_name = export_name.into();
222 let journal = RunJournal::new(&run_id, &export_name);
223 Self {
224 run_id,
225 export_name,
226 status: "running".into(),
227 total_rows: 0,
228 files_produced: 0,
229 bytes_written: 0,
230 files_committed: 0,
231 duration_ms: 0,
232 peak_rss_mb: 0,
233 retries: 0,
234 validated: None,
235 schema_changed: None,
236 quality_passed: None,
237 error_message: None,
238 tuning_profile: "balanced".into(),
239 batch_size: 1000,
240 batch_size_memory_mb: None,
241 format: "parquet".into(),
242 mode: "snapshot".into(),
243 compression: "zstd".into(),
244 pg_temp_bytes_delta: None,
245 skip_reason: None,
246 source_count: None,
247 reconciled: None,
248 manifest_parts: Vec::new(),
249 schema_fingerprint: None,
250 manifest_verification: None,
251 apply_context: None,
252 journal,
253 }
254 }
255
256 #[doc(hidden)]
263 #[allow(dead_code)]
264 pub fn with_status(mut self, status: impl Into<String>) -> Self {
265 let s = status.into();
266 if (s == "success" || s == "failed") && self.journal.final_outcome().is_none() {
267 self.journal.record(RunEvent::RunCompleted {
268 status: s.clone(),
269 error_message: self.error_message.clone(),
270 duration_ms: self.duration_ms,
271 });
272 }
273 self.status = s;
274 self
275 }
276
277 #[doc(hidden)]
281 #[allow(dead_code)]
282 pub fn with_files_committed(mut self, n: usize) -> Self {
283 self.files_committed = n;
284 self
285 }
286
287 #[doc(hidden)]
291 #[allow(dead_code)]
292 pub fn with_manifest_parts(mut self, parts: Vec<crate::manifest::ManifestPart>) -> Self {
293 self.total_rows = parts.iter().map(|p| p.rows).sum();
294 self.bytes_written = parts.iter().map(|p| p.size_bytes).sum();
295 self.files_produced = parts.len();
296 self.files_committed = parts.len();
297 self.manifest_parts = parts;
298 self
299 }
300
301 #[doc(hidden)]
305 #[allow(dead_code)]
306 pub fn with_error(mut self, msg: impl Into<String>) -> Self {
307 self.error_message = Some(msg.into());
308 self
309 }
310
311 #[doc(hidden)]
316 #[allow(dead_code)]
317 pub fn with_plan_snapshot(mut self, snap: PlanSnapshot) -> Self {
318 self.journal.record(RunEvent::PlanResolved(snap));
319 self
320 }
321
322 pub(super) fn print(&self) {
323 if ipc::capturing_events() {
327 ipc::emit_event(&ChildEvent::Finished {
328 export_name: self.export_name.clone(),
329 run_id: self.run_id.clone(),
330 status: self.status.clone(),
331 total_rows: self.total_rows,
332 files_produced: self.files_produced as u64,
333 bytes_written: self.bytes_written,
334 duration_ms: self.duration_ms,
335 peak_rss_mb: self.peak_rss_mb,
336 error_message: self.error_message.clone(),
337 });
338 return;
339 }
340
341 self.print_stderr_block();
342 }
343
344 pub(super) fn print_stderr_block(&self) {
349 let block = if multi_export_mode() {
350 self.render_compact()
351 } else {
352 self.render().trim_end_matches('\n').to_string()
358 };
359
360 use std::io::Write;
361 let mut buf = block;
362 buf.push('\n');
363 let stderr = std::io::stderr();
364 let mut handle = stderr.lock();
365 let _ = handle.write_all(buf.as_bytes());
366 let _ = handle.flush();
367 }
368
369 fn render_compact(&self) -> String {
374 const NAME_COL: usize = 22;
375 const MODE_COL: usize = 8;
376 let icon = match self.status.as_str() {
377 "success" => "✓",
378 "failed" => "✗",
379 _ => "•",
380 };
381 let body = if self.status == "failed" {
382 let err = self
383 .error_message
384 .as_deref()
385 .unwrap_or("(no error message recorded)");
386 let (cause, _) = strip_chunked_recovery_hint(err);
387 compact_error(cause)
391 } else {
392 let rss = if self.peak_rss_mb > 0 {
393 format!(" RSS {} MB", fmt_thousands(self.peak_rss_mb))
394 } else {
395 String::new()
396 };
397 format!(
398 "{} rows {} files {} {}{}",
399 fmt_thousands(self.total_rows),
400 fmt_thousands(self.files_produced as i64),
401 format_bytes(self.bytes_written),
402 fmt_duration_ms(self.duration_ms),
403 rss
404 )
405 };
406 format!(
407 "{} {:<name$} {:<mode$} {}",
408 icon,
409 self.export_name,
410 self.mode,
411 body,
412 name = NAME_COL,
413 mode = MODE_COL,
414 )
415 }
416
417 fn render(&self) -> String {
420 let mut rows: Vec<(&'static str, String)> = Vec::with_capacity(16);
425 rows.push(("run_id", self.run_id.clone()));
426 let status_value = match (&self.status, &self.skip_reason) {
427 (s, Some(reason)) if s == "skipped" => format!("{s} ({reason})"),
428 (s, _) => s.clone(),
429 };
430 rows.push(("status", status_value));
431
432 let tuning_value = match self.batch_size_memory_mb {
433 Some(mem) => format!(
434 "profile={}, batch_size={} (batch_size_memory_mb={}MiB → effective FETCH in logs)",
435 self.tuning_profile,
436 fmt_thousands(self.batch_size as i64),
437 mem
438 ),
439 None => format!(
440 "profile={}, batch_size={}",
441 self.tuning_profile,
442 fmt_thousands(self.batch_size as i64)
443 ),
444 };
445 rows.push(("tuning", tuning_value));
446
447 rows.push(("rows", fmt_thousands(self.total_rows)));
448 rows.push(("files", fmt_thousands(self.files_produced as i64)));
449 if self.bytes_written > 0 {
450 rows.push(("bytes", format_bytes(self.bytes_written)));
451 }
452 rows.push(("duration", fmt_duration_ms(self.duration_ms)));
453
454 if self.peak_rss_mb > 0 {
455 rows.push((
456 "peak RSS",
457 format!(
458 "{} MB (sampled during run)",
459 fmt_thousands(self.peak_rss_mb)
460 ),
461 ));
462 }
463 if let Some(temp) = self.pg_temp_bytes_delta {
464 if temp > 0 {
468 let temp_mb = temp as f64 / (1024.0 * 1024.0);
469 let label = if temp > 100 * 1024 * 1024 {
470 format!(
471 "{:.1} MB ⚠ shrink tuning.batch_size or set batch_size_memory_mb",
472 temp_mb
473 )
474 } else {
475 format!("{:.1} MB", temp_mb)
476 };
477 rows.push(("pg temp spill", label));
478 }
479 }
480 if self.format == "parquet" && self.compression != "zstd" {
481 rows.push(("compression", self.compression.clone()));
482 }
483 if self.retries > 0 {
484 rows.push(("retries", self.retries.to_string()));
485 }
486 if let Some(v) = self.validated {
487 rows.push(("validated", if v { "pass".into() } else { "FAIL".into() }));
488 }
489 if let Some(sc) = self.schema_changed {
490 rows.push((
491 "schema",
492 if sc {
493 "CHANGED".into()
494 } else {
495 "unchanged".into()
496 },
497 ));
498 }
499 if let Some(q) = self.quality_passed {
500 rows.push(("quality", if q { "pass".into() } else { "FAIL".into() }));
501 }
502 if let Some(reconciled) = self.reconciled {
503 let src = self
504 .source_count
505 .map(fmt_thousands)
506 .unwrap_or_else(|| "?".into());
507 let exported = fmt_thousands(self.total_rows);
508 let value = if reconciled {
509 format!("MATCH ({exported}/{src})")
510 } else {
511 format!("MISMATCH (exported {exported} vs source {src})")
512 };
513 rows.push(("reconcile", value));
514 }
515 if let Some(err) = &self.error_message {
516 rows.push(("error", compact_error(err)));
522 }
523
524 format_block(&self.export_name, &rows)
525 }
526
527 pub fn check_post_run_invariants(&self) -> Result<(), String> {
543 let parts_bytes: u64 = self.manifest_parts.iter().map(|p| p.size_bytes).sum();
544
545 if self.files_committed > self.manifest_parts.len() {
546 return Err(format!(
547 "summary.files_committed ({}) > manifest_parts.len() ({}) — \
548 a runner bumped files_committed without commit::record_part",
549 self.files_committed,
550 self.manifest_parts.len()
551 ));
552 }
553 if self.files_produced > self.manifest_parts.len() {
554 return Err(format!(
555 "summary.files_produced ({}) > manifest_parts.len() ({}) — \
556 a runner bumped files_produced without commit::record_part",
557 self.files_produced,
558 self.manifest_parts.len()
559 ));
560 }
561 if self.bytes_written > parts_bytes {
562 return Err(format!(
563 "summary.bytes_written ({}) > sum(manifest_parts.size_bytes) ({}) — \
564 a runner bumped bytes_written without commit::record_part",
565 self.bytes_written, parts_bytes
566 ));
567 }
568 if self.status == "success" && self.files_committed > 0 && self.manifest_parts.is_empty() {
569 return Err(format!(
570 "success run with files_committed={} has empty manifest_parts — \
571 cloud manifest (ADR-0012 M1) would ship with no part list \
572 (this is the gap parallel_checkpoint had before commit e9b0796)",
573 self.files_committed
574 ));
575 }
576 if self.status == "success" && self.total_rows > 0 && self.files_committed == 0 {
588 return Err(format!(
589 "summary.total_rows={} but files_committed=0 — rows extracted from \
590 source but no files committed (no output reached the destination)",
591 self.total_rows
592 ));
593 }
594 Ok(())
595 }
596}
597
598fn compact_error(raw: &str) -> String {
610 const MAX_CHARS: usize = 240;
611 if let Some(summary) = summarize_parallel_chunk_errors(raw) {
612 return clamp_chars(&summary, MAX_CHARS);
613 }
614 let collapsed: String = raw
615 .lines()
616 .map(str::trim_end)
617 .filter(|s| !s.is_empty())
618 .collect::<Vec<_>>()
619 .join("; ");
620 clamp_chars(&collapsed, MAX_CHARS)
621}
622
623fn summarize_parallel_chunk_errors(raw: &str) -> Option<String> {
624 let header_pos = raw.find("parallel checkpoint worker errors:")?;
625 let prefix = raw[..header_pos].trim_end_matches(": ").trim_end();
626 let tail = &raw[header_pos + "parallel checkpoint worker errors:".len()..];
627
628 let chunk_lines: Vec<&str> = tail
629 .lines()
630 .map(str::trim)
631 .filter(|l| l.starts_with("chunk "))
632 .collect();
633 if chunk_lines.is_empty() {
634 return None;
635 }
636 let first_chunk_full = chunk_lines[0];
637 let first_chunk_short = clamp_chars(first_chunk_full, 140);
639 let prefix = if prefix.is_empty() {
640 String::new()
641 } else {
642 format!("{}: ", prefix)
643 };
644 Some(format!(
645 "{}parallel checkpoint workers failed: {} chunk(s) ({}); see stderr for full payloads",
646 prefix,
647 chunk_lines.len(),
648 first_chunk_short
649 ))
650}
651
652fn clamp_chars(s: &str, max_chars: usize) -> String {
653 if max_chars == 0 {
654 return String::new();
655 }
656 if s.chars().count() <= max_chars {
657 return s.to_string();
658 }
659 let keep = max_chars.saturating_sub(1);
660 let mut out: String = s.chars().take(keep).collect();
661 out.push('…');
662 out
663}
664
665fn format_block(name: &str, rows: &[(&str, String)]) -> String {
668 const HEADER_WIDTH: usize = 60;
669 let label_w = rows.iter().map(|(l, _)| l.len()).max().unwrap_or(0);
670
671 let prefix = format!("── {} ", name);
672 let prefix_chars = prefix.chars().count();
673 let dashes = HEADER_WIDTH.saturating_sub(prefix_chars);
674 let mut out = String::with_capacity(HEADER_WIDTH * (rows.len() + 3));
675 out.push('\n');
676 out.push_str(&prefix);
677 for _ in 0..dashes {
678 out.push('─');
679 }
680 out.push('\n');
681 for (label, value) in rows {
682 out.push_str(&format!(
685 " {:<width$} {}\n",
686 format!("{label}:"),
687 value,
688 width = label_w + 1
689 ));
690 }
691 out
692}
693
694fn fmt_duration_ms(ms: i64) -> String {
695 if ms < 1000 {
696 return format!("{}ms", ms);
697 }
698 let total_secs = ms / 1000;
699 let h = total_secs / 3600;
700 let m = (total_secs % 3600) / 60;
701 let s_frac = (ms % 60_000) as f64 / 1000.0;
702 if h > 0 {
703 format!("{}h {:02}m {:04.1}s", h, m, s_frac)
704 } else if m > 0 {
705 format!("{}m {:04.1}s", m, s_frac)
706 } else {
707 format!("{:.1}s", ms as f64 / 1000.0)
708 }
709}
710
711fn fmt_thousands(n: i64) -> String {
715 let abs = n.unsigned_abs();
716 let s = abs.to_string();
717 let bytes = s.as_bytes();
718 let mut out = String::with_capacity(s.len() + s.len() / 3 + 1);
719 if n < 0 {
720 out.push('-');
721 }
722 for (i, b) in bytes.iter().enumerate() {
723 let from_end = bytes.len() - i;
724 if i > 0 && from_end.is_multiple_of(3) {
725 out.push(',');
726 }
727 out.push(*b as char);
728 }
729 out
730}
731
732#[cfg(test)]
733mod tests {
734 use super::*;
735
736 #[test]
737 fn fmt_thousands_handles_small_and_large() {
738 assert_eq!(fmt_thousands(0), "0");
739 assert_eq!(fmt_thousands(7), "7");
740 assert_eq!(fmt_thousands(999), "999");
741 assert_eq!(fmt_thousands(1_000), "1,000");
742 assert_eq!(fmt_thousands(1_000_908), "1,000,908");
743 assert_eq!(fmt_thousands(39_990_376), "39,990,376");
744 assert_eq!(fmt_thousands(-1_234), "-1,234");
745 assert_eq!(fmt_thousands(i64::MAX), "9,223,372,036,854,775,807");
746 }
747
748 #[test]
749 fn fmt_duration_picks_unit() {
750 assert_eq!(fmt_duration_ms(0), "0ms");
751 assert_eq!(fmt_duration_ms(800), "800ms");
752 assert_eq!(fmt_duration_ms(1_500), "1.5s");
753 assert_eq!(fmt_duration_ms(68_400), "1m 08.4s");
754 assert_eq!(fmt_duration_ms(3_725_300), "1h 02m 05.3s");
755 }
756
757 #[test]
758 fn format_block_pads_labels_uniformly() {
759 let rows = vec![
760 ("run_id", "abc".to_string()),
761 ("rows", "42".to_string()),
762 ("compression", "zstd".to_string()),
763 ];
764 let out = format_block("orders", &rows);
765
766 let lines: Vec<&str> = out.lines().filter(|l| l.contains(':')).collect();
768 assert_eq!(lines.len(), 3);
769 let value_starts: Vec<usize> = lines
770 .iter()
771 .map(|l| l.find(':').unwrap() + l[l.find(':').unwrap()..].find(' ').unwrap())
772 .collect();
773 let value_col = lines[0].rfind("abc").unwrap();
777 assert_eq!(lines[1].rfind("42").unwrap(), value_col);
778 assert_eq!(lines[2].rfind("zstd").unwrap(), value_col);
779 let _ = value_starts;
781 }
782
783 #[test]
784 fn format_block_header_has_consistent_width() {
785 let block_a = format_block("a", &[("rows", "1".into())]);
786 let block_b = format_block("orders_table_xyz", &[("rows", "1".into())]);
787 let header_a = block_a.lines().nth(1).unwrap();
788 let header_b = block_b.lines().nth(1).unwrap();
789 assert_eq!(
790 header_a.chars().count(),
791 header_b.chars().count(),
792 "headers must be the same width regardless of name length: {:?} vs {:?}",
793 header_a,
794 header_b
795 );
796 }
797
798 #[test]
799 fn render_produces_a_single_string_with_trailing_newline() {
800 use crate::plan::{
801 CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
802 MetaColumns, ResolvedRunPlan,
803 };
804 use crate::tuning::SourceTuning;
805 let plan = ResolvedRunPlan {
806 export_name: "orders".into(),
807 base_query: "SELECT 1".into(),
808 strategy: ExtractionStrategy::Snapshot,
809 format: FormatType::Parquet,
810 compression: CompressionType::default(),
811 compression_level: None,
812 max_file_size_bytes: None,
813 skip_empty: false,
814 meta_columns: MetaColumns::default(),
815 destination: DestinationConfig {
816 destination_type: DestinationType::Local,
817 path: Some("./out".into()),
818 ..Default::default()
819 },
820 quality: None,
821 tuning: SourceTuning::from_config(None),
822 tuning_profile_label: "balanced (default)".into(),
823 validate: false,
824 reconcile: false,
825 resume: false,
826 source: crate::config::SourceConfig {
827 source_type: crate::config::SourceType::Postgres,
828 url: Some("postgresql://localhost/test".into()),
829 url_env: None,
830 url_file: None,
831 host: None,
832 port: None,
833 user: None,
834 password: None,
835 password_env: None,
836 database: None,
837 environment: None,
838 tuning: None,
839 tls: None,
840 },
841 column_overrides: Default::default(),
842 schema_drift_policy: Default::default(),
843 shape_drift_warn_factor: 2.0,
844 parquet: None,
845 };
846 let mut s = RunSummary::new(&plan);
847 s.status = "success".into();
848 s.total_rows = 1_000_908;
849 s.files_produced = 11;
850 s.bytes_written = 32 * 1024 * 1024 + 400 * 1024;
851 s.duration_ms = 68_400;
852 s.peak_rss_mb = 884;
853
854 let block = s.render();
855 assert!(
856 block.starts_with('\n'),
857 "block should start with a blank line"
858 );
859 assert!(block.ends_with('\n'), "block should end with a newline");
860 assert!(block.contains("── orders "));
861 assert!(
862 block.contains("1,000,908"),
863 "rows should be formatted with thousands separator: {}",
864 block
865 );
866 assert!(block.contains("1m 08.4s"), "duration formatting: {}", block);
867 assert!(!block.contains('\r'));
870
871 let line = s.render_compact();
873 assert!(line.starts_with("✓ "), "success icon present: {:?}", line);
874 assert!(line.contains("orders"), "export name present: {:?}", line);
875 assert!(line.contains("1,000,908 rows"), "rows present: {:?}", line);
876 assert!(line.contains("32.4 MB"), "bytes present: {:?}", line);
877 assert!(line.contains("1m 08.4s"), "duration present: {:?}", line);
878 assert!(line.contains("RSS 884 MB"), "rss present: {:?}", line);
879 assert!(!line.contains('\n'), "single line: {:?}", line);
880 }
881
882 #[test]
883 fn compact_error_summarises_parallel_chunk_errors() {
884 let raw = "export 'page_views': parallel checkpoint worker errors:\n\
885 chunk 4: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202442_chunk4.parquet?partNumber=1&uploadId=ABPnzm7RqplA, called: http_util::Client::send } => send http request, source: error sending request: client error (SendRequest): dispatch task is gone\n\
886 chunk 5: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202443_chunk5.parquet?partNumber=1&uploadId=ABPnzm6q, called: http_util::Client::send } => send http request, source: dispatch task is gone";
887 let out = compact_error(raw);
888 assert!(
889 out.contains("2 chunk(s)"),
890 "should report number of failed chunks: {:?}",
891 out
892 );
893 assert!(
894 out.starts_with("export 'page_views': parallel checkpoint workers failed:"),
895 "should keep export prefix and use compact phrasing: {:?}",
896 out
897 );
898 assert!(
899 out.contains("chunk 4:"),
900 "should include the first chunk as an example: {:?}",
901 out
902 );
903 assert!(!out.contains('\n'), "single line output: {:?}", out);
904 assert!(
905 out.chars().count() <= 240,
906 "must be clamped to <=240 chars, got {}: {:?}",
907 out.chars().count(),
908 out
909 );
910 }
911
912 #[test]
913 fn compact_error_collapses_generic_multiline() {
914 let raw = "first line of trouble\nsecond line with detail\n\nthird line\n";
915 let out = compact_error(raw);
916 assert_eq!(
917 out, "first line of trouble; second line with detail; third line",
918 "newlines should collapse to '; ' and blanks dropped"
919 );
920 }
921
922 #[test]
923 fn compact_error_clamps_excessively_long_lines() {
924 let raw = "x".repeat(1_000);
925 let out = compact_error(&raw);
926 assert_eq!(out.chars().count(), 240);
927 assert!(out.ends_with('…'));
928 }
929
930 #[test]
931 fn render_compact_strips_chunked_recovery_hint_for_failed() {
932 use crate::plan::{
933 CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
934 MetaColumns, ResolvedRunPlan,
935 };
936 use crate::tuning::SourceTuning;
937 let plan = ResolvedRunPlan {
938 export_name: "events".into(),
939 base_query: "SELECT 1".into(),
940 strategy: ExtractionStrategy::Snapshot,
941 format: FormatType::Parquet,
942 compression: CompressionType::default(),
943 compression_level: None,
944 max_file_size_bytes: None,
945 skip_empty: false,
946 meta_columns: MetaColumns::default(),
947 destination: DestinationConfig {
948 destination_type: DestinationType::Local,
949 path: Some("./out".into()),
950 ..Default::default()
951 },
952 quality: None,
953 tuning: SourceTuning::from_config(None),
954 tuning_profile_label: "balanced (default)".into(),
955 validate: false,
956 reconcile: false,
957 resume: false,
958 source: crate::config::SourceConfig {
959 source_type: crate::config::SourceType::Postgres,
960 url: Some("postgresql://localhost/test".into()),
961 url_env: None,
962 url_file: None,
963 host: None,
964 port: None,
965 user: None,
966 password: None,
967 password_env: None,
968 database: None,
969 environment: None,
970 tuning: None,
971 tls: None,
972 },
973 column_overrides: Default::default(),
974 schema_drift_policy: Default::default(),
975 shape_drift_warn_factor: 2.0,
976 parquet: None,
977 };
978 let mut s = RunSummary::new(&plan);
979 s.status = "failed".into();
980 s.error_message = Some(
981 "export 'events': --resume but no in-progress chunk checkpoint; \
982 run without --resume first or `rivet state reset-chunks --config x.yaml --export events`"
983 .to_string(),
984 );
985
986 let line = s.render_compact();
987 assert!(line.starts_with("✗ "), "failure icon: {:?}", line);
988 assert!(line.contains("events"), "name present: {:?}", line);
989 assert!(
990 line.contains("--resume but no in-progress chunk checkpoint"),
991 "cause kept: {:?}",
992 line
993 );
994 assert!(
995 !line.contains("rivet state reset-chunks"),
996 "recovery hint should be stripped from per-export line: {:?}",
997 line
998 );
999 assert!(!line.contains('\n'), "single line: {:?}", line);
1000 }
1001}