Skip to main content

rivet/pipeline/
summary.rs

1//! **Layer: Observability**
2//!
3//! `RunSummary` is the single observability artifact for a pipeline run.
4//! It accumulates operational data during execution and is consumed by:
5//! - the end-of-run terminal output (`print`)
6//! - the metrics store (`state::record_metric`)
7//! - the notification system (`notify::maybe_send`)
8//!
9//! `RunSummary` is written to by execution modules (row counts, byte counts, retries)
10//! but it makes no execution decisions itself — it is a pure data accumulator.
11//!
12//! It embeds a `RunJournal` so that all pipeline modules — which already hold
13//! `&mut RunSummary` — can record structured events via `summary.journal.record()`
14//! without any signature changes.  In a future epic the relationship will invert:
15//! `RunSummary` will be derived from `RunJournal`.
16
17use super::ipc::{self, ChildEvent};
18use super::{format_bytes, multi_export_mode, strip_chunked_recovery_hint};
19use crate::journal::{PlanSnapshot, RunEvent, RunJournal};
20use crate::manifest::ManifestPart;
21use crate::plan::ResolvedRunPlan;
22
23/// Build a `PlanSnapshot` from a `ResolvedRunPlan`.
24///
25/// Lives here rather than on `journal` itself so that the journal module
26/// stays free of plan/pipeline dependencies (avoids the state→pipeline cycle
27/// we used to have via `state::journal_store`).
28fn plan_snapshot_from(plan: &ResolvedRunPlan) -> PlanSnapshot {
29    PlanSnapshot {
30        export_name: plan.export_name.clone(),
31        base_query: plan.base_query.clone(),
32        strategy: plan.strategy.mode_label().to_string(),
33        format: plan.format.label().to_string(),
34        compression: plan.compression.label().to_string(),
35        destination_type: plan.destination.destination_type.label().to_string(),
36        tuning_profile: plan.tuning_profile_label.clone(),
37        batch_size: plan.tuning.batch_size,
38        validate: plan.validate,
39        reconcile: plan.reconcile,
40        resume: plan.resume,
41    }
42}
43
44/// Context recorded when a run was launched via `rivet apply` rather than
45/// `rivet run`.  Provides the audit trail for **F5**: which plan artifact
46/// was applied, whether `--force` was passed, and which preflight checks
47/// `--force` actually bypassed.
48///
49/// `None` on `RunSummary` means the run came from `rivet run` (no plan
50/// artifact was applied).
51#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
52pub struct ApplyContext {
53    /// `plan_id` from the applied `PlanArtifact`.
54    pub plan_id: String,
55    /// Was `--force` passed to `rivet apply`?
56    pub forced: bool,
57    /// Names of preflight checks that `--force` actually overrode for this
58    /// run.  Possible values: `"staleness"`, `"cursor_drift"`.  Empty when
59    /// `forced` is true but no check actually had to be bypassed (the plan
60    /// was fresh and the cursor matched).
61    pub force_bypassed: Vec<String>,
62}
63
64/// Accumulates operational data during a pipeline run for summary and metrics.
65///
66/// The embedded `journal` is the structured event log for this run.  Use
67/// `summary.journal.record(event)` at any call site that already holds
68/// `&mut RunSummary`.
69#[derive(Debug, Clone, Default)]
70pub struct RunSummary {
71    pub run_id: String,
72    pub export_name: String,
73    pub status: String,
74    pub total_rows: i64,
75    pub files_produced: usize,
76    pub bytes_written: u64,
77    /// Incremented after each successful `dest.write()`. Non-zero means a previous
78    /// attempt already committed data — retrying from the same cursor would duplicate rows.
79    pub files_committed: usize,
80    pub duration_ms: i64,
81    pub peak_rss_mb: i64,
82    pub retries: u32,
83    pub validated: Option<bool>,
84    pub schema_changed: Option<bool>,
85    pub quality_passed: Option<bool>,
86    /// Form B per-column value checksums (name-keyed), harvested from the sink;
87    /// recorded in the manifest so `validate` re-reads + verifies. Empty = none.
88    pub column_checksums: Vec<crate::manifest::ColumnChecksum>,
89    /// The column the Form B checksum is keyed to (cursor/key column); `None` = un-keyed.
90    pub checksum_key_column: Option<String>,
91    pub error_message: Option<String>,
92    /// `profile` from YAML, or `balanced (default)` if omitted.
93    pub tuning_profile: String,
94    /// Configured `batch_size` from YAML/profile (FETCH cap before `batch_size_memory_mb` override).
95    pub batch_size: usize,
96    /// When set, actual FETCH size is derived from schema (see logs).
97    pub batch_size_memory_mb: Option<usize>,
98    pub format: String,
99    pub mode: String,
100    pub compression: String,
101    /// Where the files were written, as an operator would type it to find them
102    /// again (`./output`, `s3://bucket/prefix`, …). `None` for stdout or a
103    /// summary built outside the run path (tests). Surfaced on the success card
104    /// so a newcomer isn't left wondering where their data landed.
105    pub destination_uri: Option<String>,
106    /// Postgres `pg_stat_database.temp_bytes` delta around the run. `None` for
107    /// non-Postgres sources or when the snapshot probe failed (no admin perms
108    /// not required — the view is readable by any role). When set and large,
109    /// indicates cursor / sort spill to `pgsql_tmp/` — the safe action is to
110    /// shrink `tuning.batch_size` or set `tuning.batch_size_memory_mb` below
111    /// PG's `work_mem`.
112    pub pg_temp_bytes_delta: Option<i64>,
113    /// Human-readable parenthetical attached to `status: skipped` so the
114    /// operator knows *why* there was nothing to export this run (e.g.
115    /// `"no new rows since cursor 'updated_at'"`). Always `None` when
116    /// `status != "skipped"`. Surfaced in the console summary card as
117    /// `status: skipped (<reason>)`.
118    pub skip_reason: Option<String>,
119    /// Source COUNT(*) result for reconciliation (None = not requested or not applicable).
120    pub source_count: Option<i64>,
121    /// Whether reconciliation passed (Some(true) = match, Some(false) = mismatch, None = skipped).
122    pub reconciled: Option<bool>,
123    /// Committed parts accumulated during the run, in commit order.  Populated by
124    /// `pipeline::manifest_writer::record_committed_part` at each `dest.write`
125    /// site (ADR-0012 M1 — Parts Before Manifest).  Drained at finalize into a
126    /// `RunManifest` by [`crate::pipeline::manifest_writer::write_manifest`].
127    pub manifest_parts: Vec<ManifestPart>,
128    /// xxh3 fingerprint of the dest-facing column schema for this run, in the
129    /// canonical `xxh3:<16-hex>` form produced by [`crate::state::schema_fingerprint`].
130    ///
131    /// Recorded by [`crate::pipeline::manifest_writer::record_run_schema_fingerprint`]
132    /// the first time the sink has resolved a schema (i.e. on the first batch
133    /// of any chunk).  Idempotent within a run — the schema is identical across
134    /// chunks, so later writes are no-ops.
135    ///
136    /// `finalize_manifest` reads this directly so the manifest's
137    /// `schema_fingerprint` no longer depends on the per-export schema row
138    /// happening to land in `state` before the manifest write.  The state
139    /// lookup remains a fallback for resume scenarios where the summary was
140    /// reconstructed without ever seeing a live schema.
141    pub schema_fingerprint: Option<String>,
142    /// Result of the manifest-aware `--validate` pass (ADR-0012 M5/M6,
143    /// ADR-0013).  Populated by `pipeline::job::finalize_validate_manifest`
144    /// after `finalize_manifest` succeeds; `None` when the run targeted a
145    /// streaming destination, when `--validate` was not requested, or when
146    /// the run failed before any manifest could be written.
147    pub manifest_verification: Option<crate::pipeline::ManifestVerification>,
148    /// Apply-time context (plan_id, --force usage, bypassed checks).
149    /// `None` when the run came from `rivet run` rather than `rivet apply`.
150    /// See [`ApplyContext`] and finding **F5** of the 0.7.5 audit.
151    pub apply_context: Option<ApplyContext>,
152    /// Structured event log for this run.  Answers the four DoD observability questions.
153    pub journal: RunJournal,
154}
155
156/// One `(label, value)` line in the rendered summary block. The label is a
157/// fixed string literal; the value is computed per run.
158type Row = (&'static str, String);
159
160impl RunSummary {
161    pub(super) fn new(plan: &ResolvedRunPlan) -> Self {
162        let run_id = format!(
163            "{}_{}",
164            plan.export_name,
165            chrono::Utc::now().format("%Y%m%dT%H%M%S%.3f"),
166        );
167        let mut journal = RunJournal::new(&run_id, &plan.export_name);
168        journal.record(RunEvent::PlanResolved(plan_snapshot_from(plan)));
169
170        ipc::emit_event(&ChildEvent::Started {
171            export_name: plan.export_name.clone(),
172            run_id: run_id.clone(),
173            mode: plan.strategy.mode_label().to_string(),
174            tuning_profile: plan.tuning_profile_label.clone(),
175            batch_size: plan.tuning.batch_size,
176        });
177
178        Self {
179            run_id,
180            export_name: plan.export_name.clone(),
181            status: "running".into(),
182            total_rows: 0,
183            files_produced: 0,
184            bytes_written: 0,
185            files_committed: 0,
186            duration_ms: 0,
187            peak_rss_mb: 0,
188            retries: 0,
189            validated: None,
190            schema_changed: None,
191            quality_passed: None,
192            error_message: None,
193            tuning_profile: plan.tuning_profile_label.clone(),
194            batch_size: plan.tuning.batch_size,
195            batch_size_memory_mb: plan.tuning.batch_size_memory_mb,
196            format: plan.format.label().to_string(),
197            mode: plan.strategy.mode_label().to_string(),
198            compression: plan.compression.label().to_string(),
199            destination_uri: (!matches!(
200                plan.destination.destination_type,
201                crate::config::DestinationType::Stdout
202            ))
203            .then(|| crate::pipeline::finalize::destination_uri_for_manifest(&plan.destination)),
204            pg_temp_bytes_delta: None,
205            skip_reason: None,
206            source_count: None,
207            reconciled: None,
208            manifest_parts: Vec::new(),
209            schema_fingerprint: None,
210            manifest_verification: None,
211            apply_context: None,
212            column_checksums: Vec::new(),
213            checksum_key_column: None,
214            journal,
215        }
216    }
217
218    /// One canonical builder for tests across the crate + integration suite.
219    ///
220    /// Every field is filled with a sensible default; callers tweak only what
221    /// the test cares about via the chainable setters below.  This replaces
222    /// the seven copies of `stub_summary` / `fresh_summary` / `make_summary`
223    /// / `dummy_summary` / `empty_summary` that used to live in `notify.rs`,
224    /// `report.rs`, `chunked/{exec,mod}.rs`, `manifest_writer.rs`, and the
225    /// two integration-test files.  When `RunSummary` gains a field, it is
226    /// updated here once instead of across nine sites.
227    ///
228    /// Available outside `pipeline` (`pub` not `pub(crate)`) so integration
229    /// tests in `tests/` can use it via `RunSummary::stub_for_testing(...)`.
230    /// The `_for_testing` suffix is the convention from elsewhere in the
231    /// codebase (`destination_for_tests`, etc.) — production code should
232    /// never call it.
233    ///
234    /// `#[allow(dead_code)]` on each helper because the bin target's
235    /// dead-code analysis doesn't see uses from integration tests in `tests/`.
236    /// The lib's unit tests + the integration suite together exercise every
237    /// helper; the attribute is a no-op for them.
238    #[doc(hidden)]
239    #[allow(dead_code)]
240    pub fn stub_for_testing(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
241        let run_id = run_id.into();
242        let export_name = export_name.into();
243        let journal = RunJournal::new(&run_id, &export_name);
244        Self {
245            run_id,
246            export_name,
247            status: "running".into(),
248            total_rows: 0,
249            files_produced: 0,
250            bytes_written: 0,
251            files_committed: 0,
252            duration_ms: 0,
253            peak_rss_mb: 0,
254            retries: 0,
255            validated: None,
256            schema_changed: None,
257            quality_passed: None,
258            error_message: None,
259            tuning_profile: "balanced".into(),
260            batch_size: 1000,
261            batch_size_memory_mb: None,
262            format: "parquet".into(),
263            mode: "snapshot".into(),
264            compression: "zstd".into(),
265            destination_uri: None,
266            pg_temp_bytes_delta: None,
267            skip_reason: None,
268            source_count: None,
269            reconciled: None,
270            manifest_parts: Vec::new(),
271            schema_fingerprint: None,
272            manifest_verification: None,
273            apply_context: None,
274            column_checksums: Vec::new(),
275            checksum_key_column: None,
276            journal,
277        }
278    }
279
280    /// Test-only chainable setter for the run's status field.
281    ///
282    /// Used to build success/failed/running variants without re-listing every
283    /// field.  Keeps the journal in sync: terminal statuses get a matching
284    /// `RunCompleted` event recorded so consumers reading
285    /// `journal.final_outcome()` see the right shape.
286    #[doc(hidden)]
287    #[allow(dead_code)]
288    pub fn with_status(mut self, status: impl Into<String>) -> Self {
289        let s = status.into();
290        if (s == "success" || s == "failed") && self.journal.final_outcome().is_none() {
291            self.journal.record(RunEvent::RunCompleted {
292                status: s.clone(),
293                error_message: self.error_message.clone(),
294                duration_ms: self.duration_ms,
295            });
296        }
297        self.status = s;
298        self
299    }
300
301    /// Test-only setter — record `files_committed` so resume-hint logic
302    /// (`pipeline::report`) can detect the "failed run with committed files"
303    /// path that produces a resume command.
304    #[doc(hidden)]
305    #[allow(dead_code)]
306    pub fn with_files_committed(mut self, n: usize) -> Self {
307        self.files_committed = n;
308        self
309    }
310
311    /// Test-only setter — replace the recorded manifest parts (and adjust
312    /// `total_rows` / `bytes_written` / `files_produced` to keep them
313    /// consistent with the parts list, the way real production code does).
314    #[doc(hidden)]
315    #[allow(dead_code)]
316    pub fn with_manifest_parts(mut self, parts: Vec<crate::manifest::ManifestPart>) -> Self {
317        self.total_rows = parts.iter().map(|p| p.rows).sum();
318        self.bytes_written = parts.iter().map(|p| p.size_bytes).sum();
319        self.files_produced = parts.len();
320        self.files_committed = parts.len();
321        self.manifest_parts = parts;
322        self
323    }
324
325    /// Test-only setter — error_message + (optionally) status.  Common
326    /// shape for the "failed-run" fixtures that populated 4+ existing
327    /// stubs.
328    #[doc(hidden)]
329    #[allow(dead_code)]
330    pub fn with_error(mut self, msg: impl Into<String>) -> Self {
331        self.error_message = Some(msg.into());
332        self
333    }
334
335    /// Test-only setter — record a PlanResolved event in the journal so
336    /// downstream observability paths (`journal.plan_snapshot()`,
337    /// `RunReport::from_summary` plan_origin lookup) see the same shape
338    /// they would on a real run.
339    #[doc(hidden)]
340    #[allow(dead_code)]
341    pub fn with_plan_snapshot(mut self, snap: PlanSnapshot) -> Self {
342        self.journal.record(RunEvent::PlanResolved(snap));
343        self
344    }
345
346    pub(super) fn print(&self) {
347        // Capturing mode (IPC child or in-process channel): emit a
348        // `Finished` event and let the unified UI thread render the card.
349        // No stderr block here — the renderer owns the screen.
350        if ipc::capturing_events() {
351            ipc::emit_event(&ChildEvent::Finished {
352                export_name: self.export_name.clone(),
353                run_id: self.run_id.clone(),
354                status: self.status.clone(),
355                total_rows: self.total_rows,
356                files_produced: self.files_produced as u64,
357                bytes_written: self.bytes_written,
358                duration_ms: self.duration_ms,
359                peak_rss_mb: self.peak_rss_mb,
360                error_message: self.error_message.clone(),
361            });
362            return;
363        }
364
365        self.print_stderr_block();
366    }
367
368    /// Write the per-export summary block to stderr, bypassing IPC capture.
369    /// Used after the in-process card UI finishes on single-export sequential
370    /// runs so operators still get the detailed `── name ──` block below the
371    /// live card line.
372    pub(super) fn print_stderr_block(&self) {
373        let block = if multi_export_mode() {
374            self.render_compact()
375        } else {
376            // Render the whole block into a single buffer so the call site
377            // emits one `write_all` to stderr.  Without this, parallel
378            // exports could interleave individual lines from different
379            // `RunSummary::print()` calls — visible as garbled blocks in
380            // `--parallel-exports` runs.
381            self.render().trim_end_matches('\n').to_string()
382        };
383
384        use std::io::Write;
385        // V9 (CWE-150): the block embeds error_message, which can carry
386        // attacker-controlled ANSI/OSC escapes from a malicious source DB. The
387        // single-export path reaches the operator terminal here (the parallel
388        // renderer sanitises separately). Funnel the whole block through the
389        // shared sanitiser before write — it preserves the renderer's own
390        // multi-byte glyphs (✓/✗/──) and strips only C0/C1/DEL control bytes.
391        let mut buf = super::parent_ui::sanitize_terminal(&block);
392        buf.push('\n');
393        let stderr = std::io::stderr();
394        let mut handle = stderr.lock();
395        let _ = handle.write_all(buf.as_bytes());
396        let _ = handle.flush();
397    }
398
399    /// Compact one-line summary used when several exports run in the same
400    /// invocation.  Mirrors the parent_ui card line so `--parallel-exports`
401    /// (threads), sequential, and `--parallel-export-processes` (processes)
402    /// produce visually consistent per-export rows.
403    fn render_compact(&self) -> String {
404        const NAME_COL: usize = 22;
405        const MODE_COL: usize = 8;
406        let icon = match self.status.as_str() {
407            "success" => "✓",
408            "failed" => "✗",
409            _ => "•",
410        };
411        let body = if self.status == "failed" {
412            let err = self
413                .error_message
414                .as_deref()
415                .unwrap_or("(no error message recorded)");
416            let (cause, _) = strip_chunked_recovery_hint(err);
417            // Collapse multi-line / extremely long errors so the compact
418            // line stays one row tall.  Full payload lives in the stderr
419            // log above the run summary.
420            compact_error(cause)
421        } else {
422            let rss = if self.peak_rss_mb > 0 {
423                format!("  RSS {} MB", fmt_thousands(self.peak_rss_mb))
424            } else {
425                String::new()
426            };
427            format!(
428                "{} rows  {} files  {}  {}{}",
429                fmt_thousands(self.total_rows),
430                fmt_thousands(self.files_produced as i64),
431                format_bytes(self.bytes_written),
432                fmt_duration_ms(self.duration_ms),
433                rss
434            )
435        };
436        format!(
437            "{} {:<name$}  {:<mode$}  {}",
438            icon,
439            self.export_name,
440            self.mode,
441            body,
442            name = NAME_COL,
443            mode = MODE_COL,
444        )
445    }
446
447    /// Build the block as a string.  Module-private so tests can assert
448    /// formatting without capturing stderr.
449    ///
450    /// Adaptive layout: assemble the `(label, value)` rows that apply to this
451    /// run from a flat manifest of per-row providers, then [`format_block`]
452    /// pads labels to the longest so columns line up *within* the block (the
453    /// header is fixed-width so consecutive blocks stay uniform regardless of
454    /// which optional fields are present). Each provider owns one row's gate +
455    /// formatting and is unit-testable in isolation; this method owns only
456    /// their order — completing the pattern begun by [`incremental_position_line`]
457    /// / [`time_window_skip_line`].
458    fn render(&self) -> String {
459        let mut rows: Vec<Row> = Vec::with_capacity(16);
460        rows.push(("run_id", self.run_id.clone()));
461        rows.push(self.status_row());
462        rows.push(self.tuning_row());
463        rows.push(("rows", fmt_thousands(self.total_rows)));
464        rows.push(("files", fmt_thousands(self.files_produced as i64)));
465        rows.extend(self.output_row());
466        rows.extend(self.position_row());
467        rows.extend(self.bytes_row());
468        rows.push(("duration", fmt_duration_ms(self.duration_ms)));
469        rows.extend(self.peak_rss_row());
470        rows.extend(self.pg_temp_spill_row());
471        rows.extend(self.compression_row());
472        rows.extend(self.retries_row());
473        rows.extend(self.outcome_rows());
474        rows.extend(self.error_row());
475        format_block(&self.export_name, &rows)
476    }
477
478    /// `status`, annotated with the skip reason when the run was skipped.
479    fn status_row(&self) -> Row {
480        let value = match (&self.status, &self.skip_reason) {
481            (s, Some(reason)) if s == "skipped" => format!("{s} ({reason})"),
482            (s, _) => s.clone(),
483        };
484        ("status", value)
485    }
486
487    /// `tuning` — profile + configured batch_size, noting the memory-derived
488    /// FETCH override when `batch_size_memory_mb` is set.
489    fn tuning_row(&self) -> Row {
490        let value = match self.batch_size_memory_mb {
491            Some(mem) => format!(
492                "profile={}, batch_size={} (batch_size_memory_mb={}MiB → effective FETCH in logs)",
493                self.tuning_profile,
494                fmt_thousands(self.batch_size as i64),
495                mem
496            ),
497            None => format!(
498                "profile={}, batch_size={}",
499                self.tuning_profile,
500                fmt_thousands(self.batch_size as i64)
501            ),
502        };
503        ("tuning", value)
504    }
505
506    /// `output` — where the files landed, so a newcomer isn't left guessing
507    /// after a successful export. Only when we actually wrote somewhere
508    /// addressable (skips stdout and 0-file skips).
509    fn output_row(&self) -> Option<Row> {
510        if self.files_produced > 0 {
511            self.destination_uri.clone().map(|uri| ("output", uri))
512        } else {
513            None
514        }
515    }
516
517    /// `cursor` (incremental) xor `window` (time_window): the position line for
518    /// a 0-row run. On a bare `0 rows  0 files` run this tells the operator the
519    /// incremental boundary held / the rolling window was simply empty, rather
520    /// than leaving an empty run indistinguishable from a misconfigured one.
521    /// `None` for a run that produced rows or whose skip carries no position.
522    /// (Detail lives in the two helpers it wraps.)
523    fn position_row(&self) -> Option<Row> {
524        if let Some(pos) = incremental_position_line(self.skip_reason.as_deref()) {
525            Some(("cursor", pos))
526        } else {
527            time_window_skip_line(&self.mode, self.skip_reason.as_deref()).map(|w| ("window", w))
528        }
529    }
530
531    /// `bytes` — only when something was written.
532    fn bytes_row(&self) -> Option<Row> {
533        if self.bytes_written > 0 {
534            Some(("bytes", format_bytes(self.bytes_written)))
535        } else {
536            None
537        }
538    }
539
540    /// `peak RSS` — only when sampled during the run.
541    fn peak_rss_row(&self) -> Option<Row> {
542        if self.peak_rss_mb > 0 {
543            Some((
544                "peak RSS",
545                format!(
546                    "{} MB (sampled during run)",
547                    fmt_thousands(self.peak_rss_mb)
548                ),
549            ))
550        } else {
551            None
552        }
553    }
554
555    /// `pg temp spill` — PostgreSQL temp-file spill around the run. Chatters
556    /// only on actual spill (`> 0`); annotates a tuning hint above 100 MB.
557    /// `None` for non-Postgres sources, a failed probe, or no spill.
558    fn pg_temp_spill_row(&self) -> Option<Row> {
559        let temp = self.pg_temp_bytes_delta?;
560        if temp <= 0 {
561            return None;
562        }
563        let temp_mb = temp as f64 / (1024.0 * 1024.0);
564        let label = if temp > 100 * 1024 * 1024 {
565            format!(
566                "{:.1} MB ⚠ shrink tuning.batch_size or set batch_size_memory_mb",
567                temp_mb
568            )
569        } else {
570            format!("{:.1} MB", temp_mb)
571        };
572        Some(("pg temp spill", label))
573    }
574
575    /// `compression` — only when it differs from the parquet default (zstd).
576    fn compression_row(&self) -> Option<Row> {
577        if self.format == "parquet" && self.compression != "zstd" {
578            Some(("compression", self.compression.clone()))
579        } else {
580            None
581        }
582    }
583
584    /// `retries` — only when the run had to retry.
585    fn retries_row(&self) -> Option<Row> {
586        if self.retries > 0 {
587            Some(("retries", self.retries.to_string()))
588        } else {
589            None
590        }
591    }
592
593    /// Post-extraction check outcomes: `--validate`, schema-drift, the quality
594    /// gate, `--reconcile`, plus the advisory verify nudge. Grouped so the
595    /// nudge's "ran neither verification pass" gate sits next to the very
596    /// fields it inspects — when it was added (#4) it landed as a 10-line block
597    /// appended to a flat ladder; here that dependency is local.
598    fn outcome_rows(&self) -> Vec<Row> {
599        let mut rows: Vec<Row> = Vec::new();
600        if let Some(v) = self.validated {
601            rows.push(("validated", if v { "pass".into() } else { "FAIL".into() }));
602        }
603        if let Some(sc) = self.schema_changed {
604            rows.push((
605                "schema",
606                if sc {
607                    "CHANGED".into()
608                } else {
609                    "unchanged".into()
610                },
611            ));
612        }
613        if let Some(q) = self.quality_passed {
614            rows.push(("quality", if q { "pass".into() } else { "FAIL".into() }));
615        }
616        if let Some(reconciled) = self.reconciled {
617            let src = self
618                .source_count
619                .map(fmt_thousands)
620                .unwrap_or_else(|| "?".into());
621            let exported = fmt_thousands(self.total_rows);
622            let value = if reconciled {
623                format!("MATCH ({exported}/{src})")
624            } else {
625                format!("MISMATCH (exported {exported} vs source {src})")
626            };
627            rows.push(("reconcile", value));
628        }
629        // Nudge: a successful run that wrote files but ran neither verification
630        // pass leaves completeness unconfirmed. Advisory only — so skipping
631        // verification is a deliberate choice, not an oversight (a pilot loaded
632        // hundreds of millions of rows across 5 runs with 0 verified).
633        if self.status == "success"
634            && self.files_produced > 0
635            && self.validated.is_none()
636            && self.reconciled.is_none()
637        {
638            rows.push((
639                "verify",
640                "not run — add `--reconcile` (count vs source) or `rivet validate` (re-read outputs)"
641                    .into(),
642            ));
643        }
644        rows
645    }
646
647    /// `error` — the failure message, with its own multi-line structure
648    /// preserved (the detailed block indents continuation lines under the
649    /// value column; flattening to `"; "`-joined text is the compact
650    /// one-liner's job, not this one's — a quality failure's multi-line
651    /// `failed:\n  - <check>\n  Fix …` stays readable here).
652    fn error_row(&self) -> Option<Row> {
653        self.error_message
654            .as_ref()
655            .map(|err| ("error", err.trim_end().to_string()))
656    }
657
658    /// Sanity-check the post-run summary ↔ manifest_parts coherence. Used as
659    /// a `debug_assert!`-style runtime gate from `finalize_manifest` so any
660    /// future runner that bumps `bytes_written` / `files_committed` /
661    /// `files_produced` without going through `pipeline::commit::record_part`
662    /// is caught the moment it finishes a real export. Compiled out in
663    /// release builds via the `cfg!(debug_assertions)` guard at the call site.
664    ///
665    /// **Resume-safe inequalities only**: on resume, `manifest_parts` carries
666    /// prior runs' parts via `chunked::resume_m8` while `bytes_written` /
667    /// `files_committed` reflect only the current invocation — so strict
668    /// equality is wrong across resume boundaries. Strict equality on the
669    /// non-resume path is pinned by `pipeline::commit::tests`.
670    ///
671    /// Returns `Ok(())` when the summary satisfies the invariants, else an
672    /// `Err(String)` naming which one was violated and by how much.
673    pub fn check_post_run_invariants(&self) -> Result<(), String> {
674        let parts_bytes: u64 = self.manifest_parts.iter().map(|p| p.size_bytes).sum();
675
676        if self.files_committed > self.manifest_parts.len() {
677            return Err(format!(
678                "summary.files_committed ({}) > manifest_parts.len() ({}) — \
679                 a runner bumped files_committed without commit::record_part",
680                self.files_committed,
681                self.manifest_parts.len()
682            ));
683        }
684        if self.files_produced > self.manifest_parts.len() {
685            return Err(format!(
686                "summary.files_produced ({}) > manifest_parts.len() ({}) — \
687                 a runner bumped files_produced without commit::record_part",
688                self.files_produced,
689                self.manifest_parts.len()
690            ));
691        }
692        if self.bytes_written > parts_bytes {
693            return Err(format!(
694                "summary.bytes_written ({}) > sum(manifest_parts.size_bytes) ({}) — \
695                 a runner bumped bytes_written without commit::record_part",
696                self.bytes_written, parts_bytes
697            ));
698        }
699        if self.status == "success" && self.files_committed > 0 && self.manifest_parts.is_empty() {
700            return Err(format!(
701                "success run with files_committed={} has empty manifest_parts — \
702                 cloud manifest (ADR-0012 M1) would ship with no part list \
703                 (this is the gap parallel_checkpoint had before commit e9b0796)",
704                self.files_committed
705            ));
706        }
707        // Invariant audit gap #1, weak form: a successful run that produced
708        // rows for THIS invocation must have committed at least one file.
709        // The strict form ("rows_written <= rows_read") would require a
710        // separate source-side row counter we do not track, and concurrent
711        // INSERTs on the source (live_oltp_load) make a source_count
712        // comparison brittle. This weak form catches a fabrication shape:
713        // total_rows accumulated but nothing reached the destination — a
714        // runner that fetched and silently dropped rows produces exactly
715        // this signature. Resume-safe: total_rows reflects only this
716        // invocation, so a resume with no work to do legitimately ends at
717        // total_rows=0 / files_committed=0 and the guard does not fire.
718        if self.status == "success" && self.total_rows > 0 && self.files_committed == 0 {
719            return Err(format!(
720                "summary.total_rows={} but files_committed=0 — rows extracted from \
721                 source but no files committed (no output reached the destination)",
722                self.total_rows
723            ));
724        }
725        Ok(())
726    }
727}
728
729/// Reduce a possibly-multi-line execution error to a single-line, bounded-
730/// length cause suitable for the per-export summary block and the compact
731/// one-liner.  Keeps the user-actionable bit and drops noisy diagnostic
732/// payloads (long URLs, query strings, repeated chunk errors).
733///
734/// Recognised shapes:
735/// - `parallel checkpoint worker errors:\nchunk N: <msg>\nchunk M: <msg>` →
736///   `parallel checkpoint workers failed: K chunk(s) (chunk N: <truncated>)`.
737///   The full per-chunk detail is already in stderr logs.
738/// - Generic multi-line: newlines are replaced with `; ` and the result is
739///   clamped to 240 characters with an ellipsis.
740fn compact_error(raw: &str) -> String {
741    const MAX_CHARS: usize = 240;
742    if let Some(summary) = summarize_parallel_chunk_errors(raw) {
743        return clamp_chars(&summary, MAX_CHARS);
744    }
745    let collapsed: String = raw
746        .lines()
747        .map(str::trim_end)
748        .filter(|s| !s.is_empty())
749        .collect::<Vec<_>>()
750        .join("; ");
751    clamp_chars(&collapsed, MAX_CHARS)
752}
753
754/// Derive the summary block's `cursor:` line from a `skip_reason`.
755///
756/// `skip_reason` for an incremental no-op is `"no new rows since cursor
757/// '<col>'"` (set by the runner); we lift the column out and report the
758/// position as held. Returns `None` for the non-cursor `"source returned 0
759/// rows"` skip and for `None` (a run that actually produced rows). The cursor
760/// *value* isn't carried on the summary yet, so this is column-level only.
761fn incremental_position_line(skip_reason: Option<&str>) -> Option<String> {
762    let col = skip_reason?
763        .strip_prefix("no new rows since cursor '")?
764        .strip_suffix('\'')?;
765    Some(format!("'{col}' unchanged (no new rows this run)"))
766}
767
768/// Derive the summary block's `window:` line for a time_window run that
769/// returned nothing.
770///
771/// A `TimeWindow` strategy has no cursor column, so a 0-row run reports the
772/// generic `"source returned 0 rows"` skip — the `incremental_position_line`
773/// branch never fires and, without this line, an empty time window looks
774/// identical to any other empty export. Surfacing it lets the operator tell an
775/// *empty window* (data simply outside the rolling range) from a *misconfigured
776/// window* (wrong `time_column` / `days_window`).
777///
778/// Keyed on `mode == "timewindow"` (set from `ExtractionStrategy::mode_label`)
779/// plus a set skip reason, so it only fires on a skipped time_window run and
780/// never on incremental/snapshot/chunked/keyset. The window column, days, and
781/// computed lower bound are not carried on `RunSummary`, so this reports the
782/// strategy-level fact and where to look — the concrete bound is a follow-up
783/// once the runner records it onto the summary.
784fn time_window_skip_line(mode: &str, skip_reason: Option<&str>) -> Option<String> {
785    skip_reason?;
786    if mode != "timewindow" {
787        return None;
788    }
789    Some("rolling time window matched no rows — check `time_column`/`days_window`".to_string())
790}
791
792fn summarize_parallel_chunk_errors(raw: &str) -> Option<String> {
793    let header_pos = raw.find("parallel checkpoint worker errors:")?;
794    let prefix = raw[..header_pos].trim_end_matches(": ").trim_end();
795    let tail = &raw[header_pos + "parallel checkpoint worker errors:".len()..];
796
797    let chunk_lines: Vec<&str> = tail
798        .lines()
799        .map(str::trim)
800        .filter(|l| l.starts_with("chunk "))
801        .collect();
802    if chunk_lines.is_empty() {
803        return None;
804    }
805    let first_chunk_full = chunk_lines[0];
806    // Truncate the example chunk message; the URL/payload is in stderr logs.
807    let first_chunk_short = clamp_chars(first_chunk_full, 140);
808    let prefix = if prefix.is_empty() {
809        String::new()
810    } else {
811        format!("{}: ", prefix)
812    };
813    Some(format!(
814        "{}parallel checkpoint workers failed: {} chunk(s) ({}); see stderr for full payloads",
815        prefix,
816        chunk_lines.len(),
817        first_chunk_short
818    ))
819}
820
821fn clamp_chars(s: &str, max_chars: usize) -> String {
822    if max_chars == 0 {
823        return String::new();
824    }
825    if s.chars().count() <= max_chars {
826        return s.to_string();
827    }
828    let keep = max_chars.saturating_sub(1);
829    let mut out: String = s.chars().take(keep).collect();
830    out.push('…');
831    out
832}
833
834/// Render a `── name ─────…─` header plus one indented `label:  value` line
835/// per row, all joined into a single string ending with `\n`.
836fn format_block(name: &str, rows: &[(&str, String)]) -> String {
837    const HEADER_WIDTH: usize = 60;
838    let label_w = rows.iter().map(|(l, _)| l.len()).max().unwrap_or(0);
839
840    let prefix = format!("── {} ", name);
841    let prefix_chars = prefix.chars().count();
842    let dashes = HEADER_WIDTH.saturating_sub(prefix_chars);
843    let mut out = String::with_capacity(HEADER_WIDTH * (rows.len() + 3));
844    out.push('\n');
845    out.push_str(&prefix);
846    for _ in 0..dashes {
847        out.push('─');
848    }
849    out.push('\n');
850    // Continuation lines of a multi-line value (e.g. the multi-line `error`
851    // row) are indented to align under the value column, so block-shaped
852    // messages stay readable instead of being flattened onto one line.
853    let value_indent = " ".repeat(2 + (label_w + 1) + 2);
854    for (label, value) in rows {
855        // `label_w + 1` so the colon stays attached to the label and the
856        // value column starts uniformly two spaces after it.
857        let mut lines = value.split('\n');
858        let first = lines.next().unwrap_or("");
859        out.push_str(&format!(
860            "  {:<width$}  {}\n",
861            format!("{label}:"),
862            first,
863            width = label_w + 1
864        ));
865        for cont in lines {
866            out.push_str(&value_indent);
867            out.push_str(cont);
868            out.push('\n');
869        }
870    }
871    out
872}
873
874fn fmt_duration_ms(ms: i64) -> String {
875    if ms < 1000 {
876        return format!("{}ms", ms);
877    }
878    let total_secs = ms / 1000;
879    let h = total_secs / 3600;
880    let m = (total_secs % 3600) / 60;
881    let s_frac = (ms % 60_000) as f64 / 1000.0;
882    if h > 0 {
883        format!("{}h {:02}m {:04.1}s", h, m, s_frac)
884    } else if m > 0 {
885        format!("{}m {:04.1}s", m, s_frac)
886    } else {
887        format!("{:.1}s", ms as f64 / 1000.0)
888    }
889}
890
891/// Format integers with a comma every three digits.  Negative values keep
892/// their sign.  Used for rows / files / batch_size so large numbers stay
893/// readable: `39_990_376` → `39,990,376`.
894fn fmt_thousands(n: i64) -> String {
895    let abs = n.unsigned_abs();
896    let s = abs.to_string();
897    let bytes = s.as_bytes();
898    let mut out = String::with_capacity(s.len() + s.len() / 3 + 1);
899    if n < 0 {
900        out.push('-');
901    }
902    for (i, b) in bytes.iter().enumerate() {
903        let from_end = bytes.len() - i;
904        if i > 0 && from_end.is_multiple_of(3) {
905            out.push(',');
906        }
907        out.push(*b as char);
908    }
909    out
910}
911
912#[cfg(test)]
913mod tests {
914    use super::*;
915
916    #[test]
917    fn fmt_thousands_handles_small_and_large() {
918        assert_eq!(fmt_thousands(0), "0");
919        assert_eq!(fmt_thousands(7), "7");
920        assert_eq!(fmt_thousands(999), "999");
921        assert_eq!(fmt_thousands(1_000), "1,000");
922        assert_eq!(fmt_thousands(1_000_908), "1,000,908");
923        assert_eq!(fmt_thousands(39_990_376), "39,990,376");
924        assert_eq!(fmt_thousands(-1_234), "-1,234");
925        assert_eq!(fmt_thousands(i64::MAX), "9,223,372,036,854,775,807");
926    }
927
928    #[test]
929    fn fmt_duration_picks_unit() {
930        assert_eq!(fmt_duration_ms(0), "0ms");
931        assert_eq!(fmt_duration_ms(800), "800ms");
932        assert_eq!(fmt_duration_ms(1_500), "1.5s");
933        assert_eq!(fmt_duration_ms(68_400), "1m 08.4s");
934        assert_eq!(fmt_duration_ms(3_725_300), "1h 02m 05.3s");
935    }
936
937    #[test]
938    fn format_block_pads_labels_uniformly() {
939        let rows = vec![
940            ("run_id", "abc".to_string()),
941            ("rows", "42".to_string()),
942            ("compression", "zstd".to_string()),
943        ];
944        let out = format_block("orders", &rows);
945
946        // Each value column starts at the same character position.
947        let lines: Vec<&str> = out.lines().filter(|l| l.contains(':')).collect();
948        assert_eq!(lines.len(), 3);
949        let value_starts: Vec<usize> = lines
950            .iter()
951            .map(|l| l.find(':').unwrap() + l[l.find(':').unwrap()..].find(' ').unwrap())
952            .collect();
953        // The value (after `label:` plus padding plus two spaces) starts at the
954        // same column for every row.  We verify by checking all lines have the
955        // value substring at the same byte offset.
956        let value_col = lines[0].rfind("abc").unwrap();
957        assert_eq!(lines[1].rfind("42").unwrap(), value_col);
958        assert_eq!(lines[2].rfind("zstd").unwrap(), value_col);
959        // Sanity: silence unused.
960        let _ = value_starts;
961    }
962
963    #[test]
964    fn format_block_header_has_consistent_width() {
965        let block_a = format_block("a", &[("rows", "1".into())]);
966        let block_b = format_block("orders_table_xyz", &[("rows", "1".into())]);
967        let header_a = block_a.lines().nth(1).unwrap();
968        let header_b = block_b.lines().nth(1).unwrap();
969        assert_eq!(
970            header_a.chars().count(),
971            header_b.chars().count(),
972            "headers must be the same width regardless of name length: {:?} vs {:?}",
973            header_a,
974            header_b
975        );
976    }
977
978    #[test]
979    fn render_produces_a_single_string_with_trailing_newline() {
980        use crate::plan::{
981            CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
982            MetaColumns, ResolvedRunPlan,
983        };
984        use crate::tuning::SourceTuning;
985        let plan = ResolvedRunPlan {
986            export_name: "orders".into(),
987            base_query: "SELECT 1".into(),
988            strategy: ExtractionStrategy::Snapshot,
989            format: FormatType::Parquet,
990            compression: CompressionType::default(),
991            compression_level: None,
992            max_file_size_bytes: None,
993            skip_empty: false,
994            meta_columns: MetaColumns::default(),
995            destination: DestinationConfig {
996                destination_type: DestinationType::Local,
997                path: Some("./out".into()),
998                ..Default::default()
999            },
1000            quality: None,
1001            tuning: SourceTuning::from_config(None),
1002            tuning_profile_label: "balanced (default)".into(),
1003            validate: false,
1004            reconcile: false,
1005            resume: false,
1006            source: crate::config::SourceConfig {
1007                source_type: crate::config::SourceType::Postgres,
1008                url: Some("postgresql://localhost/test".into()),
1009                url_env: None,
1010                url_file: None,
1011                host: None,
1012                port: None,
1013                user: None,
1014                password: None,
1015                password_env: None,
1016                database: None,
1017                environment: None,
1018                tuning: None,
1019                tls: None,
1020            },
1021            column_overrides: Default::default(),
1022            verify: crate::config::VerifyMode::Size,
1023            schema_drift_policy: Default::default(),
1024            shape_drift_warn_factor: 2.0,
1025            parquet: None,
1026        };
1027        let mut s = RunSummary::new(&plan);
1028        s.status = "success".into();
1029        s.total_rows = 1_000_908;
1030        s.files_produced = 11;
1031        s.bytes_written = 32 * 1024 * 1024 + 400 * 1024;
1032        s.duration_ms = 68_400;
1033        s.peak_rss_mb = 884;
1034
1035        let block = s.render();
1036        assert!(
1037            block.starts_with('\n'),
1038            "block should start with a blank line"
1039        );
1040        assert!(block.ends_with('\n'), "block should end with a newline");
1041        assert!(block.contains("── orders "));
1042        assert!(
1043            block.contains("1,000,908"),
1044            "rows should be formatted with thousands separator: {}",
1045            block
1046        );
1047        assert!(block.contains("1m 08.4s"), "duration formatting: {}", block);
1048        // No raw progress-bar bleed: header dashes still present, no carriage
1049        // returns or escape sequences.
1050        assert!(!block.contains('\r'));
1051
1052        // Compact one-liner used in multi-export runs.
1053        let line = s.render_compact();
1054        assert!(line.starts_with("✓ "), "success icon present: {:?}", line);
1055        assert!(line.contains("orders"), "export name present: {:?}", line);
1056        assert!(line.contains("1,000,908 rows"), "rows present: {:?}", line);
1057        assert!(line.contains("32.4 MB"), "bytes present: {:?}", line);
1058        assert!(line.contains("1m 08.4s"), "duration present: {:?}", line);
1059        assert!(line.contains("RSS 884 MB"), "rss present: {:?}", line);
1060        assert!(!line.contains('\n'), "single line: {:?}", line);
1061    }
1062
1063    #[test]
1064    fn compact_error_summarises_parallel_chunk_errors() {
1065        let raw = "export 'page_views': parallel checkpoint worker errors:\n\
1066                   chunk 4: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202442_chunk4.parquet?partNumber=1&uploadId=ABPnzm7RqplA, called: http_util::Client::send } => send http request, source: error sending request: client error (SendRequest): dispatch task is gone\n\
1067                   chunk 5: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202443_chunk5.parquet?partNumber=1&uploadId=ABPnzm6q, called: http_util::Client::send } => send http request, source: dispatch task is gone";
1068        let out = compact_error(raw);
1069        assert!(
1070            out.contains("2 chunk(s)"),
1071            "should report number of failed chunks: {:?}",
1072            out
1073        );
1074        assert!(
1075            out.starts_with("export 'page_views': parallel checkpoint workers failed:"),
1076            "should keep export prefix and use compact phrasing: {:?}",
1077            out
1078        );
1079        assert!(
1080            out.contains("chunk 4:"),
1081            "should include the first chunk as an example: {:?}",
1082            out
1083        );
1084        assert!(!out.contains('\n'), "single line output: {:?}", out);
1085        assert!(
1086            out.chars().count() <= 240,
1087            "must be clamped to <=240 chars, got {}: {:?}",
1088            out.chars().count(),
1089            out
1090        );
1091    }
1092
1093    #[test]
1094    fn compact_error_collapses_generic_multiline() {
1095        let raw = "first line of trouble\nsecond line with detail\n\nthird line\n";
1096        let out = compact_error(raw);
1097        assert_eq!(
1098            out, "first line of trouble; second line with detail; third line",
1099            "newlines should collapse to '; ' and blanks dropped"
1100        );
1101    }
1102
1103    #[test]
1104    fn compact_error_clamps_excessively_long_lines() {
1105        let raw = "x".repeat(1_000);
1106        let out = compact_error(&raw);
1107        assert_eq!(out.chars().count(), 240);
1108        assert!(out.ends_with('…'));
1109    }
1110
1111    #[test]
1112    fn render_compact_strips_chunked_recovery_hint_for_failed() {
1113        use crate::plan::{
1114            CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
1115            MetaColumns, ResolvedRunPlan,
1116        };
1117        use crate::tuning::SourceTuning;
1118        let plan = ResolvedRunPlan {
1119            export_name: "events".into(),
1120            base_query: "SELECT 1".into(),
1121            strategy: ExtractionStrategy::Snapshot,
1122            format: FormatType::Parquet,
1123            compression: CompressionType::default(),
1124            compression_level: None,
1125            max_file_size_bytes: None,
1126            skip_empty: false,
1127            meta_columns: MetaColumns::default(),
1128            destination: DestinationConfig {
1129                destination_type: DestinationType::Local,
1130                path: Some("./out".into()),
1131                ..Default::default()
1132            },
1133            quality: None,
1134            tuning: SourceTuning::from_config(None),
1135            tuning_profile_label: "balanced (default)".into(),
1136            validate: false,
1137            reconcile: false,
1138            resume: false,
1139            source: crate::config::SourceConfig {
1140                source_type: crate::config::SourceType::Postgres,
1141                url: Some("postgresql://localhost/test".into()),
1142                url_env: None,
1143                url_file: None,
1144                host: None,
1145                port: None,
1146                user: None,
1147                password: None,
1148                password_env: None,
1149                database: None,
1150                environment: None,
1151                tuning: None,
1152                tls: None,
1153            },
1154            column_overrides: Default::default(),
1155            verify: crate::config::VerifyMode::Size,
1156            schema_drift_policy: Default::default(),
1157            shape_drift_warn_factor: 2.0,
1158            parquet: None,
1159        };
1160        let mut s = RunSummary::new(&plan);
1161        s.status = "failed".into();
1162        s.error_message = Some(
1163            "export 'events': --resume but no in-progress chunk checkpoint; \
1164             run without --resume first or `rivet state reset-chunks --config x.yaml --export events`"
1165                .to_string(),
1166        );
1167
1168        let line = s.render_compact();
1169        assert!(line.starts_with("✗ "), "failure icon: {:?}", line);
1170        assert!(line.contains("events"), "name present: {:?}", line);
1171        assert!(
1172            line.contains("--resume but no in-progress chunk checkpoint"),
1173            "cause kept: {:?}",
1174            line
1175        );
1176        assert!(
1177            !line.contains("rivet state reset-chunks"),
1178            "recovery hint should be stripped from per-export line: {:?}",
1179            line
1180        );
1181        assert!(!line.contains('\n'), "single line: {:?}", line);
1182    }
1183
1184    fn plan_for(export_name: &str) -> crate::plan::ResolvedRunPlan {
1185        use crate::plan::{
1186            CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
1187            MetaColumns, ResolvedRunPlan,
1188        };
1189        use crate::tuning::SourceTuning;
1190        ResolvedRunPlan {
1191            export_name: export_name.into(),
1192            base_query: "SELECT 1".into(),
1193            strategy: ExtractionStrategy::Snapshot,
1194            format: FormatType::Parquet,
1195            compression: CompressionType::default(),
1196            compression_level: None,
1197            max_file_size_bytes: None,
1198            skip_empty: false,
1199            meta_columns: MetaColumns::default(),
1200            destination: DestinationConfig {
1201                destination_type: DestinationType::Local,
1202                path: Some("./out".into()),
1203                ..Default::default()
1204            },
1205            quality: None,
1206            tuning: SourceTuning::from_config(None),
1207            tuning_profile_label: "balanced (default)".into(),
1208            validate: false,
1209            reconcile: false,
1210            resume: false,
1211            source: crate::config::SourceConfig {
1212                source_type: crate::config::SourceType::Postgres,
1213                url: Some("postgresql://localhost/test".into()),
1214                url_env: None,
1215                url_file: None,
1216                host: None,
1217                port: None,
1218                user: None,
1219                password: None,
1220                password_env: None,
1221                database: None,
1222                environment: None,
1223                tuning: None,
1224                tls: None,
1225            },
1226            column_overrides: Default::default(),
1227            verify: crate::config::VerifyMode::Size,
1228            schema_drift_policy: Default::default(),
1229            shape_drift_warn_factor: 2.0,
1230            parquet: None,
1231        }
1232    }
1233
1234    #[test]
1235    fn render_preserves_multiline_error_block() {
1236        // L19: a multi-line error (a quality failure here) must stay multi-line
1237        // in the detailed single-export block — not collapsed to `"; "`-joined
1238        // text the way the compact one-liner does.
1239        let mut s = RunSummary::new(&plan_for("orders"));
1240        s.status = "failed".into();
1241        s.error_message = Some(
1242            "export 'orders': 1 quality check(s) failed:\n  \
1243             - row_count 10 below minimum 999999\n  \
1244             Fix the source data, or adjust the thresholds under `quality:` in your config."
1245                .to_string(),
1246        );
1247
1248        let block = s.render();
1249        // The collapsed form joined lines with `"; "` — assert that flattening
1250        // is gone and the original newline structure survives.
1251        assert!(
1252            !block.contains("failed:;"),
1253            "error must not be '; '-flattened in the detailed block: {block}"
1254        );
1255        assert!(
1256            block.contains("- row_count 10 below minimum 999999"),
1257            "failing check line present: {block}"
1258        );
1259        // Each part of the multi-line error lands on its own line.
1260        let err_lines: Vec<&str> = block
1261            .lines()
1262            .filter(|l| {
1263                l.contains("quality check(s) failed")
1264                    || l.contains("row_count 10 below minimum")
1265                    || l.contains("Fix the source data")
1266            })
1267            .collect();
1268        assert_eq!(
1269            err_lines.len(),
1270            3,
1271            "all three error lines should render on separate lines: {block}"
1272        );
1273        // Continuation lines are indented under the value column, not at col 0.
1274        for l in &err_lines {
1275            assert!(l.starts_with(' '), "error line should be indented: {l:?}");
1276        }
1277    }
1278
1279    #[test]
1280    fn render_nudges_verification_when_unverified_success() {
1281        // #4: a successful run that wrote files but ran no verification pass
1282        // should surface an advisory `verify:` line — so skipping it is a choice.
1283        let mut s = RunSummary::new(&plan_for("orders"));
1284        s.status = "success".into();
1285        s.files_produced = 3;
1286        s.total_rows = 1_000;
1287        // validated / reconciled left None (no --validate / --reconcile).
1288        let block = s.render();
1289        assert!(
1290            block.lines().any(|l| l.trim_start().starts_with("verify:")),
1291            "expected a verify nudge on an unverified success: {block}"
1292        );
1293
1294        // A run that verified must NOT nudge.
1295        let mut s2 = RunSummary::new(&plan_for("orders"));
1296        s2.status = "success".into();
1297        s2.files_produced = 3;
1298        s2.validated = Some(true);
1299        let block2 = s2.render();
1300        assert!(
1301            !block2
1302                .lines()
1303                .any(|l| l.trim_start().starts_with("verify:")),
1304            "a verified run must not nudge: {block2}"
1305        );
1306    }
1307
1308    #[test]
1309    fn pg_temp_spill_row_only_for_real_spill_and_annotates_large() {
1310        // Direct provider test — previously this threshold logic was only
1311        // reachable by rendering a whole block (and no test set the field).
1312        let mut s = RunSummary::stub_for_testing("r", "orders");
1313        assert_eq!(s.pg_temp_spill_row(), None, "no delta → no row");
1314        s.pg_temp_bytes_delta = Some(0);
1315        assert_eq!(s.pg_temp_spill_row(), None, "zero spill → no row");
1316        s.pg_temp_bytes_delta = Some(-5);
1317        assert_eq!(s.pg_temp_spill_row(), None, "negative delta → no row");
1318
1319        s.pg_temp_bytes_delta = Some(50 * 1024 * 1024);
1320        let (label, value) = s.pg_temp_spill_row().expect("50MB spill → row");
1321        assert_eq!(label, "pg temp spill");
1322        assert!(
1323            value.contains("50.0 MB") && !value.contains('⚠'),
1324            "small spill is plain info: {value:?}"
1325        );
1326
1327        s.pg_temp_bytes_delta = Some(200 * 1024 * 1024);
1328        let (_, value) = s.pg_temp_spill_row().expect("200MB spill → row");
1329        assert!(
1330            value.contains('⚠') && value.contains("batch_size"),
1331            "spill over 100 MB carries the tuning hint: {value:?}"
1332        );
1333    }
1334
1335    #[test]
1336    fn outcome_rows_format_reconcile_and_suppress_nudge_when_checked() {
1337        let mut s = RunSummary::stub_for_testing("r", "orders");
1338        s.reconciled = Some(true);
1339        s.source_count = Some(1_000);
1340        s.total_rows = 1_000;
1341        assert!(
1342            s.outcome_rows()
1343                .iter()
1344                .any(|(l, v)| *l == "reconcile" && v == "MATCH (1,000/1,000)"),
1345            "match wording: {:?}",
1346            s.outcome_rows()
1347        );
1348
1349        s.reconciled = Some(false);
1350        s.source_count = Some(1_200);
1351        let rows = s.outcome_rows();
1352        let recon = rows
1353            .iter()
1354            .find(|(l, _)| *l == "reconcile")
1355            .expect("reconcile row");
1356        assert!(
1357            recon.1.contains("MISMATCH") && recon.1.contains("1,000") && recon.1.contains("1,200"),
1358            "mismatch names both sides: {:?}",
1359            recon
1360        );
1361
1362        // A set reconcile result suppresses the verify nudge even on a
1363        // files-produced success (the nudge's gate lives beside this field).
1364        s.status = "success".into();
1365        s.files_produced = 2;
1366        assert!(
1367            !s.outcome_rows().iter().any(|(l, _)| *l == "verify"),
1368            "a reconciled run must not also nudge"
1369        );
1370    }
1371
1372    #[test]
1373    fn render_surfaces_cursor_position_on_zero_new_incremental() {
1374        // L27: a 0-new incremental run shows `0 rows  0 files`; without a
1375        // cursor line the operator can't tell the boundary held. Assert the
1376        // dedicated `cursor:` line appears, derived from `skip_reason`.
1377        let mut s = RunSummary::new(&plan_for("orders"));
1378        s.status = "skipped".into();
1379        s.skip_reason = Some("no new rows since cursor 'updated_at'".into());
1380
1381        let block = s.render();
1382        let cursor_line = block
1383            .lines()
1384            .find(|l| l.trim_start().starts_with("cursor:"))
1385            .unwrap_or_else(|| panic!("expected a cursor: line in block: {block}"));
1386        assert!(
1387            cursor_line.contains("'updated_at'"),
1388            "cursor line names the column: {cursor_line:?}"
1389        );
1390        assert!(
1391            cursor_line.contains("unchanged"),
1392            "cursor line reports the position held: {cursor_line:?}"
1393        );
1394    }
1395
1396    #[test]
1397    fn incremental_position_line_only_for_cursor_skips() {
1398        // The non-cursor 0-row skip and the no-skip case produce no cursor line.
1399        assert_eq!(
1400            incremental_position_line(Some("no new rows since cursor 'ts'")),
1401            Some("'ts' unchanged (no new rows this run)".into())
1402        );
1403        assert_eq!(
1404            incremental_position_line(Some("source returned 0 rows")),
1405            None
1406        );
1407        assert_eq!(incremental_position_line(None), None);
1408    }
1409
1410    #[test]
1411    fn render_surfaces_window_position_on_zero_row_time_window() {
1412        // L27 (time_window arm): a 0-row time_window run reports the generic
1413        // `"source returned 0 rows"` skip (the strategy has no cursor column),
1414        // so the `cursor:` branch never fires. Without a `window:` line the
1415        // operator can't tell an empty window from a wrong column/window —
1416        // assert the dedicated `window:` line appears for this mode.
1417        let mut s = RunSummary::new(&plan_for("events"));
1418        s.status = "skipped".into();
1419        s.mode = "timewindow".into();
1420        s.skip_reason = Some("source returned 0 rows".into());
1421
1422        let block = s.render();
1423        let window_line = block
1424            .lines()
1425            .find(|l| l.trim_start().starts_with("window:"))
1426            .unwrap_or_else(|| panic!("expected a window: line in block: {block}"));
1427        assert!(
1428            window_line.contains("matched no rows"),
1429            "window line reports the empty window: {window_line:?}"
1430        );
1431        assert!(
1432            window_line.contains("time_column") && window_line.contains("days_window"),
1433            "window line points at the window config to check: {window_line:?}"
1434        );
1435        // The generic 0-row skip must not also produce a `cursor:` line.
1436        assert!(
1437            !block.lines().any(|l| l.trim_start().starts_with("cursor:")),
1438            "no cursor line for a non-cursor strategy: {block}"
1439        );
1440    }
1441
1442    #[test]
1443    fn time_window_skip_line_only_for_skipped_time_window() {
1444        // Fires only when the run skipped AND the strategy is time_window.
1445        assert_eq!(
1446            time_window_skip_line("timewindow", Some("source returned 0 rows")),
1447            Some("rolling time window matched no rows — check `time_column`/`days_window`".into())
1448        );
1449        // Wrong mode → no window line (incremental/snapshot handle their own).
1450        assert_eq!(
1451            time_window_skip_line("incremental", Some("source returned 0 rows")),
1452            None
1453        );
1454        assert_eq!(
1455            time_window_skip_line("full", Some("source returned 0 rows")),
1456            None
1457        );
1458        // A time_window run that produced rows (no skip) gets no window line.
1459        assert_eq!(time_window_skip_line("timewindow", None), None);
1460    }
1461}