1use super::ipc::{self, ChildEvent};
18use super::{format_bytes, multi_export_mode, strip_chunked_recovery_hint};
19use crate::journal::{PlanSnapshot, RunEvent, RunJournal};
20use crate::manifest::ManifestPart;
21use crate::plan::ResolvedRunPlan;
22
23fn plan_snapshot_from(plan: &ResolvedRunPlan) -> PlanSnapshot {
29 PlanSnapshot {
30 export_name: plan.export_name.clone(),
31 base_query: plan.base_query.clone(),
32 strategy: plan.strategy.mode_label().to_string(),
33 format: plan.format.label().to_string(),
34 compression: plan.compression.label().to_string(),
35 destination_type: plan.destination.destination_type.label().to_string(),
36 tuning_profile: plan.tuning_profile_label.clone(),
37 batch_size: plan.tuning.batch_size,
38 validate: plan.validate,
39 reconcile: plan.reconcile,
40 resume: plan.resume,
41 }
42}
43
/// Serializable context attached to a [`RunSummary`] via its `apply_context`
/// field.
///
/// NOTE(review): nothing in this part of the file reads these fields (the
/// summary only ever initialises `apply_context` to `None` here), so the
/// per-field notes below are inferred from the names — confirm against the
/// code that populates this struct.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ApplyContext {
    /// Identifier of a plan — presumably the one this run applied; TODO confirm.
    pub plan_id: String,
    /// Presumably whether the run was forced; TODO confirm.
    pub forced: bool,
    /// Presumably the names of whatever `forced` bypassed; TODO confirm.
    pub force_bypassed: Vec<String>,
}
63
/// Accumulated result of one export run, rendered to stderr (or emitted as an
/// IPC event) when the run finishes.
///
/// `RunSummary::new` starts every counter at zero, every optional outcome at
/// `None` and `status` at `"running"`; the rest of the pipeline is expected
/// to fill the fields in as the run progresses.
#[derive(Debug, Clone)]
pub struct RunSummary {
    /// `<export_name>_<UTC timestamp>` when built by `new`.
    pub run_id: String,
    /// Name of the export, copied from the plan.
    pub export_name: String,
    /// Starts as `"running"`; this file also branches on `"success"`,
    /// `"failed"` and `"skipped"`.
    pub status: String,
    /// Rendered as the `rows` line of the summary.
    pub total_rows: i64,
    /// Rendered as the `files` line of the summary.
    pub files_produced: usize,
    /// Rendered as the `bytes` line when greater than zero.
    pub bytes_written: u64,
    /// Checked against `manifest_parts.len()` by `check_post_run_invariants`.
    pub files_committed: usize,
    /// Run duration in milliseconds, rendered as the `duration` line.
    pub duration_ms: i64,
    /// Rendered as `peak RSS` (in MB) when greater than zero.
    pub peak_rss_mb: i64,
    /// Rendered as the `retries` line when greater than zero.
    pub retries: u32,
    /// `None` until set; rendered as `pass` / `FAIL`.
    pub validated: Option<bool>,
    /// `None` until set; rendered as `CHANGED` / `unchanged`.
    pub schema_changed: Option<bool>,
    /// `None` until set; rendered as `pass` / `FAIL`.
    pub quality_passed: Option<bool>,
    /// Rendered as the `error` line, and as the body of the compact line for
    /// failed runs.
    pub error_message: Option<String>,
    /// Label of the tuning profile, copied from the plan.
    pub tuning_profile: String,
    /// `tuning.batch_size` from the plan.
    pub batch_size: usize,
    /// `tuning.batch_size_memory_mb` from the plan; adds a note to the
    /// `tuning` line when set.
    pub batch_size_memory_mb: Option<usize>,
    /// Format label from the plan (this file compares it with `"parquet"`).
    pub format: String,
    /// Strategy mode label from the plan (this file compares it with
    /// `"timewindow"`).
    pub mode: String,
    /// Compression label from the plan (this file compares it with `"zstd"`).
    pub compression: String,
    /// Destination URI; `new` leaves it `None` for stdout destinations.
    pub destination_uri: Option<String>,
    /// Rendered as `pg temp spill` when positive; the value is treated as a
    /// byte count.
    pub pg_temp_bytes_delta: Option<i64>,
    /// Appended to the status line when `status` is `"skipped"`, and inspected
    /// to produce the `cursor` / `window` lines.
    pub skip_reason: Option<String>,
    /// Source-side row count shown in the `reconcile` line (`?` when absent).
    pub source_count: Option<i64>,
    /// `None` until set; rendered as `MATCH` / `MISMATCH`.
    pub reconciled: Option<bool>,
    /// Parts compared against the counters above by
    /// `check_post_run_invariants`.
    pub manifest_parts: Vec<ManifestPart>,
    /// Not read in this file — presumably a fingerprint of the exported
    /// schema; TODO confirm.
    pub schema_fingerprint: Option<String>,
    /// Not read in this file — presumably the result of verifying the written
    /// manifest; TODO confirm.
    pub manifest_verification: Option<crate::pipeline::ManifestVerification>,
    /// Not read in this file; see [`ApplyContext`].
    pub apply_context: Option<ApplyContext>,
    /// Event journal for the run; `new` records the resolved plan into it.
    pub journal: RunJournal,
}
150
/// One `label: value` line of the detailed summary block.
type Row = (&'static str, String);
154
155impl RunSummary {
156 pub(super) fn new(plan: &ResolvedRunPlan) -> Self {
157 let run_id = format!(
158 "{}_{}",
159 plan.export_name,
160 chrono::Utc::now().format("%Y%m%dT%H%M%S%.3f"),
161 );
162 let mut journal = RunJournal::new(&run_id, &plan.export_name);
163 journal.record(RunEvent::PlanResolved(plan_snapshot_from(plan)));
164
165 ipc::emit_event(&ChildEvent::Started {
166 export_name: plan.export_name.clone(),
167 run_id: run_id.clone(),
168 mode: plan.strategy.mode_label().to_string(),
169 tuning_profile: plan.tuning_profile_label.clone(),
170 batch_size: plan.tuning.batch_size,
171 });
172
173 Self {
174 run_id,
175 export_name: plan.export_name.clone(),
176 status: "running".into(),
177 total_rows: 0,
178 files_produced: 0,
179 bytes_written: 0,
180 files_committed: 0,
181 duration_ms: 0,
182 peak_rss_mb: 0,
183 retries: 0,
184 validated: None,
185 schema_changed: None,
186 quality_passed: None,
187 error_message: None,
188 tuning_profile: plan.tuning_profile_label.clone(),
189 batch_size: plan.tuning.batch_size,
190 batch_size_memory_mb: plan.tuning.batch_size_memory_mb,
191 format: plan.format.label().to_string(),
192 mode: plan.strategy.mode_label().to_string(),
193 compression: plan.compression.label().to_string(),
194 destination_uri: (!matches!(
195 plan.destination.destination_type,
196 crate::config::DestinationType::Stdout
197 ))
198 .then(|| crate::pipeline::finalize::destination_uri_for_manifest(&plan.destination)),
199 pg_temp_bytes_delta: None,
200 skip_reason: None,
201 source_count: None,
202 reconciled: None,
203 manifest_parts: Vec::new(),
204 schema_fingerprint: None,
205 manifest_verification: None,
206 apply_context: None,
207 journal,
208 }
209 }
210
211 #[doc(hidden)]
232 #[allow(dead_code)]
233 pub fn stub_for_testing(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
234 let run_id = run_id.into();
235 let export_name = export_name.into();
236 let journal = RunJournal::new(&run_id, &export_name);
237 Self {
238 run_id,
239 export_name,
240 status: "running".into(),
241 total_rows: 0,
242 files_produced: 0,
243 bytes_written: 0,
244 files_committed: 0,
245 duration_ms: 0,
246 peak_rss_mb: 0,
247 retries: 0,
248 validated: None,
249 schema_changed: None,
250 quality_passed: None,
251 error_message: None,
252 tuning_profile: "balanced".into(),
253 batch_size: 1000,
254 batch_size_memory_mb: None,
255 format: "parquet".into(),
256 mode: "snapshot".into(),
257 compression: "zstd".into(),
258 destination_uri: None,
259 pg_temp_bytes_delta: None,
260 skip_reason: None,
261 source_count: None,
262 reconciled: None,
263 manifest_parts: Vec::new(),
264 schema_fingerprint: None,
265 manifest_verification: None,
266 apply_context: None,
267 journal,
268 }
269 }
270
271 #[doc(hidden)]
278 #[allow(dead_code)]
279 pub fn with_status(mut self, status: impl Into<String>) -> Self {
280 let s = status.into();
281 if (s == "success" || s == "failed") && self.journal.final_outcome().is_none() {
282 self.journal.record(RunEvent::RunCompleted {
283 status: s.clone(),
284 error_message: self.error_message.clone(),
285 duration_ms: self.duration_ms,
286 });
287 }
288 self.status = s;
289 self
290 }
291
292 #[doc(hidden)]
296 #[allow(dead_code)]
297 pub fn with_files_committed(mut self, n: usize) -> Self {
298 self.files_committed = n;
299 self
300 }
301
302 #[doc(hidden)]
306 #[allow(dead_code)]
307 pub fn with_manifest_parts(mut self, parts: Vec<crate::manifest::ManifestPart>) -> Self {
308 self.total_rows = parts.iter().map(|p| p.rows).sum();
309 self.bytes_written = parts.iter().map(|p| p.size_bytes).sum();
310 self.files_produced = parts.len();
311 self.files_committed = parts.len();
312 self.manifest_parts = parts;
313 self
314 }
315
316 #[doc(hidden)]
320 #[allow(dead_code)]
321 pub fn with_error(mut self, msg: impl Into<String>) -> Self {
322 self.error_message = Some(msg.into());
323 self
324 }
325
326 #[doc(hidden)]
331 #[allow(dead_code)]
332 pub fn with_plan_snapshot(mut self, snap: PlanSnapshot) -> Self {
333 self.journal.record(RunEvent::PlanResolved(snap));
334 self
335 }
336
337 pub(super) fn print(&self) {
338 if ipc::capturing_events() {
342 ipc::emit_event(&ChildEvent::Finished {
343 export_name: self.export_name.clone(),
344 run_id: self.run_id.clone(),
345 status: self.status.clone(),
346 total_rows: self.total_rows,
347 files_produced: self.files_produced as u64,
348 bytes_written: self.bytes_written,
349 duration_ms: self.duration_ms,
350 peak_rss_mb: self.peak_rss_mb,
351 error_message: self.error_message.clone(),
352 });
353 return;
354 }
355
356 self.print_stderr_block();
357 }
358
359 pub(super) fn print_stderr_block(&self) {
364 let block = if multi_export_mode() {
365 self.render_compact()
366 } else {
367 self.render().trim_end_matches('\n').to_string()
373 };
374
375 use std::io::Write;
376 let mut buf = super::parent_ui::sanitize_terminal(&block);
383 buf.push('\n');
384 let stderr = std::io::stderr();
385 let mut handle = stderr.lock();
386 let _ = handle.write_all(buf.as_bytes());
387 let _ = handle.flush();
388 }
389
390 fn render_compact(&self) -> String {
395 const NAME_COL: usize = 22;
396 const MODE_COL: usize = 8;
397 let icon = match self.status.as_str() {
398 "success" => "✓",
399 "failed" => "✗",
400 _ => "•",
401 };
402 let body = if self.status == "failed" {
403 let err = self
404 .error_message
405 .as_deref()
406 .unwrap_or("(no error message recorded)");
407 let (cause, _) = strip_chunked_recovery_hint(err);
408 compact_error(cause)
412 } else {
413 let rss = if self.peak_rss_mb > 0 {
414 format!(" RSS {} MB", fmt_thousands(self.peak_rss_mb))
415 } else {
416 String::new()
417 };
418 format!(
419 "{} rows {} files {} {}{}",
420 fmt_thousands(self.total_rows),
421 fmt_thousands(self.files_produced as i64),
422 format_bytes(self.bytes_written),
423 fmt_duration_ms(self.duration_ms),
424 rss
425 )
426 };
427 format!(
428 "{} {:<name$} {:<mode$} {}",
429 icon,
430 self.export_name,
431 self.mode,
432 body,
433 name = NAME_COL,
434 mode = MODE_COL,
435 )
436 }
437
438 fn render(&self) -> String {
450 let mut rows: Vec<Row> = Vec::with_capacity(16);
451 rows.push(("run_id", self.run_id.clone()));
452 rows.push(self.status_row());
453 rows.push(self.tuning_row());
454 rows.push(("rows", fmt_thousands(self.total_rows)));
455 rows.push(("files", fmt_thousands(self.files_produced as i64)));
456 rows.extend(self.output_row());
457 rows.extend(self.position_row());
458 rows.extend(self.bytes_row());
459 rows.push(("duration", fmt_duration_ms(self.duration_ms)));
460 rows.extend(self.peak_rss_row());
461 rows.extend(self.pg_temp_spill_row());
462 rows.extend(self.compression_row());
463 rows.extend(self.retries_row());
464 rows.extend(self.outcome_rows());
465 rows.extend(self.error_row());
466 format_block(&self.export_name, &rows)
467 }
468
469 fn status_row(&self) -> Row {
471 let value = match (&self.status, &self.skip_reason) {
472 (s, Some(reason)) if s == "skipped" => format!("{s} ({reason})"),
473 (s, _) => s.clone(),
474 };
475 ("status", value)
476 }
477
478 fn tuning_row(&self) -> Row {
481 let value = match self.batch_size_memory_mb {
482 Some(mem) => format!(
483 "profile={}, batch_size={} (batch_size_memory_mb={}MiB → effective FETCH in logs)",
484 self.tuning_profile,
485 fmt_thousands(self.batch_size as i64),
486 mem
487 ),
488 None => format!(
489 "profile={}, batch_size={}",
490 self.tuning_profile,
491 fmt_thousands(self.batch_size as i64)
492 ),
493 };
494 ("tuning", value)
495 }
496
497 fn output_row(&self) -> Option<Row> {
501 if self.files_produced > 0 {
502 self.destination_uri.clone().map(|uri| ("output", uri))
503 } else {
504 None
505 }
506 }
507
508 fn position_row(&self) -> Option<Row> {
515 if let Some(pos) = incremental_position_line(self.skip_reason.as_deref()) {
516 Some(("cursor", pos))
517 } else {
518 time_window_skip_line(&self.mode, self.skip_reason.as_deref()).map(|w| ("window", w))
519 }
520 }
521
522 fn bytes_row(&self) -> Option<Row> {
524 if self.bytes_written > 0 {
525 Some(("bytes", format_bytes(self.bytes_written)))
526 } else {
527 None
528 }
529 }
530
531 fn peak_rss_row(&self) -> Option<Row> {
533 if self.peak_rss_mb > 0 {
534 Some((
535 "peak RSS",
536 format!(
537 "{} MB (sampled during run)",
538 fmt_thousands(self.peak_rss_mb)
539 ),
540 ))
541 } else {
542 None
543 }
544 }
545
546 fn pg_temp_spill_row(&self) -> Option<Row> {
550 let temp = self.pg_temp_bytes_delta?;
551 if temp <= 0 {
552 return None;
553 }
554 let temp_mb = temp as f64 / (1024.0 * 1024.0);
555 let label = if temp > 100 * 1024 * 1024 {
556 format!(
557 "{:.1} MB ⚠ shrink tuning.batch_size or set batch_size_memory_mb",
558 temp_mb
559 )
560 } else {
561 format!("{:.1} MB", temp_mb)
562 };
563 Some(("pg temp spill", label))
564 }
565
566 fn compression_row(&self) -> Option<Row> {
568 if self.format == "parquet" && self.compression != "zstd" {
569 Some(("compression", self.compression.clone()))
570 } else {
571 None
572 }
573 }
574
575 fn retries_row(&self) -> Option<Row> {
577 if self.retries > 0 {
578 Some(("retries", self.retries.to_string()))
579 } else {
580 None
581 }
582 }
583
584 fn outcome_rows(&self) -> Vec<Row> {
590 let mut rows: Vec<Row> = Vec::new();
591 if let Some(v) = self.validated {
592 rows.push(("validated", if v { "pass".into() } else { "FAIL".into() }));
593 }
594 if let Some(sc) = self.schema_changed {
595 rows.push((
596 "schema",
597 if sc {
598 "CHANGED".into()
599 } else {
600 "unchanged".into()
601 },
602 ));
603 }
604 if let Some(q) = self.quality_passed {
605 rows.push(("quality", if q { "pass".into() } else { "FAIL".into() }));
606 }
607 if let Some(reconciled) = self.reconciled {
608 let src = self
609 .source_count
610 .map(fmt_thousands)
611 .unwrap_or_else(|| "?".into());
612 let exported = fmt_thousands(self.total_rows);
613 let value = if reconciled {
614 format!("MATCH ({exported}/{src})")
615 } else {
616 format!("MISMATCH (exported {exported} vs source {src})")
617 };
618 rows.push(("reconcile", value));
619 }
620 if self.status == "success"
625 && self.files_produced > 0
626 && self.validated.is_none()
627 && self.reconciled.is_none()
628 {
629 rows.push((
630 "verify",
631 "not run — add `--reconcile` (count vs source) or `rivet validate` (re-read outputs)"
632 .into(),
633 ));
634 }
635 rows
636 }
637
638 fn error_row(&self) -> Option<Row> {
644 self.error_message
645 .as_ref()
646 .map(|err| ("error", err.trim_end().to_string()))
647 }
648
649 pub fn check_post_run_invariants(&self) -> Result<(), String> {
665 let parts_bytes: u64 = self.manifest_parts.iter().map(|p| p.size_bytes).sum();
666
667 if self.files_committed > self.manifest_parts.len() {
668 return Err(format!(
669 "summary.files_committed ({}) > manifest_parts.len() ({}) — \
670 a runner bumped files_committed without commit::record_part",
671 self.files_committed,
672 self.manifest_parts.len()
673 ));
674 }
675 if self.files_produced > self.manifest_parts.len() {
676 return Err(format!(
677 "summary.files_produced ({}) > manifest_parts.len() ({}) — \
678 a runner bumped files_produced without commit::record_part",
679 self.files_produced,
680 self.manifest_parts.len()
681 ));
682 }
683 if self.bytes_written > parts_bytes {
684 return Err(format!(
685 "summary.bytes_written ({}) > sum(manifest_parts.size_bytes) ({}) — \
686 a runner bumped bytes_written without commit::record_part",
687 self.bytes_written, parts_bytes
688 ));
689 }
690 if self.status == "success" && self.files_committed > 0 && self.manifest_parts.is_empty() {
691 return Err(format!(
692 "success run with files_committed={} has empty manifest_parts — \
693 cloud manifest (ADR-0012 M1) would ship with no part list \
694 (this is the gap parallel_checkpoint had before commit e9b0796)",
695 self.files_committed
696 ));
697 }
698 if self.status == "success" && self.total_rows > 0 && self.files_committed == 0 {
710 return Err(format!(
711 "summary.total_rows={} but files_committed=0 — rows extracted from \
712 source but no files committed (no output reached the destination)",
713 self.total_rows
714 ));
715 }
716 Ok(())
717 }
718}
719
720fn compact_error(raw: &str) -> String {
732 const MAX_CHARS: usize = 240;
733 if let Some(summary) = summarize_parallel_chunk_errors(raw) {
734 return clamp_chars(&summary, MAX_CHARS);
735 }
736 let collapsed: String = raw
737 .lines()
738 .map(str::trim_end)
739 .filter(|s| !s.is_empty())
740 .collect::<Vec<_>>()
741 .join("; ");
742 clamp_chars(&collapsed, MAX_CHARS)
743}
744
/// Turns a skip reason of the exact form `no new rows since cursor '<col>'`
/// into the text of the `cursor` row. Any other reason, or none, yields
/// `None`.
fn incremental_position_line(skip_reason: Option<&str>) -> Option<String> {
    const PREFIX: &str = "no new rows since cursor '";

    skip_reason
        .and_then(|reason| reason.strip_prefix(PREFIX))
        .and_then(|rest| rest.strip_suffix('\''))
        .map(|column| format!("'{column}' unchanged (no new rows this run)"))
}
758
/// Text of the `window` row. It is produced only when a skip reason is
/// present and the mode is exactly `"timewindow"`; the reason's content is
/// not inspected.
fn time_window_skip_line(mode: &str, skip_reason: Option<&str>) -> Option<String> {
    match (mode, skip_reason) {
        ("timewindow", Some(_)) => Some(
            "rolling time window matched no rows — check `time_column`/`days_window`".to_string(),
        ),
        _ => None,
    }
}
782
783fn summarize_parallel_chunk_errors(raw: &str) -> Option<String> {
784 let header_pos = raw.find("parallel checkpoint worker errors:")?;
785 let prefix = raw[..header_pos].trim_end_matches(": ").trim_end();
786 let tail = &raw[header_pos + "parallel checkpoint worker errors:".len()..];
787
788 let chunk_lines: Vec<&str> = tail
789 .lines()
790 .map(str::trim)
791 .filter(|l| l.starts_with("chunk "))
792 .collect();
793 if chunk_lines.is_empty() {
794 return None;
795 }
796 let first_chunk_full = chunk_lines[0];
797 let first_chunk_short = clamp_chars(first_chunk_full, 140);
799 let prefix = if prefix.is_empty() {
800 String::new()
801 } else {
802 format!("{}: ", prefix)
803 };
804 Some(format!(
805 "{}parallel checkpoint workers failed: {} chunk(s) ({}); see stderr for full payloads",
806 prefix,
807 chunk_lines.len(),
808 first_chunk_short
809 ))
810}
811
/// Limits `s` to `max_chars` characters (Unicode scalar values, not bytes).
///
/// A string that already fits is returned unchanged. A longer one keeps its
/// first `max_chars - 1` characters and ends with `…`. A limit of zero yields
/// the empty string.
fn clamp_chars(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }
    // A character at index `max_chars` means the string is too long.
    if s.char_indices().nth(max_chars).is_none() {
        return s.to_string();
    }
    let cut = s
        .char_indices()
        .nth(max_chars - 1)
        .map_or(s.len(), |(offset, _)| offset);
    let mut clamped = String::with_capacity(cut + '…'.len_utf8());
    clamped.push_str(&s[..cut]);
    clamped.push('…');
    clamped
}
824
/// Lays out the detailed summary block.
///
/// The block is a blank line, a header `── <name> ───…` padded with dashes to
/// 60 characters, then one `label: value` line per row. Labels are indented
/// two spaces and padded so that every value starts in the same column. A
/// value containing `\n` continues on following lines indented to that same
/// column. The result always ends with a newline.
fn format_block(name: &str, rows: &[(&str, String)]) -> String {
    const HEADER_WIDTH: usize = 60;
    /// Spaces before each label.
    const LEAD: usize = 2;
    /// Spaces between the padded `label:` cell and the value.
    const GAP: usize = 2;

    // Measured in characters because padding and header width are too; using
    // byte length would over-pad for any non-ASCII label.
    let label_w = rows
        .iter()
        .map(|(label, _)| label.chars().count())
        .max()
        .unwrap_or(0);
    // First lines and continuation lines both use this column, so they
    // cannot drift apart.
    let value_col = LEAD + (label_w + 1) + GAP;
    let value_indent = " ".repeat(value_col);

    let prefix = format!("── {} ", name);
    let dashes = HEADER_WIDTH.saturating_sub(prefix.chars().count());

    let mut out = String::with_capacity(HEADER_WIDTH * (rows.len() + 3));
    out.push('\n');
    out.push_str(&prefix);
    out.extend(std::iter::repeat('─').take(dashes));
    out.push('\n');

    for (label, value) in rows {
        let mut lines = value.split('\n');
        let first = lines.next().unwrap_or("");

        out.extend(std::iter::repeat(' ').take(LEAD));
        out.push_str(label);
        out.push(':');
        // `label_w` is the maximum label width, so this never underflows.
        let written = LEAD + label.chars().count() + 1;
        out.extend(std::iter::repeat(' ').take(value_col - written));
        out.push_str(first);
        out.push('\n');

        for cont in lines {
            out.push_str(&value_indent);
            out.push_str(cont);
            out.push('\n');
        }
    }
    out
}
864
/// Formats a millisecond duration for humans.
///
/// * below one second (including negative values): `"<n>ms"`
/// * below one minute: `"S.ts"`
/// * below one hour: `"Mm SS.ts"`
/// * otherwise: `"Hh MMm SS.ts"`
///
/// The value is rounded to tenths of a second (half up) *before* it is split
/// into hours, minutes and seconds. Rounding carries into the larger units,
/// so the seconds field never reads `60.0`.
fn fmt_duration_ms(ms: i64) -> String {
    if ms < 1000 {
        return format!("{}ms", ms);
    }

    // Integer tenths of a second; saturating so `i64::MAX` cannot overflow.
    let total_tenths = ms.saturating_add(50) / 100;
    let hours = total_tenths / 36_000;
    let minutes = (total_tenths % 36_000) / 600;
    let seconds = (total_tenths % 600) / 10;
    let tenth = total_tenths % 10;

    if hours > 0 {
        format!("{}h {:02}m {:02}.{}s", hours, minutes, seconds, tenth)
    } else if minutes > 0 {
        format!("{}m {:02}.{}s", minutes, seconds, tenth)
    } else {
        format!("{}.{}s", seconds, tenth)
    }
}
881
/// Formats an integer with `,` between groups of three digits, e.g.
/// `1000908` becomes `1,000,908`. Negative values keep a leading `-`;
/// `i64::MIN` is handled because the magnitude is taken as unsigned.
fn fmt_thousands(n: i64) -> String {
    let digits = n.unsigned_abs().to_string();
    // Length of the leading group; a comma goes before every later digit
    // whose index is congruent to it modulo 3.
    let lead = digits.len() % 3;

    let mut grouped = String::with_capacity(digits.len() + digits.len() / 3 + 1);
    if n < 0 {
        grouped.push('-');
    }
    for (idx, digit) in digits.chars().enumerate() {
        if idx != 0 && idx % 3 == lead {
            grouped.push(',');
        }
        grouped.push(digit);
    }
    grouped
}
902
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal snapshot-to-local-parquet plan shared by every test that needs
    /// a summary built through `RunSummary::new`.
    fn plan_for(export_name: &str) -> crate::plan::ResolvedRunPlan {
        use crate::plan::{
            CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
            MetaColumns, ResolvedRunPlan,
        };
        use crate::tuning::SourceTuning;
        ResolvedRunPlan {
            export_name: export_name.into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: crate::config::SourceConfig {
                source_type: crate::config::SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    #[test]
    fn fmt_thousands_handles_small_and_large() {
        assert_eq!(fmt_thousands(0), "0");
        assert_eq!(fmt_thousands(7), "7");
        assert_eq!(fmt_thousands(999), "999");
        assert_eq!(fmt_thousands(1_000), "1,000");
        assert_eq!(fmt_thousands(1_000_908), "1,000,908");
        assert_eq!(fmt_thousands(39_990_376), "39,990,376");
        assert_eq!(fmt_thousands(-1_234), "-1,234");
        assert_eq!(fmt_thousands(i64::MAX), "9,223,372,036,854,775,807");
    }

    #[test]
    fn fmt_duration_picks_unit() {
        assert_eq!(fmt_duration_ms(0), "0ms");
        assert_eq!(fmt_duration_ms(800), "800ms");
        assert_eq!(fmt_duration_ms(1_500), "1.5s");
        assert_eq!(fmt_duration_ms(68_400), "1m 08.4s");
        assert_eq!(fmt_duration_ms(3_725_300), "1h 02m 05.3s");
    }

    #[test]
    fn format_block_pads_labels_uniformly() {
        let rows = vec![
            ("run_id", "abc".to_string()),
            ("rows", "42".to_string()),
            ("compression", "zstd".to_string()),
        ];
        let out = format_block("orders", &rows);

        let lines: Vec<&str> = out.lines().filter(|l| l.contains(':')).collect();
        assert_eq!(lines.len(), 3);
        // Every value must start in the same column, whatever its label's
        // length.
        let value_col = lines[0].rfind("abc").unwrap();
        assert_eq!(lines[1].rfind("42").unwrap(), value_col);
        assert_eq!(lines[2].rfind("zstd").unwrap(), value_col);
    }

    #[test]
    fn format_block_header_has_consistent_width() {
        let block_a = format_block("a", &[("rows", "1".into())]);
        let block_b = format_block("orders_table_xyz", &[("rows", "1".into())]);
        let header_a = block_a.lines().nth(1).unwrap();
        let header_b = block_b.lines().nth(1).unwrap();
        assert_eq!(
            header_a.chars().count(),
            header_b.chars().count(),
            "headers must be the same width regardless of name length: {:?} vs {:?}",
            header_a,
            header_b
        );
    }

    #[test]
    fn render_produces_a_single_string_with_trailing_newline() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "success".into();
        s.total_rows = 1_000_908;
        s.files_produced = 11;
        s.bytes_written = 32 * 1024 * 1024 + 400 * 1024;
        s.duration_ms = 68_400;
        s.peak_rss_mb = 884;

        let block = s.render();
        assert!(
            block.starts_with('\n'),
            "block should start with a blank line"
        );
        assert!(block.ends_with('\n'), "block should end with a newline");
        assert!(block.contains("── orders "));
        assert!(
            block.contains("1,000,908"),
            "rows should be formatted with thousands separator: {}",
            block
        );
        assert!(block.contains("1m 08.4s"), "duration formatting: {}", block);
        assert!(!block.contains('\r'));

        let line = s.render_compact();
        assert!(line.starts_with("✓ "), "success icon present: {:?}", line);
        assert!(line.contains("orders"), "export name present: {:?}", line);
        assert!(line.contains("1,000,908 rows"), "rows present: {:?}", line);
        assert!(line.contains("32.4 MB"), "bytes present: {:?}", line);
        assert!(line.contains("1m 08.4s"), "duration present: {:?}", line);
        assert!(line.contains("RSS 884 MB"), "rss present: {:?}", line);
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    #[test]
    fn compact_error_summarises_parallel_chunk_errors() {
        let raw = "export 'page_views': parallel checkpoint worker errors:\n\
                   chunk 4: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202442_chunk4.parquet?partNumber=1&uploadId=ABPnzm7RqplA, called: http_util::Client::send } => send http request, source: error sending request: client error (SendRequest): dispatch task is gone\n\
                   chunk 5: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202443_chunk5.parquet?partNumber=1&uploadId=ABPnzm6q, called: http_util::Client::send } => send http request, source: dispatch task is gone";
        let out = compact_error(raw);
        assert!(
            out.contains("2 chunk(s)"),
            "should report number of failed chunks: {:?}",
            out
        );
        assert!(
            out.starts_with("export 'page_views': parallel checkpoint workers failed:"),
            "should keep export prefix and use compact phrasing: {:?}",
            out
        );
        assert!(
            out.contains("chunk 4:"),
            "should include the first chunk as an example: {:?}",
            out
        );
        assert!(!out.contains('\n'), "single line output: {:?}", out);
        assert!(
            out.chars().count() <= 240,
            "must be clamped to <=240 chars, got {}: {:?}",
            out.chars().count(),
            out
        );
    }

    #[test]
    fn compact_error_collapses_generic_multiline() {
        let raw = "first line of trouble\nsecond line with detail\n\nthird line\n";
        let out = compact_error(raw);
        assert_eq!(
            out, "first line of trouble; second line with detail; third line",
            "newlines should collapse to '; ' and blanks dropped"
        );
    }

    #[test]
    fn compact_error_clamps_excessively_long_lines() {
        let raw = "x".repeat(1_000);
        let out = compact_error(&raw);
        assert_eq!(out.chars().count(), 240);
        assert!(out.ends_with('…'));
    }

    #[test]
    fn render_compact_strips_chunked_recovery_hint_for_failed() {
        let mut s = RunSummary::new(&plan_for("events"));
        s.status = "failed".into();
        s.error_message = Some(
            "export 'events': --resume but no in-progress chunk checkpoint; \
             run without --resume first or `rivet state reset-chunks --config x.yaml --export events`"
                .to_string(),
        );

        let line = s.render_compact();
        assert!(line.starts_with("✗ "), "failure icon: {:?}", line);
        assert!(line.contains("events"), "name present: {:?}", line);
        assert!(
            line.contains("--resume but no in-progress chunk checkpoint"),
            "cause kept: {:?}",
            line
        );
        assert!(
            !line.contains("rivet state reset-chunks"),
            "recovery hint should be stripped from per-export line: {:?}",
            line
        );
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    #[test]
    fn render_preserves_multiline_error_block() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "failed".into();
        s.error_message = Some(
            "export 'orders': 1 quality check(s) failed:\n \
             - row_count 10 below minimum 999999\n \
             Fix the source data, or adjust the thresholds under `quality:` in your config."
                .to_string(),
        );

        let block = s.render();
        assert!(
            !block.contains("failed:;"),
            "error must not be '; '-flattened in the detailed block: {block}"
        );
        assert!(
            block.contains("- row_count 10 below minimum 999999"),
            "failing check line present: {block}"
        );
        let err_lines: Vec<&str> = block
            .lines()
            .filter(|l| {
                l.contains("quality check(s) failed")
                    || l.contains("row_count 10 below minimum")
                    || l.contains("Fix the source data")
            })
            .collect();
        assert_eq!(
            err_lines.len(),
            3,
            "all three error lines should render on separate lines: {block}"
        );
        for l in &err_lines {
            assert!(l.starts_with(' '), "error line should be indented: {l:?}");
        }
    }

    #[test]
    fn render_nudges_verification_when_unverified_success() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "success".into();
        s.files_produced = 3;
        s.total_rows = 1_000;
        let block = s.render();
        assert!(
            block.lines().any(|l| l.trim_start().starts_with("verify:")),
            "expected a verify nudge on an unverified success: {block}"
        );

        let mut s2 = RunSummary::new(&plan_for("orders"));
        s2.status = "success".into();
        s2.files_produced = 3;
        s2.validated = Some(true);
        let block2 = s2.render();
        assert!(
            !block2
                .lines()
                .any(|l| l.trim_start().starts_with("verify:")),
            "a verified run must not nudge: {block2}"
        );
    }

    #[test]
    fn pg_temp_spill_row_only_for_real_spill_and_annotates_large() {
        let mut s = RunSummary::stub_for_testing("r", "orders");
        assert_eq!(s.pg_temp_spill_row(), None, "no delta → no row");
        s.pg_temp_bytes_delta = Some(0);
        assert_eq!(s.pg_temp_spill_row(), None, "zero spill → no row");
        s.pg_temp_bytes_delta = Some(-5);
        assert_eq!(s.pg_temp_spill_row(), None, "negative delta → no row");

        s.pg_temp_bytes_delta = Some(50 * 1024 * 1024);
        let (label, value) = s.pg_temp_spill_row().expect("50MB spill → row");
        assert_eq!(label, "pg temp spill");
        assert!(
            value.contains("50.0 MB") && !value.contains('⚠'),
            "small spill is plain info: {value:?}"
        );

        s.pg_temp_bytes_delta = Some(200 * 1024 * 1024);
        let (_, value) = s.pg_temp_spill_row().expect("200MB spill → row");
        assert!(
            value.contains('⚠') && value.contains("batch_size"),
            "spill over 100 MB carries the tuning hint: {value:?}"
        );
    }

    #[test]
    fn outcome_rows_format_reconcile_and_suppress_nudge_when_checked() {
        let mut s = RunSummary::stub_for_testing("r", "orders");
        s.reconciled = Some(true);
        s.source_count = Some(1_000);
        s.total_rows = 1_000;
        assert!(
            s.outcome_rows()
                .iter()
                .any(|(l, v)| *l == "reconcile" && v == "MATCH (1,000/1,000)"),
            "match wording: {:?}",
            s.outcome_rows()
        );

        s.reconciled = Some(false);
        s.source_count = Some(1_200);
        let rows = s.outcome_rows();
        let recon = rows
            .iter()
            .find(|(l, _)| *l == "reconcile")
            .expect("reconcile row");
        assert!(
            recon.1.contains("MISMATCH") && recon.1.contains("1,000") && recon.1.contains("1,200"),
            "mismatch names both sides: {:?}",
            recon
        );

        s.status = "success".into();
        s.files_produced = 2;
        assert!(
            !s.outcome_rows().iter().any(|(l, _)| *l == "verify"),
            "a reconciled run must not also nudge"
        );
    }

    #[test]
    fn render_surfaces_cursor_position_on_zero_new_incremental() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "skipped".into();
        s.skip_reason = Some("no new rows since cursor 'updated_at'".into());

        let block = s.render();
        let cursor_line = block
            .lines()
            .find(|l| l.trim_start().starts_with("cursor:"))
            .unwrap_or_else(|| panic!("expected a cursor: line in block: {block}"));
        assert!(
            cursor_line.contains("'updated_at'"),
            "cursor line names the column: {cursor_line:?}"
        );
        assert!(
            cursor_line.contains("unchanged"),
            "cursor line reports the position held: {cursor_line:?}"
        );
    }

    #[test]
    fn incremental_position_line_only_for_cursor_skips() {
        assert_eq!(
            incremental_position_line(Some("no new rows since cursor 'ts'")),
            Some("'ts' unchanged (no new rows this run)".into())
        );
        assert_eq!(
            incremental_position_line(Some("source returned 0 rows")),
            None
        );
        assert_eq!(incremental_position_line(None), None);
    }

    #[test]
    fn render_surfaces_window_position_on_zero_row_time_window() {
        let mut s = RunSummary::new(&plan_for("events"));
        s.status = "skipped".into();
        s.mode = "timewindow".into();
        s.skip_reason = Some("source returned 0 rows".into());

        let block = s.render();
        let window_line = block
            .lines()
            .find(|l| l.trim_start().starts_with("window:"))
            .unwrap_or_else(|| panic!("expected a window: line in block: {block}"));
        assert!(
            window_line.contains("matched no rows"),
            "window line reports the empty window: {window_line:?}"
        );
        assert!(
            window_line.contains("time_column") && window_line.contains("days_window"),
            "window line points at the window config to check: {window_line:?}"
        );
        assert!(
            !block.lines().any(|l| l.trim_start().starts_with("cursor:")),
            "no cursor line for a non-cursor strategy: {block}"
        );
    }

    #[test]
    fn time_window_skip_line_only_for_skipped_time_window() {
        assert_eq!(
            time_window_skip_line("timewindow", Some("source returned 0 rows")),
            Some("rolling time window matched no rows — check `time_column`/`days_window`".into())
        );
        assert_eq!(
            time_window_skip_line("incremental", Some("source returned 0 rows")),
            None
        );
        assert_eq!(
            time_window_skip_line("full", Some("source returned 0 rows")),
            None
        );
        assert_eq!(time_window_skip_line("timewindow", None), None);
    }
}