use super::ipc::{self, ChildEvent};
use super::{format_bytes, multi_export_mode, strip_chunked_recovery_hint};
use crate::journal::{PlanSnapshot, RunEvent, RunJournal};
use crate::manifest::ManifestPart;
use crate::plan::ResolvedRunPlan;
/// Builds the journal's snapshot of a resolved plan.
///
/// Strategy, format, compression and destination type are stored as the text
/// returned by their label methods; the export name, base query, tuning label,
/// batch size and the `validate` / `reconcile` / `resume` flags are copied
/// over unchanged.
fn plan_snapshot_from(plan: &ResolvedRunPlan) -> PlanSnapshot {
    // Render the enum-like settings to text first so the literal below reads
    // as a plain field-for-field copy.
    let strategy = plan.strategy.mode_label().to_string();
    let format = plan.format.label().to_string();
    let compression = plan.compression.label().to_string();
    let destination_type = plan.destination.destination_type.label().to_string();

    PlanSnapshot {
        export_name: plan.export_name.clone(),
        base_query: plan.base_query.clone(),
        strategy,
        format,
        compression,
        destination_type,
        tuning_profile: plan.tuning_profile_label.clone(),
        batch_size: plan.tuning.batch_size,
        validate: plan.validate,
        reconcile: plan.reconcile,
        resume: plan.resume,
    }
}
/// Serializable context that can be attached to a [`RunSummary`] through its
/// `apply_context` field.
///
/// NOTE(review): nothing in this file populates or reads these fields (the
/// summary constructors always start with `apply_context: None`); the field
/// notes below are inferred from the names only — confirm against the producer.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ApplyContext {
    /// Presumably the identifier of the plan being applied — TODO confirm.
    pub plan_id: String,
    /// Presumably whether the apply was forced — TODO confirm.
    pub forced: bool,
    /// Presumably the names of the checks skipped by forcing — TODO confirm.
    pub force_bypassed: Vec<String>,
}
/// Accumulated result of one export run: counters, labels, optional check
/// outcomes and the run journal. Rendered to stderr (or sent as an IPC event)
/// by the methods in the `impl` block.
#[derive(Debug, Clone, Default)]
pub struct RunSummary {
    /// Run identifier; `new` builds it as `<export_name>_<UTC timestamp>`.
    pub run_id: String,
    /// Name of the export this run belongs to.
    pub export_name: String,
    /// Run state as text. Starts as `"running"`; this file also compares it
    /// against `"success"`, `"failed"` and `"skipped"`.
    pub status: String,
    /// Rows exported; shown as the `rows` line.
    pub total_rows: i64,
    /// Files produced; shown as the `files` line.
    pub files_produced: usize,
    /// Bytes written; the `bytes` line is shown only when this is non-zero.
    pub bytes_written: u64,
    /// Files committed; checked against `manifest_parts` by
    /// `check_post_run_invariants`.
    pub files_committed: usize,
    /// Run duration in milliseconds.
    pub duration_ms: i64,
    /// Peak resident set size in MB; shown only when greater than zero.
    pub peak_rss_mb: i64,
    /// Retry count; shown only when greater than zero.
    pub retries: u32,
    /// Validation outcome (`pass` / `FAIL`); `None` omits the line.
    pub validated: Option<bool>,
    /// Schema comparison outcome (`CHANGED` / `unchanged`); `None` omits the line.
    pub schema_changed: Option<bool>,
    /// Quality check outcome (`pass` / `FAIL`); `None` omits the line.
    pub quality_passed: Option<bool>,
    /// Column checksums; not read anywhere in this file.
    pub column_checksums: Vec<crate::manifest::ColumnChecksum>,
    /// Not read anywhere in this file; presumably the key column used for the
    /// checksums above — TODO confirm.
    pub checksum_key_column: Option<String>,
    /// Error text; shown as the `error` line and in the compact failure line.
    pub error_message: Option<String>,
    /// Tuning profile label copied from the plan.
    pub tuning_profile: String,
    /// Configured batch size copied from the plan's tuning.
    pub batch_size: usize,
    /// Optional memory-based batch sizing value (rendered with a `MiB` suffix).
    pub batch_size_memory_mb: Option<usize>,
    /// Output format label (e.g. `"parquet"`).
    pub format: String,
    /// Extraction strategy label (e.g. `"snapshot"`, `"timewindow"`).
    pub mode: String,
    /// Compression label (e.g. `"zstd"`).
    pub compression: String,
    /// Destination URI; `new` leaves it `None` for stdout destinations.
    pub destination_uri: Option<String>,
    /// Byte delta shown as the `pg temp spill` line when positive.
    /// NOTE(review): presumably the change in PostgreSQL temp-file bytes over
    /// the run — confirm against the code that sets it.
    pub pg_temp_bytes_delta: Option<i64>,
    /// Reason appended to the status line when the status is `"skipped"`.
    pub skip_reason: Option<String>,
    /// Source-side row count used in the `reconcile` line (`?` when absent).
    pub source_count: Option<i64>,
    /// Reconciliation outcome (`MATCH` / `MISMATCH`); `None` omits the line.
    pub reconciled: Option<bool>,
    /// Parts recorded for the manifest; the post-run invariants compare the
    /// counters above against this list.
    pub manifest_parts: Vec<ManifestPart>,
    /// Not read anywhere in this file.
    pub schema_fingerprint: Option<String>,
    /// Not read anywhere in this file.
    pub manifest_verification: Option<crate::pipeline::ManifestVerification>,
    /// Optional apply context; always `None` when built by this file.
    pub apply_context: Option<ApplyContext>,
    /// Event journal for the run.
    pub journal: RunJournal,
}
/// One line of the detailed summary block: a static label and its rendered value.
type Row = (&'static str, String);
impl RunSummary {
/// Starts the summary for one run of `plan`.
///
/// In order: derives the run id from the export name and the current UTC time
/// (millisecond precision), opens the journal and records a `PlanResolved`
/// event, emits the `Started` IPC event, and returns a summary in the
/// `"running"` state with zeroed counters and every optional outcome unset.
pub(super) fn new(plan: &ResolvedRunPlan) -> Self {
    let timestamp = chrono::Utc::now().format("%Y%m%dT%H%M%S%.3f");
    let run_id = format!("{}_{}", plan.export_name, timestamp);

    let mut journal = RunJournal::new(&run_id, &plan.export_name);
    journal.record(RunEvent::PlanResolved(plan_snapshot_from(plan)));

    let mode = plan.strategy.mode_label().to_string();
    let started = ChildEvent::Started {
        export_name: plan.export_name.clone(),
        run_id: run_id.clone(),
        mode: mode.clone(),
        tuning_profile: plan.tuning_profile_label.clone(),
        batch_size: plan.tuning.batch_size,
    };
    ipc::emit_event(&started);

    // No destination URI is recorded when the destination type is Stdout.
    let to_stdout = matches!(
        plan.destination.destination_type,
        crate::config::DestinationType::Stdout
    );
    let destination_uri = if to_stdout {
        None
    } else {
        Some(crate::pipeline::finalize::destination_uri_for_manifest(
            &plan.destination,
        ))
    };

    Self {
        // Identity and state.
        run_id,
        export_name: plan.export_name.clone(),
        status: "running".into(),
        // Counters start at zero.
        total_rows: 0,
        files_produced: 0,
        bytes_written: 0,
        files_committed: 0,
        duration_ms: 0,
        peak_rss_mb: 0,
        retries: 0,
        // Outcomes are unknown until the corresponding step runs.
        validated: None,
        schema_changed: None,
        quality_passed: None,
        error_message: None,
        // Labels copied from the plan.
        tuning_profile: plan.tuning_profile_label.clone(),
        batch_size: plan.tuning.batch_size,
        batch_size_memory_mb: plan.tuning.batch_size_memory_mb,
        format: plan.format.label().to_string(),
        mode,
        compression: plan.compression.label().to_string(),
        destination_uri,
        pg_temp_bytes_delta: None,
        skip_reason: None,
        source_count: None,
        reconciled: None,
        manifest_parts: Vec::new(),
        schema_fingerprint: None,
        manifest_verification: None,
        apply_context: None,
        column_checksums: Vec::new(),
        checksum_key_column: None,
        journal,
    }
}
/// Builds a summary without a resolved plan, for use in tests.
///
/// The result has the same initial state as `new` (status `"running"`, zeroed
/// counters, unset outcomes) but uses fixed placeholder labels
/// (`balanced` / `1000` / `parquet` / `snapshot` / `zstd`) and no destination
/// URI. It records no journal event and emits no IPC event.
#[doc(hidden)]
#[allow(dead_code)]
pub fn stub_for_testing(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
    let run_id: String = run_id.into();
    let export_name: String = export_name.into();
    let journal = RunJournal::new(&run_id, &export_name);

    Self {
        // Identity and state.
        run_id,
        export_name,
        status: String::from("running"),
        // Counters start at zero.
        total_rows: 0,
        files_produced: 0,
        bytes_written: 0,
        files_committed: 0,
        duration_ms: 0,
        peak_rss_mb: 0,
        retries: 0,
        // No outcome has been observed yet.
        validated: None,
        schema_changed: None,
        quality_passed: None,
        error_message: None,
        // Fixed placeholder labels.
        tuning_profile: String::from("balanced"),
        batch_size: 1000,
        batch_size_memory_mb: None,
        format: String::from("parquet"),
        mode: String::from("snapshot"),
        compression: String::from("zstd"),
        destination_uri: None,
        pg_temp_bytes_delta: None,
        skip_reason: None,
        source_count: None,
        reconciled: None,
        manifest_parts: Vec::new(),
        schema_fingerprint: None,
        manifest_verification: None,
        apply_context: None,
        column_checksums: Vec::new(),
        checksum_key_column: None,
        journal,
    }
}
/// Builder-style setter for `status`.
///
/// When the new status is `"success"` or `"failed"` and the journal has no
/// final outcome yet, a `RunCompleted` event is recorded first, carrying the
/// current `error_message` and `duration_ms`.
#[doc(hidden)]
#[allow(dead_code)]
pub fn with_status(mut self, status: impl Into<String>) -> Self {
    let new_status: String = status.into();
    let is_terminal = matches!(new_status.as_str(), "success" | "failed");
    // `final_outcome` is only consulted for terminal statuses.
    if is_terminal && self.journal.final_outcome().is_none() {
        let completed = RunEvent::RunCompleted {
            status: new_status.clone(),
            error_message: self.error_message.clone(),
            duration_ms: self.duration_ms,
        };
        self.journal.record(completed);
    }
    self.status = new_status;
    self
}
/// Builder-style setter: overwrites `files_committed` with `n` and returns the
/// summary. No other field is touched.
#[doc(hidden)]
#[allow(dead_code)]
pub fn with_files_committed(self, n: usize) -> Self {
    let mut summary = self;
    summary.files_committed = n;
    summary
}
/// Builder-style setter that installs `parts` and derives the counters from
/// them: `total_rows` and `bytes_written` become the sums of the parts' `rows`
/// and `size_bytes`, and both `files_produced` and `files_committed` become
/// the number of parts.
#[doc(hidden)]
#[allow(dead_code)]
pub fn with_manifest_parts(mut self, parts: Vec<crate::manifest::ManifestPart>) -> Self {
    let part_count = parts.len();
    let mut row_total = 0;
    let mut byte_total = 0;
    for part in &parts {
        row_total += part.rows;
        byte_total += part.size_bytes;
    }
    self.total_rows = row_total;
    self.bytes_written = byte_total;
    self.files_produced = part_count;
    self.files_committed = part_count;
    self.manifest_parts = parts;
    self
}
/// Builder-style setter: stores `msg` as the error message and returns the
/// summary. The status is left as it was.
#[doc(hidden)]
#[allow(dead_code)]
pub fn with_error(mut self, msg: impl Into<String>) -> Self {
    let message: String = msg.into();
    self.error_message = Some(message);
    self
}
/// Builder-style helper: records `snap` in the journal as a `PlanResolved`
/// event and returns the summary.
#[doc(hidden)]
#[allow(dead_code)]
pub fn with_plan_snapshot(mut self, snap: PlanSnapshot) -> Self {
    let event = RunEvent::PlanResolved(snap);
    self.journal.record(event);
    self
}
/// Reports the finished run.
///
/// When IPC event capture is active, a `Finished` event is emitted and nothing
/// is written to stderr; otherwise the summary block is printed to stderr.
pub(super) fn print(&self) {
    if !ipc::capturing_events() {
        self.print_stderr_block();
        return;
    }

    let finished = ChildEvent::Finished {
        export_name: self.export_name.clone(),
        run_id: self.run_id.clone(),
        status: self.status.clone(),
        total_rows: self.total_rows,
        files_produced: self.files_produced as u64,
        bytes_written: self.bytes_written,
        duration_ms: self.duration_ms,
        peak_rss_mb: self.peak_rss_mb,
        error_message: self.error_message.clone(),
    };
    ipc::emit_event(&finished);
}
/// Writes the summary to stderr in a single `write_all` call.
///
/// Multi-export mode uses the one-line compact form; otherwise the detailed
/// block is used with its trailing newlines trimmed. The text goes through
/// `parent_ui::sanitize_terminal`, gets exactly one trailing newline, and is
/// written while holding the stderr lock. Write and flush errors are ignored.
pub(super) fn print_stderr_block(&self) {
    use std::io::Write;

    let rendered = match multi_export_mode() {
        true => self.render_compact(),
        false => self.render().trim_end_matches('\n').to_string(),
    };
    let mut payload = super::parent_ui::sanitize_terminal(&rendered);
    payload.push('\n');

    let stderr = std::io::stderr();
    let mut locked = stderr.lock();
    // Best effort: there is nowhere useful to report a failed stderr write.
    let _ = locked.write_all(payload.as_bytes());
    let _ = locked.flush();
}
/// Renders the one-line form used in multi-export mode:
/// `<icon> <export name> <mode> <body>`.
///
/// The name and mode are left-aligned in 22- and 8-character columns. For a
/// failed run the body is the compacted error message (recovery hint
/// stripped); otherwise it lists rows, files, bytes and duration, plus peak
/// RSS when one was recorded.
fn render_compact(&self) -> String {
    const NAME_COL: usize = 22;
    const MODE_COL: usize = 8;

    let failed = self.status == "failed";
    let icon = if self.status == "success" {
        "✓"
    } else if failed {
        "✗"
    } else {
        "•"
    };

    let body = if failed {
        let message = self
            .error_message
            .as_deref()
            .unwrap_or("(no error message recorded)");
        let (cause, _) = strip_chunked_recovery_hint(message);
        compact_error(cause)
    } else {
        let mut stats = format!(
            "{} rows {} files {} {}",
            fmt_thousands(self.total_rows),
            fmt_thousands(self.files_produced as i64),
            format_bytes(self.bytes_written),
            fmt_duration_ms(self.duration_ms),
        );
        if self.peak_rss_mb > 0 {
            stats.push_str(&format!(" RSS {} MB", fmt_thousands(self.peak_rss_mb)));
        }
        stats
    };

    format!(
        "{} {:<name$} {:<mode$} {}",
        icon,
        self.export_name,
        self.mode,
        body,
        name = NAME_COL,
        mode = MODE_COL,
    )
}
/// Renders the detailed multi-line block for this run.
///
/// Row order is fixed: run id, status, tuning, rows, files, then the optional
/// output / position / bytes rows, duration, the optional peak RSS / pg temp
/// spill / compression / retries rows, the outcome rows and finally the error
/// row. Optional rows are omitted when their helper returns `None`.
fn render(&self) -> String {
    let mut rows: Vec<Row> = Vec::with_capacity(16);

    rows.extend([
        ("run_id", self.run_id.clone()),
        self.status_row(),
        self.tuning_row(),
        ("rows", fmt_thousands(self.total_rows)),
        ("files", fmt_thousands(self.files_produced as i64)),
    ]);
    for optional in [self.output_row(), self.position_row(), self.bytes_row()] {
        rows.extend(optional);
    }

    rows.push(("duration", fmt_duration_ms(self.duration_ms)));
    for optional in [
        self.peak_rss_row(),
        self.pg_temp_spill_row(),
        self.compression_row(),
        self.retries_row(),
    ] {
        rows.extend(optional);
    }

    rows.extend(self.outcome_rows());
    rows.extend(self.error_row());

    format_block(&self.export_name, &rows)
}
/// The `status` row. A skipped run with a recorded reason renders as
/// `skipped (<reason>)`; every other case renders the bare status.
fn status_row(&self) -> Row {
    let value = match self.skip_reason.as_deref() {
        Some(reason) if self.status == "skipped" => format!("{} ({})", self.status, reason),
        _ => self.status.clone(),
    };
    ("status", value)
}
/// The `tuning` row: profile label and batch size, followed by a
/// parenthesised `batch_size_memory_mb` note when that setting is present.
fn tuning_row(&self) -> Row {
    let base = format!(
        "profile={}, batch_size={}",
        self.tuning_profile,
        fmt_thousands(self.batch_size as i64)
    );
    let value = match self.batch_size_memory_mb {
        None => base,
        Some(mem) => format!(
            "{} (batch_size_memory_mb={}MiB → effective FETCH in logs)",
            base, mem
        ),
    };
    ("tuning", value)
}
/// The `output` row: the destination URI, shown only when at least one file
/// was produced and a URI is recorded.
fn output_row(&self) -> Option<Row> {
    if self.files_produced == 0 {
        return None;
    }
    let uri = self.destination_uri.as_ref()?;
    Some(("output", uri.clone()))
}
/// The position row derived from the skip reason: a `cursor` row when the
/// reason is a cursor skip, otherwise a `window` row when
/// `time_window_skip_line` yields one, otherwise nothing.
fn position_row(&self) -> Option<Row> {
    let reason = self.skip_reason.as_deref();
    incremental_position_line(reason)
        .map(|cursor| ("cursor", cursor))
        .or_else(|| time_window_skip_line(&self.mode, reason).map(|window| ("window", window)))
}
/// The `bytes` row, omitted when nothing was written.
fn bytes_row(&self) -> Option<Row> {
    let wrote_something = self.bytes_written > 0;
    wrote_something.then(|| ("bytes", format_bytes(self.bytes_written)))
}
/// The `peak RSS` row, omitted when no positive value was recorded.
fn peak_rss_row(&self) -> Option<Row> {
    if self.peak_rss_mb <= 0 {
        return None;
    }
    let value = format!(
        "{} MB (sampled during run)",
        fmt_thousands(self.peak_rss_mb)
    );
    Some(("peak RSS", value))
}
/// The `pg temp spill` row.
///
/// Shown only for a positive byte delta, rendered in MB with one decimal.
/// Above 100 MiB the value also carries a hint to shrink the batch size.
fn pg_temp_spill_row(&self) -> Option<Row> {
    const HINT_THRESHOLD_BYTES: i64 = 100 * 1024 * 1024;

    let spilled = self.pg_temp_bytes_delta.filter(|&bytes| bytes > 0)?;
    let mut label = format!("{:.1} MB", spilled as f64 / (1024.0 * 1024.0));
    if spilled > HINT_THRESHOLD_BYTES {
        label.push_str(" ⚠ shrink tuning.batch_size or set batch_size_memory_mb");
    }
    Some(("pg temp spill", label))
}
/// The `compression` row, shown only for parquet output whose compression
/// label is something other than `zstd`.
fn compression_row(&self) -> Option<Row> {
    let worth_showing = self.format == "parquet" && self.compression != "zstd";
    worth_showing.then(|| ("compression", self.compression.clone()))
}
/// The `retries` row, omitted when the run never retried.
fn retries_row(&self) -> Option<Row> {
    match self.retries {
        0 => None,
        count => Some(("retries", count.to_string())),
    }
}
/// Rows for the optional post-run checks, in this order: `validated`,
/// `schema`, `quality`, `reconcile`, then a `verify` nudge.
///
/// Each check row appears only when its outcome is recorded. The nudge is
/// added for a successful run that produced files but was neither validated
/// nor reconciled.
fn outcome_rows(&self) -> Vec<Row> {
    let pass_fail = |ok: bool| String::from(if ok { "pass" } else { "FAIL" });
    let mut rows: Vec<Row> = Vec::new();

    rows.extend(self.validated.map(|ok| ("validated", pass_fail(ok))));
    rows.extend(self.schema_changed.map(|changed| {
        let verdict = if changed { "CHANGED" } else { "unchanged" };
        ("schema", String::from(verdict))
    }));
    rows.extend(self.quality_passed.map(|ok| ("quality", pass_fail(ok))));
    rows.extend(self.reconciled.map(|matched| {
        // An unknown source count is shown as `?` rather than hiding the row.
        let src = match self.source_count {
            Some(count) => fmt_thousands(count),
            None => String::from("?"),
        };
        let exported = fmt_thousands(self.total_rows);
        let value = if matched {
            format!("MATCH ({exported}/{src})")
        } else {
            format!("MISMATCH (exported {exported} vs source {src})")
        };
        ("reconcile", value)
    }));

    let unverified = self.validated.is_none() && self.reconciled.is_none();
    if self.status == "success" && self.files_produced > 0 && unverified {
        rows.push((
            "verify",
            String::from(
                "not run — add `--reconcile` (count vs source) or `rivet validate` (re-read outputs)",
            ),
        ));
    }
    rows
}
/// The `error` row: the recorded error message with trailing whitespace
/// removed. Embedded newlines are kept.
fn error_row(&self) -> Option<Row> {
    let message = self.error_message.as_deref()?;
    Some(("error", message.trim_end().to_string()))
}
/// Cross-checks the summary counters against `manifest_parts`.
///
/// Checks run in a fixed order and the first violation is returned as
/// `Err(message)`:
/// 1. `files_committed` must not exceed the number of manifest parts;
/// 2. `files_produced` must not exceed the number of manifest parts;
/// 3. `bytes_written` must not exceed the summed `size_bytes` of the parts;
/// 4. a `"success"` run with committed files must have a non-empty part list;
/// 5. a `"success"` run with `total_rows > 0` must have committed a file.
pub fn check_post_run_invariants(&self) -> Result<(), String> {
    let part_count = self.manifest_parts.len();

    if self.files_committed > part_count {
        return Err(format!(
            "summary.files_committed ({}) > manifest_parts.len() ({}) — \
             a runner bumped files_committed without commit::record_part",
            self.files_committed, part_count
        ));
    }

    if self.files_produced > part_count {
        return Err(format!(
            "summary.files_produced ({}) > manifest_parts.len() ({}) — \
             a runner bumped files_produced without commit::record_part",
            self.files_produced, part_count
        ));
    }

    let parts_bytes: u64 = self
        .manifest_parts
        .iter()
        .map(|part| part.size_bytes)
        .sum();
    if self.bytes_written > parts_bytes {
        return Err(format!(
            "summary.bytes_written ({}) > sum(manifest_parts.size_bytes) ({}) — \
             a runner bumped bytes_written without commit::record_part",
            self.bytes_written, parts_bytes
        ));
    }

    let succeeded = self.status == "success";

    // The first check already rejects this state (committed files, no parts);
    // the explicit check is kept so the message survives if that check changes.
    if succeeded && self.files_committed > 0 && part_count == 0 {
        return Err(format!(
            "success run with files_committed={} has empty manifest_parts — \
             cloud manifest (ADR-0012 M1) would ship with no part list \
             (this is the gap parallel_checkpoint had before commit e9b0796)",
            self.files_committed
        ));
    }

    if succeeded && self.total_rows > 0 && self.files_committed == 0 {
        return Err(format!(
            "summary.total_rows={} but files_committed=0 — rows extracted from \
             source but no files committed (no output reached the destination)",
            self.total_rows
        ));
    }

    Ok(())
}
}
/// Squeezes an error message into a single line of at most 240 characters.
///
/// Parallel-checkpoint worker errors get a dedicated summary. Any other
/// message has its lines right-trimmed, blank lines dropped, and the
/// remainder joined with `"; "`. The result is clamped with an ellipsis.
fn compact_error(raw: &str) -> String {
    const MAX_CHARS: usize = 240;

    let single_line = match summarize_parallel_chunk_errors(raw) {
        Some(summary) => summary,
        None => {
            let mut joined = String::new();
            for line in raw.lines().map(str::trim_end).filter(|l| !l.is_empty()) {
                // Every kept line is non-empty, so an empty buffer means "first line".
                if !joined.is_empty() {
                    joined.push_str("; ");
                }
                joined.push_str(line);
            }
            joined
        }
    };
    clamp_chars(&single_line, MAX_CHARS)
}
/// Turns a cursor skip reason into the text of the `cursor` row.
///
/// Only reasons of the exact form `no new rows since cursor '<column>'` are
/// recognised; anything else (including `None`) yields `None`.
fn incremental_position_line(skip_reason: Option<&str>) -> Option<String> {
    skip_reason
        .and_then(|reason| reason.strip_prefix("no new rows since cursor '"))
        .and_then(|rest| rest.strip_suffix('\''))
        .map(|column| format!("'{column}' unchanged (no new rows this run)"))
}
/// Text of the `window` row for a skipped time-window run.
///
/// Returns the hint only when a skip reason is present and `mode` is exactly
/// `"timewindow"`. The content of the reason is not inspected.
fn time_window_skip_line(mode: &str, skip_reason: Option<&str>) -> Option<String> {
    match (mode, skip_reason) {
        ("timewindow", Some(_)) => Some(String::from(
            "rolling time window matched no rows — check `time_column`/`days_window`",
        )),
        _ => None,
    }
}
/// Summarises a "parallel checkpoint worker errors:" payload on one line.
///
/// Returns `None` when the header is absent or no line after it starts with
/// `chunk ` (after trimming). Otherwise the summary keeps whatever preceded
/// the header as a prefix, reports how many chunk lines were found, and
/// quotes the first one clamped to 140 characters.
fn summarize_parallel_chunk_errors(raw: &str) -> Option<String> {
    const HEADER: &str = "parallel checkpoint worker errors:";

    // `split_once` splits at the first occurrence of the header.
    let (before, after) = raw.split_once(HEADER)?;

    let mut chunk_lines = after
        .lines()
        .map(str::trim)
        .filter(|line| line.starts_with("chunk "))
        .peekable();
    let example = clamp_chars(chunk_lines.peek().copied()?, 140);
    let failed = chunk_lines.count();

    let lead = before.trim_end_matches(": ").trim_end();
    let prefix = if lead.is_empty() {
        String::new()
    } else {
        format!("{}: ", lead)
    };

    Some(format!(
        "{}parallel checkpoint workers failed: {} chunk(s) ({}); see stderr for full payloads",
        prefix, failed, example
    ))
}
/// Limits `s` to at most `max_chars` characters (Unicode scalar values).
///
/// A string that already fits is returned unchanged. A longer one is cut to
/// `max_chars - 1` characters and terminated with `…`, so the result is
/// exactly `max_chars` long. `max_chars == 0` yields an empty string.
fn clamp_chars(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }
    // A character at index `max_chars` exists only when `s` is too long.
    if s.char_indices().nth(max_chars).is_none() {
        return s.to_owned();
    }
    // Byte offset of the first character to drop; it exists because `s` has
    // more than `max_chars` characters.
    let cut = s
        .char_indices()
        .nth(max_chars - 1)
        .map_or(s.len(), |(offset, _)| offset);
    let mut clamped = String::with_capacity(cut + '…'.len_utf8());
    clamped.push_str(&s[..cut]);
    clamped.push('…');
    clamped
}
/// Lays out the detailed summary block.
///
/// Output starts with a blank line, then a header made of `name` padded with
/// `─` to 60 characters (longer names simply overflow), then one line per row
/// with labels left-aligned to the widest label. A value containing `\n`
/// continues on extra lines that start with a fixed run of spaces.
fn format_block(name: &str, rows: &[(&str, String)]) -> String {
    const HEADER_WIDTH: usize = 60;

    let label_w = rows
        .iter()
        .fold(0usize, |widest, (label, _)| widest.max(label.len()));
    let title = format!("── {} ", name);
    let rule = "─".repeat(HEADER_WIDTH.saturating_sub(title.chars().count()));
    let continuation_indent = " ".repeat(2 + (label_w + 1) + 2);

    let mut out = String::with_capacity(HEADER_WIDTH * (rows.len() + 3));
    out.push('\n');
    out.push_str(&title);
    out.push_str(&rule);
    out.push('\n');

    for (label, value) in rows {
        // `split` always yields at least one piece, so even an empty value
        // produces its label line.
        for (idx, piece) in value.split('\n').enumerate() {
            if idx == 0 {
                out.push_str(&format!(
                    " {:<width$} {}\n",
                    format!("{label}:"),
                    piece,
                    width = label_w + 1
                ));
            } else {
                out.push_str(&continuation_indent);
                out.push_str(piece);
                out.push('\n');
            }
        }
    }
    out
}
/// Formats a millisecond duration for humans.
///
/// * below one second: `"<n>ms"` (negative inputs land here too);
/// * below one minute: `"S.Ts"`;
/// * below one hour: `"Mm SS.Ts"`;
/// * otherwise: `"Hh MMm SS.Ts"`.
///
/// The value is rounded to tenths of a second **before** it is split into
/// hours, minutes and seconds. Rounding only the seconds field afterwards
/// let 59.96 s print as `60.0s` without carrying into the minutes, producing
/// output such as `1m 60.0s`. Exact ties (e.g. 1 250 ms) round half up.
fn fmt_duration_ms(ms: i64) -> String {
    if ms < 1000 {
        return format!("{}ms", ms);
    }
    // Integer rounding to the nearest tenth of a second. Written as
    // quotient + carry so it cannot overflow, even for `i64::MAX`.
    let tenths = ms / 100 + i64::from(ms % 100 >= 50);
    let hours = tenths / 36_000;
    let minutes = (tenths % 36_000) / 600;
    let seconds = (tenths % 600) / 10;
    let tenth = tenths % 10;

    if hours > 0 {
        format!("{}h {:02}m {:02}.{}s", hours, minutes, seconds, tenth)
    } else if minutes > 0 {
        format!("{}m {:02}.{}s", minutes, seconds, tenth)
    } else {
        format!("{}.{}s", seconds, tenth)
    }
}
/// Formats an integer with `,` between groups of three digits, e.g.
/// `1000908` → `"1,000,908"`. Negative values keep a leading `-`; `i64::MIN`
/// is handled because the digits come from the unsigned magnitude.
fn fmt_thousands(n: i64) -> String {
    let digits = n.unsigned_abs().to_string();
    let mut grouped = String::with_capacity(digits.len() + digits.len() / 3 + 1);
    if n < 0 {
        grouped.push('-');
    }

    // The leading group holds the 1–3 digits left over once the rest is cut
    // into full groups of three.
    let lead_len = match digits.len() % 3 {
        0 => 3,
        partial => partial,
    };
    let (lead, rest) = digits.split_at(lead_len);
    grouped.push_str(lead);
    for group in rest.as_bytes().chunks(3) {
        grouped.push(',');
        grouped.extend(group.iter().map(|&digit| char::from(digit)));
    }
    grouped
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal resolved plan shared by the tests that need a real
    /// `RunSummary::new`: snapshot strategy, Parquet output with default
    /// compression, written to the local path `./out`.
    fn plan_for(export_name: &str) -> crate::plan::ResolvedRunPlan {
        use crate::plan::{
            CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
            MetaColumns, ResolvedRunPlan,
        };
        use crate::tuning::SourceTuning;
        ResolvedRunPlan {
            export_name: export_name.into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: crate::config::SourceConfig {
                source_type: crate::config::SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    /// Thousands separators across small, large, negative and maximal values.
    #[test]
    fn fmt_thousands_handles_small_and_large() {
        assert_eq!(fmt_thousands(0), "0");
        assert_eq!(fmt_thousands(7), "7");
        assert_eq!(fmt_thousands(999), "999");
        assert_eq!(fmt_thousands(1_000), "1,000");
        assert_eq!(fmt_thousands(1_000_908), "1,000,908");
        assert_eq!(fmt_thousands(39_990_376), "39,990,376");
        assert_eq!(fmt_thousands(-1_234), "-1,234");
        assert_eq!(fmt_thousands(i64::MAX), "9,223,372,036,854,775,807");
    }

    /// Each duration magnitude picks the expected unit layout.
    #[test]
    fn fmt_duration_picks_unit() {
        assert_eq!(fmt_duration_ms(0), "0ms");
        assert_eq!(fmt_duration_ms(800), "800ms");
        assert_eq!(fmt_duration_ms(1_500), "1.5s");
        assert_eq!(fmt_duration_ms(68_400), "1m 08.4s");
        assert_eq!(fmt_duration_ms(3_725_300), "1h 02m 05.3s");
    }

    /// Values start in the same column whatever the label length.
    #[test]
    fn format_block_pads_labels_uniformly() {
        let rows = vec![
            ("run_id", "abc".to_string()),
            ("rows", "42".to_string()),
            ("compression", "zstd".to_string()),
        ];
        let out = format_block("orders", &rows);
        let lines: Vec<&str> = out.lines().filter(|l| l.contains(':')).collect();
        assert_eq!(lines.len(), 3);
        let value_col = lines[0].rfind("abc").unwrap();
        assert_eq!(lines[1].rfind("42").unwrap(), value_col);
        assert_eq!(lines[2].rfind("zstd").unwrap(), value_col);
    }

    /// The header rule is padded to one width for short and long names.
    #[test]
    fn format_block_header_has_consistent_width() {
        let block_a = format_block("a", &[("rows", "1".into())]);
        let block_b = format_block("orders_table_xyz", &[("rows", "1".into())]);
        let header_a = block_a.lines().nth(1).unwrap();
        let header_b = block_b.lines().nth(1).unwrap();
        assert_eq!(
            header_a.chars().count(),
            header_b.chars().count(),
            "headers must be the same width regardless of name length: {:?} vs {:?}",
            header_a,
            header_b
        );
    }

    /// The detailed block and the compact line both render a successful run.
    #[test]
    fn render_produces_a_single_string_with_trailing_newline() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "success".into();
        s.total_rows = 1_000_908;
        s.files_produced = 11;
        s.bytes_written = 32 * 1024 * 1024 + 400 * 1024;
        s.duration_ms = 68_400;
        s.peak_rss_mb = 884;
        let block = s.render();
        assert!(
            block.starts_with('\n'),
            "block should start with a blank line"
        );
        assert!(block.ends_with('\n'), "block should end with a newline");
        assert!(block.contains("── orders "));
        assert!(
            block.contains("1,000,908"),
            "rows should be formatted with thousands separator: {}",
            block
        );
        assert!(block.contains("1m 08.4s"), "duration formatting: {}", block);
        assert!(!block.contains('\r'));
        let line = s.render_compact();
        assert!(line.starts_with("✓ "), "success icon present: {:?}", line);
        assert!(line.contains("orders"), "export name present: {:?}", line);
        assert!(line.contains("1,000,908 rows"), "rows present: {:?}", line);
        assert!(line.contains("32.4 MB"), "bytes present: {:?}", line);
        assert!(line.contains("1m 08.4s"), "duration present: {:?}", line);
        assert!(line.contains("RSS 884 MB"), "rss present: {:?}", line);
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    /// Parallel worker payloads collapse to a clamped one-line summary.
    #[test]
    fn compact_error_summarises_parallel_chunk_errors() {
        let raw = "export 'page_views': parallel checkpoint worker errors:\n\
chunk 4: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202442_chunk4.parquet?partNumber=1&uploadId=ABPnzm7RqplA, called: http_util::Client::send } => send http request, source: error sending request: client error (SendRequest): dispatch task is gone\n\
chunk 5: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202443_chunk5.parquet?partNumber=1&uploadId=ABPnzm6q, called: http_util::Client::send } => send http request, source: dispatch task is gone";
        let out = compact_error(raw);
        assert!(
            out.contains("2 chunk(s)"),
            "should report number of failed chunks: {:?}",
            out
        );
        assert!(
            out.starts_with("export 'page_views': parallel checkpoint workers failed:"),
            "should keep export prefix and use compact phrasing: {:?}",
            out
        );
        assert!(
            out.contains("chunk 4:"),
            "should include the first chunk as an example: {:?}",
            out
        );
        assert!(!out.contains('\n'), "single line output: {:?}", out);
        assert!(
            out.chars().count() <= 240,
            "must be clamped to <=240 chars, got {}: {:?}",
            out.chars().count(),
            out
        );
    }

    /// Generic multi-line errors are joined with `; ` and blank lines dropped.
    #[test]
    fn compact_error_collapses_generic_multiline() {
        let raw = "first line of trouble\nsecond line with detail\n\nthird line\n";
        let out = compact_error(raw);
        assert_eq!(
            out, "first line of trouble; second line with detail; third line",
            "newlines should collapse to '; ' and blanks dropped"
        );
    }

    /// Over-long single-line errors are cut to 240 characters with an ellipsis.
    #[test]
    fn compact_error_clamps_excessively_long_lines() {
        let raw = "x".repeat(1_000);
        let out = compact_error(&raw);
        assert_eq!(out.chars().count(), 240);
        assert!(out.ends_with('…'));
    }

    /// The compact failure line keeps the cause and drops the recovery hint.
    #[test]
    fn render_compact_strips_chunked_recovery_hint_for_failed() {
        let mut s = RunSummary::new(&plan_for("events"));
        s.status = "failed".into();
        s.error_message = Some(
            "export 'events': --resume but no in-progress chunk checkpoint; \
run without --resume first or `rivet state reset-chunks --config x.yaml --export events`"
                .to_string(),
        );
        let line = s.render_compact();
        assert!(line.starts_with("✗ "), "failure icon: {:?}", line);
        assert!(line.contains("events"), "name present: {:?}", line);
        assert!(
            line.contains("--resume but no in-progress chunk checkpoint"),
            "cause kept: {:?}",
            line
        );
        assert!(
            !line.contains("rivet state reset-chunks"),
            "recovery hint should be stripped from per-export line: {:?}",
            line
        );
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    /// The detailed block keeps a multi-line error on separate, indented lines.
    #[test]
    fn render_preserves_multiline_error_block() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "failed".into();
        s.error_message = Some(
            "export 'orders': 1 quality check(s) failed:\n \
- row_count 10 below minimum 999999\n \
Fix the source data, or adjust the thresholds under `quality:` in your config."
                .to_string(),
        );
        let block = s.render();
        assert!(
            !block.contains("failed:;"),
            "error must not be '; '-flattened in the detailed block: {block}"
        );
        assert!(
            block.contains("- row_count 10 below minimum 999999"),
            "failing check line present: {block}"
        );
        let err_lines: Vec<&str> = block
            .lines()
            .filter(|l| {
                l.contains("quality check(s) failed")
                    || l.contains("row_count 10 below minimum")
                    || l.contains("Fix the source data")
            })
            .collect();
        assert_eq!(
            err_lines.len(),
            3,
            "all three error lines should render on separate lines: {block}"
        );
        for l in &err_lines {
            assert!(l.starts_with(' '), "error line should be indented: {l:?}");
        }
    }

    /// An unverified success gets the `verify` nudge; a validated one does not.
    #[test]
    fn render_nudges_verification_when_unverified_success() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "success".into();
        s.files_produced = 3;
        s.total_rows = 1_000;
        let block = s.render();
        assert!(
            block.lines().any(|l| l.trim_start().starts_with("verify:")),
            "expected a verify nudge on an unverified success: {block}"
        );
        let mut s2 = RunSummary::new(&plan_for("orders"));
        s2.status = "success".into();
        s2.files_produced = 3;
        s2.validated = Some(true);
        let block2 = s2.render();
        assert!(
            !block2
                .lines()
                .any(|l| l.trim_start().starts_with("verify:")),
            "a verified run must not nudge: {block2}"
        );
    }

    /// The spill row appears only for a positive delta and warns above 100 MB.
    #[test]
    fn pg_temp_spill_row_only_for_real_spill_and_annotates_large() {
        let mut s = RunSummary::stub_for_testing("r", "orders");
        assert_eq!(s.pg_temp_spill_row(), None, "no delta → no row");
        s.pg_temp_bytes_delta = Some(0);
        assert_eq!(s.pg_temp_spill_row(), None, "zero spill → no row");
        s.pg_temp_bytes_delta = Some(-5);
        assert_eq!(s.pg_temp_spill_row(), None, "negative delta → no row");
        s.pg_temp_bytes_delta = Some(50 * 1024 * 1024);
        let (label, value) = s.pg_temp_spill_row().expect("50MB spill → row");
        assert_eq!(label, "pg temp spill");
        assert!(
            value.contains("50.0 MB") && !value.contains('⚠'),
            "small spill is plain info: {value:?}"
        );
        s.pg_temp_bytes_delta = Some(200 * 1024 * 1024);
        let (_, value) = s.pg_temp_spill_row().expect("200MB spill → row");
        assert!(
            value.contains('⚠') && value.contains("batch_size"),
            "spill over 100 MB carries the tuning hint: {value:?}"
        );
    }

    /// Reconcile wording for match and mismatch; a reconciled run is not nudged.
    #[test]
    fn outcome_rows_format_reconcile_and_suppress_nudge_when_checked() {
        let mut s = RunSummary::stub_for_testing("r", "orders");
        s.reconciled = Some(true);
        s.source_count = Some(1_000);
        s.total_rows = 1_000;
        assert!(
            s.outcome_rows()
                .iter()
                .any(|(l, v)| *l == "reconcile" && v == "MATCH (1,000/1,000)"),
            "match wording: {:?}",
            s.outcome_rows()
        );
        s.reconciled = Some(false);
        s.source_count = Some(1_200);
        let rows = s.outcome_rows();
        let recon = rows
            .iter()
            .find(|(l, _)| *l == "reconcile")
            .expect("reconcile row");
        assert!(
            recon.1.contains("MISMATCH") && recon.1.contains("1,000") && recon.1.contains("1,200"),
            "mismatch names both sides: {:?}",
            recon
        );
        s.status = "success".into();
        s.files_produced = 2;
        assert!(
            !s.outcome_rows().iter().any(|(l, _)| *l == "verify"),
            "a reconciled run must not also nudge"
        );
    }

    /// A cursor skip shows a `cursor:` line naming the column.
    #[test]
    fn render_surfaces_cursor_position_on_zero_new_incremental() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "skipped".into();
        s.skip_reason = Some("no new rows since cursor 'updated_at'".into());
        let block = s.render();
        let cursor_line = block
            .lines()
            .find(|l| l.trim_start().starts_with("cursor:"))
            .unwrap_or_else(|| panic!("expected a cursor: line in block: {block}"));
        assert!(
            cursor_line.contains("'updated_at'"),
            "cursor line names the column: {cursor_line:?}"
        );
        assert!(
            cursor_line.contains("unchanged"),
            "cursor line reports the position held: {cursor_line:?}"
        );
    }

    /// Only cursor-style skip reasons produce a position line.
    #[test]
    fn incremental_position_line_only_for_cursor_skips() {
        assert_eq!(
            incremental_position_line(Some("no new rows since cursor 'ts'")),
            Some("'ts' unchanged (no new rows this run)".into())
        );
        assert_eq!(
            incremental_position_line(Some("source returned 0 rows")),
            None
        );
        assert_eq!(incremental_position_line(None), None);
    }

    /// A skipped time-window run shows a `window:` line and no `cursor:` line.
    #[test]
    fn render_surfaces_window_position_on_zero_row_time_window() {
        let mut s = RunSummary::new(&plan_for("events"));
        s.status = "skipped".into();
        s.mode = "timewindow".into();
        s.skip_reason = Some("source returned 0 rows".into());
        let block = s.render();
        let window_line = block
            .lines()
            .find(|l| l.trim_start().starts_with("window:"))
            .unwrap_or_else(|| panic!("expected a window: line in block: {block}"));
        assert!(
            window_line.contains("matched no rows"),
            "window line reports the empty window: {window_line:?}"
        );
        assert!(
            window_line.contains("time_column") && window_line.contains("days_window"),
            "window line points at the window config to check: {window_line:?}"
        );
        assert!(
            !block.lines().any(|l| l.trim_start().starts_with("cursor:")),
            "no cursor line for a non-cursor strategy: {block}"
        );
    }

    /// The window hint needs both the `timewindow` mode and a skip reason.
    #[test]
    fn time_window_skip_line_only_for_skipped_time_window() {
        assert_eq!(
            time_window_skip_line("timewindow", Some("source returned 0 rows")),
            Some("rolling time window matched no rows — check `time_column`/`days_window`".into())
        );
        assert_eq!(
            time_window_skip_line("incremental", Some("source returned 0 rows")),
            None
        );
        assert_eq!(
            time_window_skip_line("full", Some("source returned 0 rows")),
            None
        );
        assert_eq!(time_window_skip_line("timewindow", None), None);
    }
}