1use super::ipc::{self, ChildEvent};
18use super::{format_bytes, multi_export_mode, strip_chunked_recovery_hint};
19use crate::journal::{PlanSnapshot, RunEvent, RunJournal};
20use crate::manifest::ManifestPart;
21use crate::plan::ResolvedRunPlan;
22
23fn plan_snapshot_from(plan: &ResolvedRunPlan) -> PlanSnapshot {
29 PlanSnapshot {
30 export_name: plan.export_name.clone(),
31 base_query: plan.base_query.clone(),
32 strategy: plan.strategy.mode_label().to_string(),
33 format: plan.format.label().to_string(),
34 compression: plan.compression.label().to_string(),
35 destination_type: plan.destination.destination_type.label().to_string(),
36 tuning_profile: plan.tuning_profile_label.clone(),
37 batch_size: plan.tuning.batch_size,
38 validate: plan.validate,
39 reconcile: plan.reconcile,
40 resume: plan.resume,
41 }
42}
43
/// Extra context attached to a run summary as `RunSummary::apply_context`.
///
/// NOTE(review): judging by the name, this describes a run started by applying a previously
/// saved plan — confirm against the code that populates it.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ApplyContext {
    /// Identifier of the plan this run belongs to (presumably the applied plan — confirm).
    pub plan_id: String,
    /// Whether the run was forced (TODO confirm which flag sets this).
    pub forced: bool,
    /// Names of whatever was bypassed by forcing — NOTE(review): inferred from the field
    /// name only; verify against the writer of this field.
    pub force_bypassed: Vec<String>,
}
63
/// Outcome of a single export run: counters accumulated while the run executes, plus the
/// settings it ran with. Rendered as a detailed stderr block, a one-line compact summary, or
/// a `ChildEvent::Finished` IPC event.
#[derive(Debug, Clone, Default)]
pub struct RunSummary {
    /// `<export_name>_<UTC timestamp>` as minted by `RunSummary::new`.
    pub run_id: String,
    pub export_name: String,
    /// `"running"` at construction; rendering special-cases `"success"`, `"failed"` and
    /// `"skipped"`.
    pub status: String,
    pub total_rows: i64,
    /// Must not exceed `manifest_parts.len()` (see `check_post_run_invariants`).
    pub files_produced: usize,
    /// Must not exceed the sum of `manifest_parts[*].size_bytes`.
    pub bytes_written: u64,
    /// Must not exceed `manifest_parts.len()`.
    pub files_committed: usize,
    pub duration_ms: i64,
    /// Peak RSS in MB; only rendered when greater than zero.
    pub peak_rss_mb: i64,
    pub retries: u32,
    /// Output validation result; `None` means no `validated` row is rendered.
    pub validated: Option<bool>,
    /// `Some(true)` renders as `schema: CHANGED`; `None` renders no row.
    pub schema_changed: Option<bool>,
    /// Quality-check result; `None` renders no row.
    pub quality_passed: Option<bool>,
    /// Per-column checksums recorded for the run (not read by the rendering in this file).
    pub column_checksums: Vec<crate::manifest::ColumnChecksum>,
    /// Column associated with `column_checksums` — NOTE(review): inferred from the name.
    pub checksum_key_column: Option<String>,
    /// Failure text; may span several lines (kept in the detailed block, collapsed in the
    /// compact line).
    pub error_message: Option<String>,
    /// Tuning profile label copied from the plan.
    pub tuning_profile: String,
    pub batch_size: usize,
    /// Memory-based batch sizing, when configured; shown next to `batch_size` in the tuning row.
    pub batch_size_memory_mb: Option<usize>,
    /// Output format label (e.g. `"parquet"`).
    pub format: String,
    /// Strategy label (e.g. `"snapshot"`, `"timewindow"`).
    pub mode: String,
    /// Compression label; rendered only for parquet when it is not `"zstd"`.
    pub compression: String,
    /// Destination location; `None` for stdout destinations.
    pub destination_uri: Option<String>,
    /// Change in Postgres temp-file bytes; rendered as `pg temp spill` only when positive,
    /// with a tuning hint above 100 MiB.
    pub pg_temp_bytes_delta: Option<i64>,
    /// Why a run was skipped. `"no new rows since cursor '<col>'"` renders a `cursor:` row; any
    /// reason on a `timewindow` run renders a `window:` row.
    pub skip_reason: Option<String>,
    /// Row count reported by the source, shown alongside `total_rows` in the reconcile row.
    pub source_count: Option<i64>,
    /// Reconciliation result; `None` renders no reconcile row.
    pub reconciled: Option<bool>,
    /// Committed output parts; the ground truth the counters above are checked against.
    pub manifest_parts: Vec<ManifestPart>,
    /// Schema fingerprint, when computed (not read by the rendering in this file).
    pub schema_fingerprint: Option<String>,
    /// Manifest verification result, when performed (not read by the rendering in this file).
    pub manifest_verification: Option<crate::pipeline::ManifestVerification>,
    /// Apply-plan context, when present.
    pub apply_context: Option<ApplyContext>,
    /// Event journal; seeded with `PlanResolved` by `RunSummary::new`.
    pub journal: RunJournal,
}
155
/// One rendered summary line: `(label, value)`. The value may contain `\n`.
type Row = (&'static str, String);
159
160impl RunSummary {
161 pub(super) fn new(plan: &ResolvedRunPlan) -> Self {
162 let run_id = format!(
163 "{}_{}",
164 plan.export_name,
165 chrono::Utc::now().format("%Y%m%dT%H%M%S%.3f"),
166 );
167 let mut journal = RunJournal::new(&run_id, &plan.export_name);
168 journal.record(RunEvent::PlanResolved(plan_snapshot_from(plan)));
169
170 ipc::emit_event(&ChildEvent::Started {
171 export_name: plan.export_name.clone(),
172 run_id: run_id.clone(),
173 mode: plan.strategy.mode_label().to_string(),
174 tuning_profile: plan.tuning_profile_label.clone(),
175 batch_size: plan.tuning.batch_size,
176 });
177
178 Self {
179 run_id,
180 export_name: plan.export_name.clone(),
181 status: "running".into(),
182 total_rows: 0,
183 files_produced: 0,
184 bytes_written: 0,
185 files_committed: 0,
186 duration_ms: 0,
187 peak_rss_mb: 0,
188 retries: 0,
189 validated: None,
190 schema_changed: None,
191 quality_passed: None,
192 error_message: None,
193 tuning_profile: plan.tuning_profile_label.clone(),
194 batch_size: plan.tuning.batch_size,
195 batch_size_memory_mb: plan.tuning.batch_size_memory_mb,
196 format: plan.format.label().to_string(),
197 mode: plan.strategy.mode_label().to_string(),
198 compression: plan.compression.label().to_string(),
199 destination_uri: (!matches!(
200 plan.destination.destination_type,
201 crate::config::DestinationType::Stdout
202 ))
203 .then(|| crate::pipeline::finalize::destination_uri_for_manifest(&plan.destination)),
204 pg_temp_bytes_delta: None,
205 skip_reason: None,
206 source_count: None,
207 reconciled: None,
208 manifest_parts: Vec::new(),
209 schema_fingerprint: None,
210 manifest_verification: None,
211 apply_context: None,
212 column_checksums: Vec::new(),
213 checksum_key_column: None,
214 journal,
215 }
216 }
217
218 #[doc(hidden)]
239 #[allow(dead_code)]
240 pub fn stub_for_testing(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
241 let run_id = run_id.into();
242 let export_name = export_name.into();
243 let journal = RunJournal::new(&run_id, &export_name);
244 Self {
245 run_id,
246 export_name,
247 status: "running".into(),
248 total_rows: 0,
249 files_produced: 0,
250 bytes_written: 0,
251 files_committed: 0,
252 duration_ms: 0,
253 peak_rss_mb: 0,
254 retries: 0,
255 validated: None,
256 schema_changed: None,
257 quality_passed: None,
258 error_message: None,
259 tuning_profile: "balanced".into(),
260 batch_size: 1000,
261 batch_size_memory_mb: None,
262 format: "parquet".into(),
263 mode: "snapshot".into(),
264 compression: "zstd".into(),
265 destination_uri: None,
266 pg_temp_bytes_delta: None,
267 skip_reason: None,
268 source_count: None,
269 reconciled: None,
270 manifest_parts: Vec::new(),
271 schema_fingerprint: None,
272 manifest_verification: None,
273 apply_context: None,
274 column_checksums: Vec::new(),
275 checksum_key_column: None,
276 journal,
277 }
278 }
279
280 #[doc(hidden)]
287 #[allow(dead_code)]
288 pub fn with_status(mut self, status: impl Into<String>) -> Self {
289 let s = status.into();
290 if (s == "success" || s == "failed") && self.journal.final_outcome().is_none() {
291 self.journal.record(RunEvent::RunCompleted {
292 status: s.clone(),
293 error_message: self.error_message.clone(),
294 duration_ms: self.duration_ms,
295 });
296 }
297 self.status = s;
298 self
299 }
300
301 #[doc(hidden)]
305 #[allow(dead_code)]
306 pub fn with_files_committed(mut self, n: usize) -> Self {
307 self.files_committed = n;
308 self
309 }
310
311 #[doc(hidden)]
315 #[allow(dead_code)]
316 pub fn with_manifest_parts(mut self, parts: Vec<crate::manifest::ManifestPart>) -> Self {
317 self.total_rows = parts.iter().map(|p| p.rows).sum();
318 self.bytes_written = parts.iter().map(|p| p.size_bytes).sum();
319 self.files_produced = parts.len();
320 self.files_committed = parts.len();
321 self.manifest_parts = parts;
322 self
323 }
324
325 #[doc(hidden)]
329 #[allow(dead_code)]
330 pub fn with_error(mut self, msg: impl Into<String>) -> Self {
331 self.error_message = Some(msg.into());
332 self
333 }
334
335 #[doc(hidden)]
340 #[allow(dead_code)]
341 pub fn with_plan_snapshot(mut self, snap: PlanSnapshot) -> Self {
342 self.journal.record(RunEvent::PlanResolved(snap));
343 self
344 }
345
346 pub(super) fn print(&self) {
347 if ipc::capturing_events() {
351 ipc::emit_event(&ChildEvent::Finished {
352 export_name: self.export_name.clone(),
353 run_id: self.run_id.clone(),
354 status: self.status.clone(),
355 total_rows: self.total_rows,
356 files_produced: self.files_produced as u64,
357 bytes_written: self.bytes_written,
358 duration_ms: self.duration_ms,
359 peak_rss_mb: self.peak_rss_mb,
360 error_message: self.error_message.clone(),
361 });
362 return;
363 }
364
365 self.print_stderr_block();
366 }
367
368 pub(super) fn print_stderr_block(&self) {
373 let block = if multi_export_mode() {
374 self.render_compact()
375 } else {
376 self.render().trim_end_matches('\n').to_string()
382 };
383
384 use std::io::Write;
385 let mut buf = super::parent_ui::sanitize_terminal(&block);
392 buf.push('\n');
393 let stderr = std::io::stderr();
394 let mut handle = stderr.lock();
395 let _ = handle.write_all(buf.as_bytes());
396 let _ = handle.flush();
397 }
398
399 fn render_compact(&self) -> String {
404 const NAME_COL: usize = 22;
405 const MODE_COL: usize = 8;
406 let icon = match self.status.as_str() {
407 "success" => "✓",
408 "failed" => "✗",
409 _ => "•",
410 };
411 let body = if self.status == "failed" {
412 let err = self
413 .error_message
414 .as_deref()
415 .unwrap_or("(no error message recorded)");
416 let (cause, _) = strip_chunked_recovery_hint(err);
417 compact_error(cause)
421 } else {
422 let rss = if self.peak_rss_mb > 0 {
423 format!(" RSS {} MB", fmt_thousands(self.peak_rss_mb))
424 } else {
425 String::new()
426 };
427 format!(
428 "{} rows {} files {} {}{}",
429 fmt_thousands(self.total_rows),
430 fmt_thousands(self.files_produced as i64),
431 format_bytes(self.bytes_written),
432 fmt_duration_ms(self.duration_ms),
433 rss
434 )
435 };
436 format!(
437 "{} {:<name$} {:<mode$} {}",
438 icon,
439 self.export_name,
440 self.mode,
441 body,
442 name = NAME_COL,
443 mode = MODE_COL,
444 )
445 }
446
447 fn render(&self) -> String {
459 let mut rows: Vec<Row> = Vec::with_capacity(16);
460 rows.push(("run_id", self.run_id.clone()));
461 rows.push(self.status_row());
462 rows.push(self.tuning_row());
463 rows.push(("rows", fmt_thousands(self.total_rows)));
464 rows.push(("files", fmt_thousands(self.files_produced as i64)));
465 rows.extend(self.output_row());
466 rows.extend(self.position_row());
467 rows.extend(self.bytes_row());
468 rows.push(("duration", fmt_duration_ms(self.duration_ms)));
469 rows.extend(self.peak_rss_row());
470 rows.extend(self.pg_temp_spill_row());
471 rows.extend(self.compression_row());
472 rows.extend(self.retries_row());
473 rows.extend(self.outcome_rows());
474 rows.extend(self.error_row());
475 format_block(&self.export_name, &rows)
476 }
477
478 fn status_row(&self) -> Row {
480 let value = match (&self.status, &self.skip_reason) {
481 (s, Some(reason)) if s == "skipped" => format!("{s} ({reason})"),
482 (s, _) => s.clone(),
483 };
484 ("status", value)
485 }
486
487 fn tuning_row(&self) -> Row {
490 let value = match self.batch_size_memory_mb {
491 Some(mem) => format!(
492 "profile={}, batch_size={} (batch_size_memory_mb={}MiB → effective FETCH in logs)",
493 self.tuning_profile,
494 fmt_thousands(self.batch_size as i64),
495 mem
496 ),
497 None => format!(
498 "profile={}, batch_size={}",
499 self.tuning_profile,
500 fmt_thousands(self.batch_size as i64)
501 ),
502 };
503 ("tuning", value)
504 }
505
506 fn output_row(&self) -> Option<Row> {
510 if self.files_produced > 0 {
511 self.destination_uri.clone().map(|uri| ("output", uri))
512 } else {
513 None
514 }
515 }
516
517 fn position_row(&self) -> Option<Row> {
524 if let Some(pos) = incremental_position_line(self.skip_reason.as_deref()) {
525 Some(("cursor", pos))
526 } else {
527 time_window_skip_line(&self.mode, self.skip_reason.as_deref()).map(|w| ("window", w))
528 }
529 }
530
531 fn bytes_row(&self) -> Option<Row> {
533 if self.bytes_written > 0 {
534 Some(("bytes", format_bytes(self.bytes_written)))
535 } else {
536 None
537 }
538 }
539
540 fn peak_rss_row(&self) -> Option<Row> {
542 if self.peak_rss_mb > 0 {
543 Some((
544 "peak RSS",
545 format!(
546 "{} MB (sampled during run)",
547 fmt_thousands(self.peak_rss_mb)
548 ),
549 ))
550 } else {
551 None
552 }
553 }
554
555 fn pg_temp_spill_row(&self) -> Option<Row> {
559 let temp = self.pg_temp_bytes_delta?;
560 if temp <= 0 {
561 return None;
562 }
563 let temp_mb = temp as f64 / (1024.0 * 1024.0);
564 let label = if temp > 100 * 1024 * 1024 {
565 format!(
566 "{:.1} MB ⚠ shrink tuning.batch_size or set batch_size_memory_mb",
567 temp_mb
568 )
569 } else {
570 format!("{:.1} MB", temp_mb)
571 };
572 Some(("pg temp spill", label))
573 }
574
575 fn compression_row(&self) -> Option<Row> {
577 if self.format == "parquet" && self.compression != "zstd" {
578 Some(("compression", self.compression.clone()))
579 } else {
580 None
581 }
582 }
583
584 fn retries_row(&self) -> Option<Row> {
586 if self.retries > 0 {
587 Some(("retries", self.retries.to_string()))
588 } else {
589 None
590 }
591 }
592
593 fn outcome_rows(&self) -> Vec<Row> {
599 let mut rows: Vec<Row> = Vec::new();
600 if let Some(v) = self.validated {
601 rows.push(("validated", if v { "pass".into() } else { "FAIL".into() }));
602 }
603 if let Some(sc) = self.schema_changed {
604 rows.push((
605 "schema",
606 if sc {
607 "CHANGED".into()
608 } else {
609 "unchanged".into()
610 },
611 ));
612 }
613 if let Some(q) = self.quality_passed {
614 rows.push(("quality", if q { "pass".into() } else { "FAIL".into() }));
615 }
616 if let Some(reconciled) = self.reconciled {
617 let src = self
618 .source_count
619 .map(fmt_thousands)
620 .unwrap_or_else(|| "?".into());
621 let exported = fmt_thousands(self.total_rows);
622 let value = if reconciled {
623 format!("MATCH ({exported}/{src})")
624 } else {
625 format!("MISMATCH (exported {exported} vs source {src})")
626 };
627 rows.push(("reconcile", value));
628 }
629 if self.status == "success"
634 && self.files_produced > 0
635 && self.validated.is_none()
636 && self.reconciled.is_none()
637 {
638 rows.push((
639 "verify",
640 "not run — add `--reconcile` (count vs source) or `rivet validate` (re-read outputs)"
641 .into(),
642 ));
643 }
644 rows
645 }
646
647 fn error_row(&self) -> Option<Row> {
653 self.error_message
654 .as_ref()
655 .map(|err| ("error", err.trim_end().to_string()))
656 }
657
658 pub fn check_post_run_invariants(&self) -> Result<(), String> {
674 let parts_bytes: u64 = self.manifest_parts.iter().map(|p| p.size_bytes).sum();
675
676 if self.files_committed > self.manifest_parts.len() {
677 return Err(format!(
678 "summary.files_committed ({}) > manifest_parts.len() ({}) — \
679 a runner bumped files_committed without commit::record_part",
680 self.files_committed,
681 self.manifest_parts.len()
682 ));
683 }
684 if self.files_produced > self.manifest_parts.len() {
685 return Err(format!(
686 "summary.files_produced ({}) > manifest_parts.len() ({}) — \
687 a runner bumped files_produced without commit::record_part",
688 self.files_produced,
689 self.manifest_parts.len()
690 ));
691 }
692 if self.bytes_written > parts_bytes {
693 return Err(format!(
694 "summary.bytes_written ({}) > sum(manifest_parts.size_bytes) ({}) — \
695 a runner bumped bytes_written without commit::record_part",
696 self.bytes_written, parts_bytes
697 ));
698 }
699 if self.status == "success" && self.files_committed > 0 && self.manifest_parts.is_empty() {
700 return Err(format!(
701 "success run with files_committed={} has empty manifest_parts — \
702 cloud manifest (ADR-0012 M1) would ship with no part list \
703 (this is the gap parallel_checkpoint had before commit e9b0796)",
704 self.files_committed
705 ));
706 }
707 if self.status == "success" && self.total_rows > 0 && self.files_committed == 0 {
719 return Err(format!(
720 "summary.total_rows={} but files_committed=0 — rows extracted from \
721 source but no files committed (no output reached the destination)",
722 self.total_rows
723 ));
724 }
725 Ok(())
726 }
727}
728
729fn compact_error(raw: &str) -> String {
741 const MAX_CHARS: usize = 240;
742 if let Some(summary) = summarize_parallel_chunk_errors(raw) {
743 return clamp_chars(&summary, MAX_CHARS);
744 }
745 let collapsed: String = raw
746 .lines()
747 .map(str::trim_end)
748 .filter(|s| !s.is_empty())
749 .collect::<Vec<_>>()
750 .join("; ");
751 clamp_chars(&collapsed, MAX_CHARS)
752}
753
/// For a skip reason of the form `no new rows since cursor '<col>'`, describe the held cursor
/// position as `'<col>' unchanged (no new rows this run)`. Any other reason, or none, yields
/// `None`.
fn incremental_position_line(skip_reason: Option<&str>) -> Option<String> {
    const CURSOR_SKIP_PREFIX: &str = "no new rows since cursor '";
    skip_reason
        .and_then(|reason| reason.strip_prefix(CURSOR_SKIP_PREFIX))
        .and_then(|rest| rest.strip_suffix('\''))
        .map(|column| format!("'{column}' unchanged (no new rows this run)"))
}
767
/// Hint shown when a `timewindow` run was skipped for any reason.
///
/// Returns `None` for other modes, or when there is no skip reason.
fn time_window_skip_line(mode: &str, skip_reason: Option<&str>) -> Option<String> {
    let skipped_time_window = mode == "timewindow" && skip_reason.is_some();
    skipped_time_window.then(|| {
        "rolling time window matched no rows — check `time_column`/`days_window`".to_string()
    })
}
791
792fn summarize_parallel_chunk_errors(raw: &str) -> Option<String> {
793 let header_pos = raw.find("parallel checkpoint worker errors:")?;
794 let prefix = raw[..header_pos].trim_end_matches(": ").trim_end();
795 let tail = &raw[header_pos + "parallel checkpoint worker errors:".len()..];
796
797 let chunk_lines: Vec<&str> = tail
798 .lines()
799 .map(str::trim)
800 .filter(|l| l.starts_with("chunk "))
801 .collect();
802 if chunk_lines.is_empty() {
803 return None;
804 }
805 let first_chunk_full = chunk_lines[0];
806 let first_chunk_short = clamp_chars(first_chunk_full, 140);
808 let prefix = if prefix.is_empty() {
809 String::new()
810 } else {
811 format!("{}: ", prefix)
812 };
813 Some(format!(
814 "{}parallel checkpoint workers failed: {} chunk(s) ({}); see stderr for full payloads",
815 prefix,
816 chunk_lines.len(),
817 first_chunk_short
818 ))
819}
820
/// Limit `s` to `max_chars` characters (Unicode scalar values, not bytes).
///
/// Over-long input keeps its first `max_chars - 1` characters followed by `…`, so the result
/// is exactly `max_chars` characters long. A limit of 0 yields an empty string.
fn clamp_chars(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }
    let mut char_starts = s.char_indices().map(|(offset, _)| offset);
    // `cut` is where the (max_chars - 1)-th char ends; a further char means `s` is too long.
    match (char_starts.nth(max_chars - 1), char_starts.next()) {
        (Some(cut), Some(_)) => {
            let mut clamped = String::with_capacity(cut + '…'.len_utf8());
            clamped.push_str(&s[..cut]);
            clamped.push('…');
            clamped
        }
        _ => s.to_owned(),
    }
}
833
/// Render a titled block of aligned `label: value` rows.
///
/// Layout:
/// - a blank line;
/// - a header `── <name> ` padded with `─` to 60 characters (a longer name is not truncated);
/// - one line per row, with the `label:` column padded to the widest label.
///
/// A value containing `\n` keeps its line breaks, and its continuation lines are indented to
/// the same column as the value's first line.
fn format_block(name: &str, rows: &[(&str, String)]) -> String {
    const HEADER_WIDTH: usize = 60;
    // Measured in chars to match how `{:<width$}` pads.
    let label_w = rows
        .iter()
        .map(|(l, _)| l.chars().count())
        .max()
        .unwrap_or(0);

    let prefix = format!("── {} ", name);
    let dashes = HEADER_WIDTH.saturating_sub(prefix.chars().count());
    let mut out = String::with_capacity(HEADER_WIDTH * (rows.len() + 3));
    out.push('\n');
    out.push_str(&prefix);
    out.extend(std::iter::repeat('─').take(dashes));
    out.push('\n');

    for (label, value) in rows {
        // Derive the continuation indent from the head actually written. The old hard-coded
        // `2 + (label_w + 1) + 2` disagreed with this one-space layout, so continuation lines
        // of multi-line values drifted two columns right of the first line.
        let head = format!(" {:<width$} ", format!("{label}:"), width = label_w + 1);
        let indent = head.chars().count();
        let mut lines = value.split('\n');
        out.push_str(&head);
        out.push_str(lines.next().unwrap_or(""));
        out.push('\n');
        for cont in lines {
            out.extend(std::iter::repeat(' ').take(indent));
            out.push_str(cont);
            out.push('\n');
        }
    }
    out
}
873
/// Human-readable duration.
///
/// Examples: `"800ms"` below one second, `"1.5s"` under a minute, `"1m 08.4s"` under an hour,
/// `"1h 02m 05.3s"` beyond.
///
/// From one second up, the value is rounded once to the nearest tenth of a second (halves round
/// up) and only then split into h/m/s. Carries therefore propagate: 119 960 ms renders as
/// `"2m 00.0s"`, not `"1m 60.0s"`.
fn fmt_duration_ms(ms: i64) -> String {
    if ms < 1000 {
        return format!("{}ms", ms);
    }
    // `ms` is positive here. Dividing before adding the rounding bit cannot overflow near i64::MAX.
    let tenths = ms / 100 + i64::from(ms % 100 >= 50);
    let h = tenths / 36_000;
    let m = (tenths % 36_000) / 600;
    let tenths_in_minute = tenths % 600;
    let (s, frac) = (tenths_in_minute / 10, tenths_in_minute % 10);
    if h > 0 {
        format!("{}h {:02}m {:02}.{}s", h, m, s, frac)
    } else if m > 0 {
        format!("{}m {:02}.{}s", m, s, frac)
    } else {
        format!("{}.{}s", s, frac)
    }
}
890
/// Format an integer with `,` thousands separators, e.g. `-1234` → `"-1,234"`.
///
/// Works on the unsigned magnitude, so `i64::MIN` is handled without overflow.
fn fmt_thousands(n: i64) -> String {
    let digits = n.unsigned_abs().to_string();
    // The leading group holds 1..=3 digits; every later group holds exactly three.
    let head_len = match digits.len() % 3 {
        0 => 3,
        rem => rem,
    };
    let (head, tail) = digits.split_at(head_len);

    let mut grouped = String::with_capacity(digits.len() + digits.len() / 3 + 1);
    if n < 0 {
        grouped.push('-');
    }
    grouped.push_str(head);
    for (idx, digit) in tail.chars().enumerate() {
        if idx % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(digit);
    }
    grouped
}
911
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared fixture: a snapshot / parquet / local-destination plan against a Postgres URL.
    /// Only the export name varies between tests.
    fn plan_for(export_name: &str) -> crate::plan::ResolvedRunPlan {
        use crate::plan::{
            CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
            MetaColumns, ResolvedRunPlan,
        };
        use crate::tuning::SourceTuning;
        ResolvedRunPlan {
            export_name: export_name.into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: crate::config::SourceConfig {
                source_type: crate::config::SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    #[test]
    fn fmt_thousands_handles_small_and_large() {
        assert_eq!(fmt_thousands(0), "0");
        assert_eq!(fmt_thousands(7), "7");
        assert_eq!(fmt_thousands(999), "999");
        assert_eq!(fmt_thousands(1_000), "1,000");
        assert_eq!(fmt_thousands(1_000_908), "1,000,908");
        assert_eq!(fmt_thousands(39_990_376), "39,990,376");
        assert_eq!(fmt_thousands(-1_234), "-1,234");
        assert_eq!(fmt_thousands(i64::MAX), "9,223,372,036,854,775,807");
    }

    #[test]
    fn fmt_duration_picks_unit() {
        assert_eq!(fmt_duration_ms(0), "0ms");
        assert_eq!(fmt_duration_ms(800), "800ms");
        assert_eq!(fmt_duration_ms(1_500), "1.5s");
        assert_eq!(fmt_duration_ms(68_400), "1m 08.4s");
        assert_eq!(fmt_duration_ms(3_725_300), "1h 02m 05.3s");
    }

    #[test]
    fn format_block_pads_labels_uniformly() {
        let rows = vec![
            ("run_id", "abc".to_string()),
            ("rows", "42".to_string()),
            ("compression", "zstd".to_string()),
        ];
        let out = format_block("orders", &rows);

        // Only the three `label: value` lines contain a colon; the header does not.
        let lines: Vec<&str> = out.lines().filter(|l| l.contains(':')).collect();
        assert_eq!(lines.len(), 3);
        let value_col = lines[0].rfind("abc").unwrap();
        assert_eq!(lines[1].rfind("42").unwrap(), value_col);
        assert_eq!(lines[2].rfind("zstd").unwrap(), value_col);
    }

    #[test]
    fn format_block_header_has_consistent_width() {
        let block_a = format_block("a", &[("rows", "1".into())]);
        let block_b = format_block("orders_table_xyz", &[("rows", "1".into())]);
        let header_a = block_a.lines().nth(1).unwrap();
        let header_b = block_b.lines().nth(1).unwrap();
        assert_eq!(
            header_a.chars().count(),
            header_b.chars().count(),
            "headers must be the same width regardless of name length: {:?} vs {:?}",
            header_a,
            header_b
        );
    }

    #[test]
    fn render_produces_a_single_string_with_trailing_newline() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "success".into();
        s.total_rows = 1_000_908;
        s.files_produced = 11;
        s.bytes_written = 32 * 1024 * 1024 + 400 * 1024;
        s.duration_ms = 68_400;
        s.peak_rss_mb = 884;

        let block = s.render();
        assert!(
            block.starts_with('\n'),
            "block should start with a blank line"
        );
        assert!(block.ends_with('\n'), "block should end with a newline");
        assert!(block.contains("── orders "));
        assert!(
            block.contains("1,000,908"),
            "rows should be formatted with thousands separator: {}",
            block
        );
        assert!(block.contains("1m 08.4s"), "duration formatting: {}", block);
        assert!(!block.contains('\r'));

        let line = s.render_compact();
        assert!(line.starts_with("✓ "), "success icon present: {:?}", line);
        assert!(line.contains("orders"), "export name present: {:?}", line);
        assert!(line.contains("1,000,908 rows"), "rows present: {:?}", line);
        assert!(line.contains("32.4 MB"), "bytes present: {:?}", line);
        assert!(line.contains("1m 08.4s"), "duration present: {:?}", line);
        assert!(line.contains("RSS 884 MB"), "rss present: {:?}", line);
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    #[test]
    fn compact_error_summarises_parallel_chunk_errors() {
        let raw = "export 'page_views': parallel checkpoint worker errors:\n\
                   chunk 4: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202442_chunk4.parquet?partNumber=1&uploadId=ABPnzm7RqplA, called: http_util::Client::send } => send http request, source: error sending request: client error (SendRequest): dispatch task is gone\n\
                   chunk 5: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202443_chunk5.parquet?partNumber=1&uploadId=ABPnzm6q, called: http_util::Client::send } => send http request, source: dispatch task is gone";
        let out = compact_error(raw);
        assert!(
            out.contains("2 chunk(s)"),
            "should report number of failed chunks: {:?}",
            out
        );
        assert!(
            out.starts_with("export 'page_views': parallel checkpoint workers failed:"),
            "should keep export prefix and use compact phrasing: {:?}",
            out
        );
        assert!(
            out.contains("chunk 4:"),
            "should include the first chunk as an example: {:?}",
            out
        );
        assert!(!out.contains('\n'), "single line output: {:?}", out);
        assert!(
            out.chars().count() <= 240,
            "must be clamped to <=240 chars, got {}: {:?}",
            out.chars().count(),
            out
        );
    }

    #[test]
    fn compact_error_collapses_generic_multiline() {
        let raw = "first line of trouble\nsecond line with detail\n\nthird line\n";
        let out = compact_error(raw);
        assert_eq!(
            out, "first line of trouble; second line with detail; third line",
            "newlines should collapse to '; ' and blanks dropped"
        );
    }

    #[test]
    fn compact_error_clamps_excessively_long_lines() {
        let raw = "x".repeat(1_000);
        let out = compact_error(&raw);
        assert_eq!(out.chars().count(), 240);
        assert!(out.ends_with('…'));
    }

    #[test]
    fn render_compact_strips_chunked_recovery_hint_for_failed() {
        let mut s = RunSummary::new(&plan_for("events"));
        s.status = "failed".into();
        s.error_message = Some(
            "export 'events': --resume but no in-progress chunk checkpoint; \
             run without --resume first or `rivet state reset-chunks --config x.yaml --export events`"
                .to_string(),
        );

        let line = s.render_compact();
        assert!(line.starts_with("✗ "), "failure icon: {:?}", line);
        assert!(line.contains("events"), "name present: {:?}", line);
        assert!(
            line.contains("--resume but no in-progress chunk checkpoint"),
            "cause kept: {:?}",
            line
        );
        assert!(
            !line.contains("rivet state reset-chunks"),
            "recovery hint should be stripped from per-export line: {:?}",
            line
        );
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    #[test]
    fn render_preserves_multiline_error_block() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "failed".into();
        s.error_message = Some(
            "export 'orders': 1 quality check(s) failed:\n \
             - row_count 10 below minimum 999999\n \
             Fix the source data, or adjust the thresholds under `quality:` in your config."
                .to_string(),
        );

        let block = s.render();
        assert!(
            !block.contains("failed:;"),
            "error must not be '; '-flattened in the detailed block: {block}"
        );
        assert!(
            block.contains("- row_count 10 below minimum 999999"),
            "failing check line present: {block}"
        );
        let err_lines: Vec<&str> = block
            .lines()
            .filter(|l| {
                l.contains("quality check(s) failed")
                    || l.contains("row_count 10 below minimum")
                    || l.contains("Fix the source data")
            })
            .collect();
        assert_eq!(
            err_lines.len(),
            3,
            "all three error lines should render on separate lines: {block}"
        );
        for l in &err_lines {
            assert!(l.starts_with(' '), "error line should be indented: {l:?}");
        }
    }

    #[test]
    fn render_nudges_verification_when_unverified_success() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "success".into();
        s.files_produced = 3;
        s.total_rows = 1_000;
        let block = s.render();
        assert!(
            block.lines().any(|l| l.trim_start().starts_with("verify:")),
            "expected a verify nudge on an unverified success: {block}"
        );

        let mut s2 = RunSummary::new(&plan_for("orders"));
        s2.status = "success".into();
        s2.files_produced = 3;
        s2.validated = Some(true);
        let block2 = s2.render();
        assert!(
            !block2
                .lines()
                .any(|l| l.trim_start().starts_with("verify:")),
            "a verified run must not nudge: {block2}"
        );
    }

    #[test]
    fn pg_temp_spill_row_only_for_real_spill_and_annotates_large() {
        let mut s = RunSummary::stub_for_testing("r", "orders");
        assert_eq!(s.pg_temp_spill_row(), None, "no delta → no row");
        s.pg_temp_bytes_delta = Some(0);
        assert_eq!(s.pg_temp_spill_row(), None, "zero spill → no row");
        s.pg_temp_bytes_delta = Some(-5);
        assert_eq!(s.pg_temp_spill_row(), None, "negative delta → no row");

        s.pg_temp_bytes_delta = Some(50 * 1024 * 1024);
        let (label, value) = s.pg_temp_spill_row().expect("50MB spill → row");
        assert_eq!(label, "pg temp spill");
        assert!(
            value.contains("50.0 MB") && !value.contains('⚠'),
            "small spill is plain info: {value:?}"
        );

        s.pg_temp_bytes_delta = Some(200 * 1024 * 1024);
        let (_, value) = s.pg_temp_spill_row().expect("200MB spill → row");
        assert!(
            value.contains('⚠') && value.contains("batch_size"),
            "spill over 100 MB carries the tuning hint: {value:?}"
        );
    }

    #[test]
    fn outcome_rows_format_reconcile_and_suppress_nudge_when_checked() {
        let mut s = RunSummary::stub_for_testing("r", "orders");
        s.reconciled = Some(true);
        s.source_count = Some(1_000);
        s.total_rows = 1_000;
        assert!(
            s.outcome_rows()
                .iter()
                .any(|(l, v)| *l == "reconcile" && v == "MATCH (1,000/1,000)"),
            "match wording: {:?}",
            s.outcome_rows()
        );

        s.reconciled = Some(false);
        s.source_count = Some(1_200);
        let rows = s.outcome_rows();
        let recon = rows
            .iter()
            .find(|(l, _)| *l == "reconcile")
            .expect("reconcile row");
        assert!(
            recon.1.contains("MISMATCH") && recon.1.contains("1,000") && recon.1.contains("1,200"),
            "mismatch names both sides: {:?}",
            recon
        );

        s.status = "success".into();
        s.files_produced = 2;
        assert!(
            !s.outcome_rows().iter().any(|(l, _)| *l == "verify"),
            "a reconciled run must not also nudge"
        );
    }

    #[test]
    fn render_surfaces_cursor_position_on_zero_new_incremental() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "skipped".into();
        s.skip_reason = Some("no new rows since cursor 'updated_at'".into());

        let block = s.render();
        let cursor_line = block
            .lines()
            .find(|l| l.trim_start().starts_with("cursor:"))
            .unwrap_or_else(|| panic!("expected a cursor: line in block: {block}"));
        assert!(
            cursor_line.contains("'updated_at'"),
            "cursor line names the column: {cursor_line:?}"
        );
        assert!(
            cursor_line.contains("unchanged"),
            "cursor line reports the position held: {cursor_line:?}"
        );
    }

    #[test]
    fn incremental_position_line_only_for_cursor_skips() {
        assert_eq!(
            incremental_position_line(Some("no new rows since cursor 'ts'")),
            Some("'ts' unchanged (no new rows this run)".into())
        );
        assert_eq!(
            incremental_position_line(Some("source returned 0 rows")),
            None
        );
        assert_eq!(incremental_position_line(None), None);
    }

    #[test]
    fn render_surfaces_window_position_on_zero_row_time_window() {
        let mut s = RunSummary::new(&plan_for("events"));
        s.status = "skipped".into();
        s.mode = "timewindow".into();
        s.skip_reason = Some("source returned 0 rows".into());

        let block = s.render();
        let window_line = block
            .lines()
            .find(|l| l.trim_start().starts_with("window:"))
            .unwrap_or_else(|| panic!("expected a window: line in block: {block}"));
        assert!(
            window_line.contains("matched no rows"),
            "window line reports the empty window: {window_line:?}"
        );
        assert!(
            window_line.contains("time_column") && window_line.contains("days_window"),
            "window line points at the window config to check: {window_line:?}"
        );
        assert!(
            !block.lines().any(|l| l.trim_start().starts_with("cursor:")),
            "no cursor line for a non-cursor strategy: {block}"
        );
    }

    #[test]
    fn time_window_skip_line_only_for_skipped_time_window() {
        assert_eq!(
            time_window_skip_line("timewindow", Some("source returned 0 rows")),
            Some("rolling time window matched no rows — check `time_column`/`days_window`".into())
        );
        assert_eq!(
            time_window_skip_line("incremental", Some("source returned 0 rows")),
            None
        );
        assert_eq!(
            time_window_skip_line("full", Some("source returned 0 rows")),
            None
        );
        assert_eq!(time_window_skip_line("timewindow", None), None);
    }
}