// rivet/pipeline/summary.rs

//! **Layer: Observability**
//!
//! `RunSummary` is the single observability artifact for a pipeline run.
//! It accumulates operational data during execution and is consumed by:
//! - the end-of-run terminal output (`print`)
//! - the metrics store (`state::record_metric`)
//! - the notification system (`notify::maybe_send`)
//!
//! `RunSummary` is written to by execution modules (row counts, byte counts, retries)
//! but it makes no execution decisions itself — it is a pure data accumulator.
//!
//! It embeds a `RunJournal` so that all pipeline modules — which already hold
//! `&mut RunSummary` — can record structured events via `summary.journal.record()`
//! without any signature changes.  In a future epic the relationship will invert:
//! `RunSummary` will be derived from `RunJournal`.
16
17use super::ipc::{self, ChildEvent};
18use super::{format_bytes, multi_export_mode, strip_chunked_recovery_hint};
19use crate::journal::{PlanSnapshot, RunEvent, RunJournal};
20use crate::manifest::ManifestPart;
21use crate::plan::ResolvedRunPlan;
22
23/// Build a `PlanSnapshot` from a `ResolvedRunPlan`.
24///
25/// Lives here rather than on `journal` itself so that the journal module
26/// stays free of plan/pipeline dependencies (avoids the state→pipeline cycle
27/// we used to have via `state::journal_store`).
28fn plan_snapshot_from(plan: &ResolvedRunPlan) -> PlanSnapshot {
29    PlanSnapshot {
30        export_name: plan.export_name.clone(),
31        base_query: plan.base_query.clone(),
32        strategy: plan.strategy.mode_label().to_string(),
33        format: plan.format.label().to_string(),
34        compression: plan.compression.label().to_string(),
35        destination_type: plan.destination.destination_type.label().to_string(),
36        tuning_profile: plan.tuning_profile_label.clone(),
37        batch_size: plan.tuning.batch_size,
38        validate: plan.validate,
39        reconcile: plan.reconcile,
40        resume: plan.resume,
41    }
42}
43
44/// Context recorded when a run was launched via `rivet apply` rather than
45/// `rivet run`.  Provides the audit trail for **F5**: which plan artifact
46/// was applied, whether `--force` was passed, and which preflight checks
47/// `--force` actually bypassed.
48///
49/// `None` on `RunSummary` means the run came from `rivet run` (no plan
50/// artifact was applied).
51#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
52pub struct ApplyContext {
53    /// `plan_id` from the applied `PlanArtifact`.
54    pub plan_id: String,
55    /// Was `--force` passed to `rivet apply`?
56    pub forced: bool,
57    /// Names of preflight checks that `--force` actually overrode for this
58    /// run.  Possible values: `"staleness"`, `"cursor_drift"`.  Empty when
59    /// `forced` is true but no check actually had to be bypassed (the plan
60    /// was fresh and the cursor matched).
61    pub force_bypassed: Vec<String>,
62}
63
64/// Accumulates operational data during a pipeline run for summary and metrics.
65///
66/// The embedded `journal` is the structured event log for this run.  Use
67/// `summary.journal.record(event)` at any call site that already holds
68/// `&mut RunSummary`.
69#[derive(Debug, Clone)]
70pub struct RunSummary {
71    pub run_id: String,
72    pub export_name: String,
73    pub status: String,
74    pub total_rows: i64,
75    pub files_produced: usize,
76    pub bytes_written: u64,
77    /// Incremented after each successful `dest.write()`. Non-zero means a previous
78    /// attempt already committed data — retrying from the same cursor would duplicate rows.
79    pub files_committed: usize,
80    pub duration_ms: i64,
81    pub peak_rss_mb: i64,
82    pub retries: u32,
83    pub validated: Option<bool>,
84    pub schema_changed: Option<bool>,
85    pub quality_passed: Option<bool>,
86    pub error_message: Option<String>,
87    /// `profile` from YAML, or `balanced (default)` if omitted.
88    pub tuning_profile: String,
89    /// Configured `batch_size` from YAML/profile (FETCH cap before `batch_size_memory_mb` override).
90    pub batch_size: usize,
91    /// When set, actual FETCH size is derived from schema (see logs).
92    pub batch_size_memory_mb: Option<usize>,
93    pub format: String,
94    pub mode: String,
95    pub compression: String,
96    /// Where the files were written, as an operator would type it to find them
97    /// again (`./output`, `s3://bucket/prefix`, …). `None` for stdout or a
98    /// summary built outside the run path (tests). Surfaced on the success card
99    /// so a newcomer isn't left wondering where their data landed.
100    pub destination_uri: Option<String>,
101    /// Postgres `pg_stat_database.temp_bytes` delta around the run. `None` for
102    /// non-Postgres sources or when the snapshot probe failed (no admin perms
103    /// not required — the view is readable by any role). When set and large,
104    /// indicates cursor / sort spill to `pgsql_tmp/` — the safe action is to
105    /// shrink `tuning.batch_size` or set `tuning.batch_size_memory_mb` below
106    /// PG's `work_mem`.
107    pub pg_temp_bytes_delta: Option<i64>,
108    /// Human-readable parenthetical attached to `status: skipped` so the
109    /// operator knows *why* there was nothing to export this run (e.g.
110    /// `"no new rows since cursor 'updated_at'"`). Always `None` when
111    /// `status != "skipped"`. Surfaced in the console summary card as
112    /// `status: skipped (<reason>)`.
113    pub skip_reason: Option<String>,
114    /// Source COUNT(*) result for reconciliation (None = not requested or not applicable).
115    pub source_count: Option<i64>,
116    /// Whether reconciliation passed (Some(true) = match, Some(false) = mismatch, None = skipped).
117    pub reconciled: Option<bool>,
118    /// Committed parts accumulated during the run, in commit order.  Populated by
119    /// `pipeline::manifest_writer::record_committed_part` at each `dest.write`
120    /// site (ADR-0012 M1 — Parts Before Manifest).  Drained at finalize into a
121    /// `RunManifest` by [`crate::pipeline::manifest_writer::write_manifest`].
122    pub manifest_parts: Vec<ManifestPart>,
123    /// xxh3 fingerprint of the dest-facing column schema for this run, in the
124    /// canonical `xxh3:<16-hex>` form produced by [`crate::state::schema_fingerprint`].
125    ///
126    /// Recorded by [`crate::pipeline::manifest_writer::record_run_schema_fingerprint`]
127    /// the first time the sink has resolved a schema (i.e. on the first batch
128    /// of any chunk).  Idempotent within a run — the schema is identical across
129    /// chunks, so later writes are no-ops.
130    ///
131    /// `finalize_manifest` reads this directly so the manifest's
132    /// `schema_fingerprint` no longer depends on the per-export schema row
133    /// happening to land in `state` before the manifest write.  The state
134    /// lookup remains a fallback for resume scenarios where the summary was
135    /// reconstructed without ever seeing a live schema.
136    pub schema_fingerprint: Option<String>,
137    /// Result of the manifest-aware `--validate` pass (ADR-0012 M5/M6,
138    /// ADR-0013).  Populated by `pipeline::job::finalize_validate_manifest`
139    /// after `finalize_manifest` succeeds; `None` when the run targeted a
140    /// streaming destination, when `--validate` was not requested, or when
141    /// the run failed before any manifest could be written.
142    pub manifest_verification: Option<crate::pipeline::ManifestVerification>,
143    /// Apply-time context (plan_id, --force usage, bypassed checks).
144    /// `None` when the run came from `rivet run` rather than `rivet apply`.
145    /// See [`ApplyContext`] and finding **F5** of the 0.7.5 audit.
146    pub apply_context: Option<ApplyContext>,
147    /// Structured event log for this run.  Answers the four DoD observability questions.
148    pub journal: RunJournal,
149}
150
151impl RunSummary {
152    pub(super) fn new(plan: &ResolvedRunPlan) -> Self {
153        let run_id = format!(
154            "{}_{}",
155            plan.export_name,
156            chrono::Utc::now().format("%Y%m%dT%H%M%S%.3f"),
157        );
158        let mut journal = RunJournal::new(&run_id, &plan.export_name);
159        journal.record(RunEvent::PlanResolved(plan_snapshot_from(plan)));
160
161        ipc::emit_event(&ChildEvent::Started {
162            export_name: plan.export_name.clone(),
163            run_id: run_id.clone(),
164            mode: plan.strategy.mode_label().to_string(),
165            tuning_profile: plan.tuning_profile_label.clone(),
166            batch_size: plan.tuning.batch_size,
167        });
168
169        Self {
170            run_id,
171            export_name: plan.export_name.clone(),
172            status: "running".into(),
173            total_rows: 0,
174            files_produced: 0,
175            bytes_written: 0,
176            files_committed: 0,
177            duration_ms: 0,
178            peak_rss_mb: 0,
179            retries: 0,
180            validated: None,
181            schema_changed: None,
182            quality_passed: None,
183            error_message: None,
184            tuning_profile: plan.tuning_profile_label.clone(),
185            batch_size: plan.tuning.batch_size,
186            batch_size_memory_mb: plan.tuning.batch_size_memory_mb,
187            format: plan.format.label().to_string(),
188            mode: plan.strategy.mode_label().to_string(),
189            compression: plan.compression.label().to_string(),
190            destination_uri: (!matches!(
191                plan.destination.destination_type,
192                crate::config::DestinationType::Stdout
193            ))
194            .then(|| crate::pipeline::finalize::destination_uri_for_manifest(&plan.destination)),
195            pg_temp_bytes_delta: None,
196            skip_reason: None,
197            source_count: None,
198            reconciled: None,
199            manifest_parts: Vec::new(),
200            schema_fingerprint: None,
201            manifest_verification: None,
202            apply_context: None,
203            journal,
204        }
205    }
206
207    /// One canonical builder for tests across the crate + integration suite.
208    ///
209    /// Every field is filled with a sensible default; callers tweak only what
210    /// the test cares about via the chainable setters below.  This replaces
211    /// the seven copies of `stub_summary` / `fresh_summary` / `make_summary`
212    /// / `dummy_summary` / `empty_summary` that used to live in `notify.rs`,
213    /// `report.rs`, `chunked/{exec,mod}.rs`, `manifest_writer.rs`, and the
214    /// two integration-test files.  When `RunSummary` gains a field, it is
215    /// updated here once instead of across nine sites.
216    ///
217    /// Available outside `pipeline` (`pub` not `pub(crate)`) so integration
218    /// tests in `tests/` can use it via `RunSummary::stub_for_testing(...)`.
219    /// The `_for_testing` suffix is the convention from elsewhere in the
220    /// codebase (`destination_for_tests`, etc.) — production code should
221    /// never call it.
222    ///
223    /// `#[allow(dead_code)]` on each helper because the bin target's
224    /// dead-code analysis doesn't see uses from integration tests in `tests/`.
225    /// The lib's unit tests + the integration suite together exercise every
226    /// helper; the attribute is a no-op for them.
227    #[doc(hidden)]
228    #[allow(dead_code)]
229    pub fn stub_for_testing(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
230        let run_id = run_id.into();
231        let export_name = export_name.into();
232        let journal = RunJournal::new(&run_id, &export_name);
233        Self {
234            run_id,
235            export_name,
236            status: "running".into(),
237            total_rows: 0,
238            files_produced: 0,
239            bytes_written: 0,
240            files_committed: 0,
241            duration_ms: 0,
242            peak_rss_mb: 0,
243            retries: 0,
244            validated: None,
245            schema_changed: None,
246            quality_passed: None,
247            error_message: None,
248            tuning_profile: "balanced".into(),
249            batch_size: 1000,
250            batch_size_memory_mb: None,
251            format: "parquet".into(),
252            mode: "snapshot".into(),
253            compression: "zstd".into(),
254            destination_uri: None,
255            pg_temp_bytes_delta: None,
256            skip_reason: None,
257            source_count: None,
258            reconciled: None,
259            manifest_parts: Vec::new(),
260            schema_fingerprint: None,
261            manifest_verification: None,
262            apply_context: None,
263            journal,
264        }
265    }
266
267    /// Test-only chainable setter for the run's status field.
268    ///
269    /// Used to build success/failed/running variants without re-listing every
270    /// field.  Keeps the journal in sync: terminal statuses get a matching
271    /// `RunCompleted` event recorded so consumers reading
272    /// `journal.final_outcome()` see the right shape.
273    #[doc(hidden)]
274    #[allow(dead_code)]
275    pub fn with_status(mut self, status: impl Into<String>) -> Self {
276        let s = status.into();
277        if (s == "success" || s == "failed") && self.journal.final_outcome().is_none() {
278            self.journal.record(RunEvent::RunCompleted {
279                status: s.clone(),
280                error_message: self.error_message.clone(),
281                duration_ms: self.duration_ms,
282            });
283        }
284        self.status = s;
285        self
286    }
287
288    /// Test-only setter — record `files_committed` so resume-hint logic
289    /// (`pipeline::report`) can detect the "failed run with committed files"
290    /// path that produces a resume command.
291    #[doc(hidden)]
292    #[allow(dead_code)]
293    pub fn with_files_committed(mut self, n: usize) -> Self {
294        self.files_committed = n;
295        self
296    }
297
298    /// Test-only setter — replace the recorded manifest parts (and adjust
299    /// `total_rows` / `bytes_written` / `files_produced` to keep them
300    /// consistent with the parts list, the way real production code does).
301    #[doc(hidden)]
302    #[allow(dead_code)]
303    pub fn with_manifest_parts(mut self, parts: Vec<crate::manifest::ManifestPart>) -> Self {
304        self.total_rows = parts.iter().map(|p| p.rows).sum();
305        self.bytes_written = parts.iter().map(|p| p.size_bytes).sum();
306        self.files_produced = parts.len();
307        self.files_committed = parts.len();
308        self.manifest_parts = parts;
309        self
310    }
311
312    /// Test-only setter — error_message + (optionally) status.  Common
313    /// shape for the "failed-run" fixtures that populated 4+ existing
314    /// stubs.
315    #[doc(hidden)]
316    #[allow(dead_code)]
317    pub fn with_error(mut self, msg: impl Into<String>) -> Self {
318        self.error_message = Some(msg.into());
319        self
320    }
321
322    /// Test-only setter — record a PlanResolved event in the journal so
323    /// downstream observability paths (`journal.plan_snapshot()`,
324    /// `RunReport::from_summary` plan_origin lookup) see the same shape
325    /// they would on a real run.
326    #[doc(hidden)]
327    #[allow(dead_code)]
328    pub fn with_plan_snapshot(mut self, snap: PlanSnapshot) -> Self {
329        self.journal.record(RunEvent::PlanResolved(snap));
330        self
331    }
332
333    pub(super) fn print(&self) {
334        // Capturing mode (IPC child or in-process channel): emit a
335        // `Finished` event and let the unified UI thread render the card.
336        // No stderr block here — the renderer owns the screen.
337        if ipc::capturing_events() {
338            ipc::emit_event(&ChildEvent::Finished {
339                export_name: self.export_name.clone(),
340                run_id: self.run_id.clone(),
341                status: self.status.clone(),
342                total_rows: self.total_rows,
343                files_produced: self.files_produced as u64,
344                bytes_written: self.bytes_written,
345                duration_ms: self.duration_ms,
346                peak_rss_mb: self.peak_rss_mb,
347                error_message: self.error_message.clone(),
348            });
349            return;
350        }
351
352        self.print_stderr_block();
353    }
354
355    /// Write the per-export summary block to stderr, bypassing IPC capture.
356    /// Used after the in-process card UI finishes on single-export sequential
357    /// runs so operators still get the detailed `── name ──` block below the
358    /// live card line.
359    pub(super) fn print_stderr_block(&self) {
360        let block = if multi_export_mode() {
361            self.render_compact()
362        } else {
363            // Render the whole block into a single buffer so the call site
364            // emits one `write_all` to stderr.  Without this, parallel
365            // exports could interleave individual lines from different
366            // `RunSummary::print()` calls — visible as garbled blocks in
367            // `--parallel-exports` runs.
368            self.render().trim_end_matches('\n').to_string()
369        };
370
371        use std::io::Write;
372        // V9 (CWE-150): the block embeds error_message, which can carry
373        // attacker-controlled ANSI/OSC escapes from a malicious source DB. The
374        // single-export path reaches the operator terminal here (the parallel
375        // renderer sanitises separately). Funnel the whole block through the
376        // shared sanitiser before write — it preserves the renderer's own
377        // multi-byte glyphs (✓/✗/──) and strips only C0/C1/DEL control bytes.
378        let mut buf = super::parent_ui::sanitize_terminal(&block);
379        buf.push('\n');
380        let stderr = std::io::stderr();
381        let mut handle = stderr.lock();
382        let _ = handle.write_all(buf.as_bytes());
383        let _ = handle.flush();
384    }
385
386    /// Compact one-line summary used when several exports run in the same
387    /// invocation.  Mirrors the parent_ui card line so `--parallel-exports`
388    /// (threads), sequential, and `--parallel-export-processes` (processes)
389    /// produce visually consistent per-export rows.
390    fn render_compact(&self) -> String {
391        const NAME_COL: usize = 22;
392        const MODE_COL: usize = 8;
393        let icon = match self.status.as_str() {
394            "success" => "✓",
395            "failed" => "✗",
396            _ => "•",
397        };
398        let body = if self.status == "failed" {
399            let err = self
400                .error_message
401                .as_deref()
402                .unwrap_or("(no error message recorded)");
403            let (cause, _) = strip_chunked_recovery_hint(err);
404            // Collapse multi-line / extremely long errors so the compact
405            // line stays one row tall.  Full payload lives in the stderr
406            // log above the run summary.
407            compact_error(cause)
408        } else {
409            let rss = if self.peak_rss_mb > 0 {
410                format!("  RSS {} MB", fmt_thousands(self.peak_rss_mb))
411            } else {
412                String::new()
413            };
414            format!(
415                "{} rows  {} files  {}  {}{}",
416                fmt_thousands(self.total_rows),
417                fmt_thousands(self.files_produced as i64),
418                format_bytes(self.bytes_written),
419                fmt_duration_ms(self.duration_ms),
420                rss
421            )
422        };
423        format!(
424            "{} {:<name$}  {:<mode$}  {}",
425            icon,
426            self.export_name,
427            self.mode,
428            body,
429            name = NAME_COL,
430            mode = MODE_COL,
431        )
432    }
433
434    /// Build the block as a string.  Public to the module so tests can assert
435    /// formatting without capturing stderr.
436    fn render(&self) -> String {
437        // Adaptive layout: collect (label, value) pairs that actually apply to
438        // this run, then pad labels to the longest one so columns line up
439        // *within* the block.  Header is a fixed width so consecutive blocks
440        // look uniform regardless of which optional fields are present.
441        let mut rows: Vec<(&'static str, String)> = Vec::with_capacity(16);
442        rows.push(("run_id", self.run_id.clone()));
443        let status_value = match (&self.status, &self.skip_reason) {
444            (s, Some(reason)) if s == "skipped" => format!("{s} ({reason})"),
445            (s, _) => s.clone(),
446        };
447        rows.push(("status", status_value));
448
449        let tuning_value = match self.batch_size_memory_mb {
450            Some(mem) => format!(
451                "profile={}, batch_size={} (batch_size_memory_mb={}MiB → effective FETCH in logs)",
452                self.tuning_profile,
453                fmt_thousands(self.batch_size as i64),
454                mem
455            ),
456            None => format!(
457                "profile={}, batch_size={}",
458                self.tuning_profile,
459                fmt_thousands(self.batch_size as i64)
460            ),
461        };
462        rows.push(("tuning", tuning_value));
463
464        rows.push(("rows", fmt_thousands(self.total_rows)));
465        rows.push(("files", fmt_thousands(self.files_produced as i64)));
466        // Where the files landed — so a newcomer isn't left guessing after a
467        // successful export. Only when we actually wrote somewhere addressable
468        // (skips stdout and 0-file skips).
469        if self.files_produced > 0
470            && let Some(uri) = &self.destination_uri
471        {
472            rows.push(("output", uri.clone()));
473        }
474        // On a 0-new incremental run, `0 rows  0 files` alone hides *why*
475        // nothing moved. Surface the cursor position as its own line so the
476        // operator sees the incremental boundary held — not just an empty run.
477        // The cursor *value* lives in the runner and isn't plumbed onto the
478        // summary yet, so this reports the column-level position derived from
479        // `skip_reason` (the value is a follow-up once it reaches here).
480        if let Some(pos) = incremental_position_line(self.skip_reason.as_deref()) {
481            rows.push(("cursor", pos));
482        } else if let Some(window) = time_window_skip_line(&self.mode, self.skip_reason.as_deref())
483        {
484            // A time_window run that returned 0 rows reports the generic
485            // `"source returned 0 rows"` skip (`cursor_column()` is `None` for
486            // this strategy), so the incremental branch above never fires. Add
487            // an explicit `window:` line so the operator can tell an *empty
488            // window* apart from a *wrong column / window* — otherwise the
489            // summary is indistinguishable from any other empty run. The window
490            // column / days / computed bound are not plumbed onto `RunSummary`,
491            // so this is the strategy-level signal reachable here (the concrete
492            // bound is a follow-up once the runner records it on the summary).
493            rows.push(("window", window));
494        }
495        if self.bytes_written > 0 {
496            rows.push(("bytes", format_bytes(self.bytes_written)));
497        }
498        rows.push(("duration", fmt_duration_ms(self.duration_ms)));
499
500        if self.peak_rss_mb > 0 {
501            rows.push((
502                "peak RSS",
503                format!(
504                    "{} MB (sampled during run)",
505                    fmt_thousands(self.peak_rss_mb)
506                ),
507            ));
508        }
509        if let Some(temp) = self.pg_temp_bytes_delta {
510            // Skip when the cluster reported no delta — only chatter when there
511            // was actual spill. > 100 MB is annotated with a tuning hint;
512            // smaller numbers are reported as plain info.
513            if temp > 0 {
514                let temp_mb = temp as f64 / (1024.0 * 1024.0);
515                let label = if temp > 100 * 1024 * 1024 {
516                    format!(
517                        "{:.1} MB ⚠ shrink tuning.batch_size or set batch_size_memory_mb",
518                        temp_mb
519                    )
520                } else {
521                    format!("{:.1} MB", temp_mb)
522                };
523                rows.push(("pg temp spill", label));
524            }
525        }
526        if self.format == "parquet" && self.compression != "zstd" {
527            rows.push(("compression", self.compression.clone()));
528        }
529        if self.retries > 0 {
530            rows.push(("retries", self.retries.to_string()));
531        }
532        if let Some(v) = self.validated {
533            rows.push(("validated", if v { "pass".into() } else { "FAIL".into() }));
534        }
535        if let Some(sc) = self.schema_changed {
536            rows.push((
537                "schema",
538                if sc {
539                    "CHANGED".into()
540                } else {
541                    "unchanged".into()
542                },
543            ));
544        }
545        if let Some(q) = self.quality_passed {
546            rows.push(("quality", if q { "pass".into() } else { "FAIL".into() }));
547        }
548        if let Some(reconciled) = self.reconciled {
549            let src = self
550                .source_count
551                .map(fmt_thousands)
552                .unwrap_or_else(|| "?".into());
553            let exported = fmt_thousands(self.total_rows);
554            let value = if reconciled {
555                format!("MATCH ({exported}/{src})")
556            } else {
557                format!("MISMATCH (exported {exported} vs source {src})")
558            };
559            rows.push(("reconcile", value));
560        }
561        if let Some(err) = &self.error_message {
562            // Preserve the error's own line structure: the detailed block has
563            // room for it and `format_block` now indents continuation lines
564            // under the value column. Flattening to `"; "`-joined text (the
565            // compact one-liner's job) made multi-line errors — e.g. a quality
566            // failure's `failed:\n  - <check>\n  Fix …` — hard to read here.
567            rows.push(("error", err.trim_end().to_string()));
568        }
569
570        format_block(&self.export_name, &rows)
571    }
572
573    /// Sanity-check the post-run summary ↔ manifest_parts coherence. Used as
574    /// a `debug_assert!`-style runtime gate from `finalize_manifest` so any
575    /// future runner that bumps `bytes_written` / `files_committed` /
576    /// `files_produced` without going through `pipeline::commit::record_part`
577    /// is caught the moment it finishes a real export. Compiled out in
578    /// release builds via the `cfg!(debug_assertions)` guard at the call site.
579    ///
580    /// **Resume-safe inequalities only**: on resume, `manifest_parts` carries
581    /// prior runs' parts via `chunked::resume_m8` while `bytes_written` /
582    /// `files_committed` reflect only the current invocation — so strict
583    /// equality is wrong across resume boundaries. Strict equality on the
584    /// non-resume path is pinned by `pipeline::commit::tests`.
585    ///
586    /// Returns `Ok(())` when the summary satisfies the invariants, else an
587    /// `Err(String)` naming which one was violated and by how much.
588    pub fn check_post_run_invariants(&self) -> Result<(), String> {
589        let parts_bytes: u64 = self.manifest_parts.iter().map(|p| p.size_bytes).sum();
590
591        if self.files_committed > self.manifest_parts.len() {
592            return Err(format!(
593                "summary.files_committed ({}) > manifest_parts.len() ({}) — \
594                 a runner bumped files_committed without commit::record_part",
595                self.files_committed,
596                self.manifest_parts.len()
597            ));
598        }
599        if self.files_produced > self.manifest_parts.len() {
600            return Err(format!(
601                "summary.files_produced ({}) > manifest_parts.len() ({}) — \
602                 a runner bumped files_produced without commit::record_part",
603                self.files_produced,
604                self.manifest_parts.len()
605            ));
606        }
607        if self.bytes_written > parts_bytes {
608            return Err(format!(
609                "summary.bytes_written ({}) > sum(manifest_parts.size_bytes) ({}) — \
610                 a runner bumped bytes_written without commit::record_part",
611                self.bytes_written, parts_bytes
612            ));
613        }
614        if self.status == "success" && self.files_committed > 0 && self.manifest_parts.is_empty() {
615            return Err(format!(
616                "success run with files_committed={} has empty manifest_parts — \
617                 cloud manifest (ADR-0012 M1) would ship with no part list \
618                 (this is the gap parallel_checkpoint had before commit e9b0796)",
619                self.files_committed
620            ));
621        }
622        // Invariant audit gap #1, weak form: a successful run that produced
623        // rows for THIS invocation must have committed at least one file.
624        // The strict form ("rows_written <= rows_read") would require a
625        // separate source-side row counter we do not track, and concurrent
626        // INSERTs on the source (live_oltp_load) make a source_count
627        // comparison brittle. This weak form catches a fabrication shape:
628        // total_rows accumulated but nothing reached the destination — a
629        // runner that fetched and silently dropped rows produces exactly
630        // this signature. Resume-safe: total_rows reflects only this
631        // invocation, so a resume with no work to do legitimately ends at
632        // total_rows=0 / files_committed=0 and the guard does not fire.
633        if self.status == "success" && self.total_rows > 0 && self.files_committed == 0 {
634            return Err(format!(
635                "summary.total_rows={} but files_committed=0 — rows extracted from \
636                 source but no files committed (no output reached the destination)",
637                self.total_rows
638            ));
639        }
640        Ok(())
641    }
642}
643
644/// Reduce a possibly-multi-line execution error to a single-line, bounded-
645/// length cause suitable for the per-export summary block and the compact
646/// one-liner.  Keeps the user-actionable bit and drops noisy diagnostic
647/// payloads (long URLs, query strings, repeated chunk errors).
648///
649/// Recognised shapes:
650/// - `parallel checkpoint worker errors:\nchunk N: <msg>\nchunk M: <msg>` →
651///   `parallel checkpoint workers failed: K chunk(s) (chunk N: <truncated>)`.
652///   The full per-chunk detail is already in stderr logs.
653/// - Generic multi-line: newlines are replaced with `; ` and the result is
654///   clamped to 240 characters with an ellipsis.
655fn compact_error(raw: &str) -> String {
656    const MAX_CHARS: usize = 240;
657    if let Some(summary) = summarize_parallel_chunk_errors(raw) {
658        return clamp_chars(&summary, MAX_CHARS);
659    }
660    let collapsed: String = raw
661        .lines()
662        .map(str::trim_end)
663        .filter(|s| !s.is_empty())
664        .collect::<Vec<_>>()
665        .join("; ");
666    clamp_chars(&collapsed, MAX_CHARS)
667}
668
/// Build the value of the summary block's `cursor:` line from `skip_reason`.
///
/// An incremental no-op carries the skip reason `"no new rows since cursor
/// '<col>'"` (set by the runner). The column name is extracted from between
/// the quotes and reported as unchanged. Any other reason — including the
/// non-cursor `"source returned 0 rows"` — and `None` (a run that actually
/// produced rows) yield `None`. The cursor *value* isn't carried on the
/// summary yet, so the line is column-level only.
fn incremental_position_line(skip_reason: Option<&str>) -> Option<String> {
    const CURSOR_SKIP_PREFIX: &str = "no new rows since cursor '";

    skip_reason
        .and_then(|reason| reason.strip_prefix(CURSOR_SKIP_PREFIX))
        .and_then(|quoted_tail| quoted_tail.strip_suffix('\''))
        .map(|col| format!("'{col}' unchanged (no new rows this run)"))
}
682
/// Build the value of the summary block's `window:` line for a time_window
/// run that returned nothing.
///
/// A `TimeWindow` strategy has no cursor column, so its 0-row run reports the
/// generic `"source returned 0 rows"` skip and `incremental_position_line`
/// never fires for it. Without this line an empty time window would look like
/// any other empty export; with it the operator can tell an *empty window*
/// (data simply outside the rolling range) from a *misconfigured window*
/// (wrong `time_column` / `days_window`).
///
/// The line is produced only when `mode == "timewindow"` (the value set from
/// `ExtractionStrategy::mode_label`) **and** a skip reason is present, so it
/// never appears for incremental/snapshot/chunked/keyset runs or for a
/// time_window run that produced rows. The window column, days, and computed
/// lower bound are not carried on `RunSummary`, so the text states the
/// strategy-level fact and where to look; reporting the concrete bound is a
/// follow-up once the runner records it onto the summary.
fn time_window_skip_line(mode: &str, skip_reason: Option<&str>) -> Option<String> {
    let skipped_time_window = skip_reason.is_some() && mode == "timewindow";
    skipped_time_window.then(|| {
        "rolling time window matched no rows — check `time_column`/`days_window`".to_string()
    })
}
706
707fn summarize_parallel_chunk_errors(raw: &str) -> Option<String> {
708    let header_pos = raw.find("parallel checkpoint worker errors:")?;
709    let prefix = raw[..header_pos].trim_end_matches(": ").trim_end();
710    let tail = &raw[header_pos + "parallel checkpoint worker errors:".len()..];
711
712    let chunk_lines: Vec<&str> = tail
713        .lines()
714        .map(str::trim)
715        .filter(|l| l.starts_with("chunk "))
716        .collect();
717    if chunk_lines.is_empty() {
718        return None;
719    }
720    let first_chunk_full = chunk_lines[0];
721    // Truncate the example chunk message; the URL/payload is in stderr logs.
722    let first_chunk_short = clamp_chars(first_chunk_full, 140);
723    let prefix = if prefix.is_empty() {
724        String::new()
725    } else {
726        format!("{}: ", prefix)
727    };
728    Some(format!(
729        "{}parallel checkpoint workers failed: {} chunk(s) ({}); see stderr for full payloads",
730        prefix,
731        chunk_lines.len(),
732        first_chunk_short
733    ))
734}
735
/// Limit `s` to at most `max_chars` characters (Unicode scalar values).
///
/// A string that already fits is returned unchanged. A longer one keeps its
/// first `max_chars - 1` characters and gets a trailing `…`, so the result is
/// exactly `max_chars` characters long. `max_chars == 0` yields an empty
/// string.
fn clamp_chars(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }
    // Byte offsets of the character at index `max_chars - 1` (where the
    // ellipsis would start) and of the one after it, which exists only when
    // the string is longer than `max_chars`.
    let mut offsets = s.char_indices().map(|(at, _)| at).skip(max_chars - 1);
    match (offsets.next(), offsets.next()) {
        (Some(cut), Some(_)) => {
            let mut clipped = String::with_capacity(cut + '…'.len_utf8());
            clipped.push_str(&s[..cut]);
            clipped.push('…');
            clipped
        }
        _ => s.to_string(),
    }
}
748
/// Render one summary block as a single string: a blank line, a
/// `── name ─────…─` header, then one indented `label:  value` line per row.
/// The result always ends with `\n`.
///
/// The header is filled with `─` up to 60 characters (a longer name simply
/// gets no fill). Labels are padded to the widest label so every value starts
/// in the same column. A value containing `\n` is spread over several lines,
/// with the continuation lines indented to sit under the value column.
fn format_block(name: &str, rows: &[(&str, String)]) -> String {
    const HEADER_WIDTH: usize = 60;

    // Width of the `label:` cell — the widest label plus its colon, so the
    // colon stays attached to the label.
    let label_cell = rows.iter().map(|(label, _)| label.len()).max().unwrap_or(0) + 1;

    let title = format!("── {} ", name);
    let fill = HEADER_WIDTH.saturating_sub(title.chars().count());

    let mut block = String::with_capacity(HEADER_WIDTH * (rows.len() + 3));
    block.push('\n');
    block.push_str(&title);
    block.push_str(&"─".repeat(fill));
    block.push('\n');

    // Two leading spaces + label cell + two-space gutter: the column where
    // values start, reused to indent continuation lines.
    let continuation_pad = " ".repeat(2 + label_cell + 2);
    for (label, value) in rows {
        for (idx, line) in value.split('\n').enumerate() {
            if idx == 0 {
                block.push_str(&format!(
                    "  {:<width$}  {}\n",
                    format!("{label}:"),
                    line,
                    width = label_cell
                ));
            } else {
                block.push_str(&continuation_pad);
                block.push_str(line);
                block.push('\n');
            }
        }
    }
    block
}
788
/// Format a millisecond duration for the summary output.
///
/// - below one second (including zero and negative inputs): `{ms}ms`
/// - below one minute: `S.Ts`, e.g. `1.5s`
/// - below one hour: `Mm SS.Ts`, e.g. `1m 08.4s`
/// - otherwise: `Hh MMm SS.Ts`, e.g. `1h 02m 05.3s`
///
/// The duration is rounded to tenths of a second (half up) *before* it is
/// split into hours/minutes/seconds. Rounding only the seconds field after
/// the split lets a value such as 119 960 ms render as `1m 60.0s`; rounding
/// first carries into the next unit and gives `2m 00.0s`.
fn fmt_duration_ms(ms: i64) -> String {
    if ms < 1000 {
        return format!("{}ms", ms);
    }
    // Division plus carry rather than `(ms + 50) / 100`, so inputs close to
    // `i64::MAX` cannot overflow.
    let total_tenths = ms / 100 + i64::from(ms % 100 >= 50);
    let tenth = total_tenths % 10;
    let total_secs = total_tenths / 10;
    let secs = total_secs % 60;
    let mins = (total_secs / 60) % 60;
    let hours = total_secs / 3600;

    if hours > 0 {
        format!("{}h {:02}m {:02}.{}s", hours, mins, secs, tenth)
    } else if mins > 0 {
        format!("{}m {:02}.{}s", mins, secs, tenth)
    } else {
        format!("{}.{}s", secs, tenth)
    }
}
805
/// Render an integer with a comma between every group of three digits,
/// keeping the sign of negative values.  Used for rows / files / batch_size
/// so large numbers stay readable: `39_990_376` → `39,990,376`.
fn fmt_thousands(n: i64) -> String {
    // `unsigned_abs` rather than `abs` so `i64::MIN` has a magnitude too.
    let digits = n.unsigned_abs().to_string();

    // The leading group takes whatever is left once the remaining digits are
    // cut into full triples: 1–3 digits (there is always at least one digit).
    let lead_len = match digits.len() % 3 {
        0 => 3,
        partial => partial,
    };
    let (lead, triples) = digits.split_at(lead_len);

    let mut grouped = String::with_capacity(digits.len() + digits.len() / 3 + 1);
    if n < 0 {
        grouped.push('-');
    }
    grouped.push_str(lead);
    for start in (0..triples.len()).step_by(3) {
        grouped.push(',');
        grouped.push_str(&triples[start..start + 3]);
    }
    grouped
}
826
827#[cfg(test)]
828mod tests {
829    use super::*;
830
831    #[test]
832    fn fmt_thousands_handles_small_and_large() {
833        assert_eq!(fmt_thousands(0), "0");
834        assert_eq!(fmt_thousands(7), "7");
835        assert_eq!(fmt_thousands(999), "999");
836        assert_eq!(fmt_thousands(1_000), "1,000");
837        assert_eq!(fmt_thousands(1_000_908), "1,000,908");
838        assert_eq!(fmt_thousands(39_990_376), "39,990,376");
839        assert_eq!(fmt_thousands(-1_234), "-1,234");
840        assert_eq!(fmt_thousands(i64::MAX), "9,223,372,036,854,775,807");
841    }
842
843    #[test]
844    fn fmt_duration_picks_unit() {
845        assert_eq!(fmt_duration_ms(0), "0ms");
846        assert_eq!(fmt_duration_ms(800), "800ms");
847        assert_eq!(fmt_duration_ms(1_500), "1.5s");
848        assert_eq!(fmt_duration_ms(68_400), "1m 08.4s");
849        assert_eq!(fmt_duration_ms(3_725_300), "1h 02m 05.3s");
850    }
851
852    #[test]
853    fn format_block_pads_labels_uniformly() {
854        let rows = vec![
855            ("run_id", "abc".to_string()),
856            ("rows", "42".to_string()),
857            ("compression", "zstd".to_string()),
858        ];
859        let out = format_block("orders", &rows);
860
861        // Each value column starts at the same character position.
862        let lines: Vec<&str> = out.lines().filter(|l| l.contains(':')).collect();
863        assert_eq!(lines.len(), 3);
864        let value_starts: Vec<usize> = lines
865            .iter()
866            .map(|l| l.find(':').unwrap() + l[l.find(':').unwrap()..].find(' ').unwrap())
867            .collect();
868        // The value (after `label:` plus padding plus two spaces) starts at the
869        // same column for every row.  We verify by checking all lines have the
870        // value substring at the same byte offset.
871        let value_col = lines[0].rfind("abc").unwrap();
872        assert_eq!(lines[1].rfind("42").unwrap(), value_col);
873        assert_eq!(lines[2].rfind("zstd").unwrap(), value_col);
874        // Sanity: silence unused.
875        let _ = value_starts;
876    }
877
878    #[test]
879    fn format_block_header_has_consistent_width() {
880        let block_a = format_block("a", &[("rows", "1".into())]);
881        let block_b = format_block("orders_table_xyz", &[("rows", "1".into())]);
882        let header_a = block_a.lines().nth(1).unwrap();
883        let header_b = block_b.lines().nth(1).unwrap();
884        assert_eq!(
885            header_a.chars().count(),
886            header_b.chars().count(),
887            "headers must be the same width regardless of name length: {:?} vs {:?}",
888            header_a,
889            header_b
890        );
891    }
892
893    #[test]
894    fn render_produces_a_single_string_with_trailing_newline() {
895        use crate::plan::{
896            CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
897            MetaColumns, ResolvedRunPlan,
898        };
899        use crate::tuning::SourceTuning;
900        let plan = ResolvedRunPlan {
901            export_name: "orders".into(),
902            base_query: "SELECT 1".into(),
903            strategy: ExtractionStrategy::Snapshot,
904            format: FormatType::Parquet,
905            compression: CompressionType::default(),
906            compression_level: None,
907            max_file_size_bytes: None,
908            skip_empty: false,
909            meta_columns: MetaColumns::default(),
910            destination: DestinationConfig {
911                destination_type: DestinationType::Local,
912                path: Some("./out".into()),
913                ..Default::default()
914            },
915            quality: None,
916            tuning: SourceTuning::from_config(None),
917            tuning_profile_label: "balanced (default)".into(),
918            validate: false,
919            reconcile: false,
920            resume: false,
921            source: crate::config::SourceConfig {
922                source_type: crate::config::SourceType::Postgres,
923                url: Some("postgresql://localhost/test".into()),
924                url_env: None,
925                url_file: None,
926                host: None,
927                port: None,
928                user: None,
929                password: None,
930                password_env: None,
931                database: None,
932                environment: None,
933                tuning: None,
934                tls: None,
935            },
936            column_overrides: Default::default(),
937            verify: crate::config::VerifyMode::Size,
938            schema_drift_policy: Default::default(),
939            shape_drift_warn_factor: 2.0,
940            parquet: None,
941        };
942        let mut s = RunSummary::new(&plan);
943        s.status = "success".into();
944        s.total_rows = 1_000_908;
945        s.files_produced = 11;
946        s.bytes_written = 32 * 1024 * 1024 + 400 * 1024;
947        s.duration_ms = 68_400;
948        s.peak_rss_mb = 884;
949
950        let block = s.render();
951        assert!(
952            block.starts_with('\n'),
953            "block should start with a blank line"
954        );
955        assert!(block.ends_with('\n'), "block should end with a newline");
956        assert!(block.contains("── orders "));
957        assert!(
958            block.contains("1,000,908"),
959            "rows should be formatted with thousands separator: {}",
960            block
961        );
962        assert!(block.contains("1m 08.4s"), "duration formatting: {}", block);
963        // No raw progress-bar bleed: header dashes still present, no carriage
964        // returns or escape sequences.
965        assert!(!block.contains('\r'));
966
967        // Compact one-liner used in multi-export runs.
968        let line = s.render_compact();
969        assert!(line.starts_with("✓ "), "success icon present: {:?}", line);
970        assert!(line.contains("orders"), "export name present: {:?}", line);
971        assert!(line.contains("1,000,908 rows"), "rows present: {:?}", line);
972        assert!(line.contains("32.4 MB"), "bytes present: {:?}", line);
973        assert!(line.contains("1m 08.4s"), "duration present: {:?}", line);
974        assert!(line.contains("RSS 884 MB"), "rss present: {:?}", line);
975        assert!(!line.contains('\n'), "single line: {:?}", line);
976    }
977
978    #[test]
979    fn compact_error_summarises_parallel_chunk_errors() {
980        let raw = "export 'page_views': parallel checkpoint worker errors:\n\
981                   chunk 4: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202442_chunk4.parquet?partNumber=1&uploadId=ABPnzm7RqplA, called: http_util::Client::send } => send http request, source: error sending request: client error (SendRequest): dispatch task is gone\n\
982                   chunk 5: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202443_chunk5.parquet?partNumber=1&uploadId=ABPnzm6q, called: http_util::Client::send } => send http request, source: dispatch task is gone";
983        let out = compact_error(raw);
984        assert!(
985            out.contains("2 chunk(s)"),
986            "should report number of failed chunks: {:?}",
987            out
988        );
989        assert!(
990            out.starts_with("export 'page_views': parallel checkpoint workers failed:"),
991            "should keep export prefix and use compact phrasing: {:?}",
992            out
993        );
994        assert!(
995            out.contains("chunk 4:"),
996            "should include the first chunk as an example: {:?}",
997            out
998        );
999        assert!(!out.contains('\n'), "single line output: {:?}", out);
1000        assert!(
1001            out.chars().count() <= 240,
1002            "must be clamped to <=240 chars, got {}: {:?}",
1003            out.chars().count(),
1004            out
1005        );
1006    }
1007
1008    #[test]
1009    fn compact_error_collapses_generic_multiline() {
1010        let raw = "first line of trouble\nsecond line with detail\n\nthird line\n";
1011        let out = compact_error(raw);
1012        assert_eq!(
1013            out, "first line of trouble; second line with detail; third line",
1014            "newlines should collapse to '; ' and blanks dropped"
1015        );
1016    }
1017
1018    #[test]
1019    fn compact_error_clamps_excessively_long_lines() {
1020        let raw = "x".repeat(1_000);
1021        let out = compact_error(&raw);
1022        assert_eq!(out.chars().count(), 240);
1023        assert!(out.ends_with('…'));
1024    }
1025
1026    #[test]
1027    fn render_compact_strips_chunked_recovery_hint_for_failed() {
1028        use crate::plan::{
1029            CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
1030            MetaColumns, ResolvedRunPlan,
1031        };
1032        use crate::tuning::SourceTuning;
1033        let plan = ResolvedRunPlan {
1034            export_name: "events".into(),
1035            base_query: "SELECT 1".into(),
1036            strategy: ExtractionStrategy::Snapshot,
1037            format: FormatType::Parquet,
1038            compression: CompressionType::default(),
1039            compression_level: None,
1040            max_file_size_bytes: None,
1041            skip_empty: false,
1042            meta_columns: MetaColumns::default(),
1043            destination: DestinationConfig {
1044                destination_type: DestinationType::Local,
1045                path: Some("./out".into()),
1046                ..Default::default()
1047            },
1048            quality: None,
1049            tuning: SourceTuning::from_config(None),
1050            tuning_profile_label: "balanced (default)".into(),
1051            validate: false,
1052            reconcile: false,
1053            resume: false,
1054            source: crate::config::SourceConfig {
1055                source_type: crate::config::SourceType::Postgres,
1056                url: Some("postgresql://localhost/test".into()),
1057                url_env: None,
1058                url_file: None,
1059                host: None,
1060                port: None,
1061                user: None,
1062                password: None,
1063                password_env: None,
1064                database: None,
1065                environment: None,
1066                tuning: None,
1067                tls: None,
1068            },
1069            column_overrides: Default::default(),
1070            verify: crate::config::VerifyMode::Size,
1071            schema_drift_policy: Default::default(),
1072            shape_drift_warn_factor: 2.0,
1073            parquet: None,
1074        };
1075        let mut s = RunSummary::new(&plan);
1076        s.status = "failed".into();
1077        s.error_message = Some(
1078            "export 'events': --resume but no in-progress chunk checkpoint; \
1079             run without --resume first or `rivet state reset-chunks --config x.yaml --export events`"
1080                .to_string(),
1081        );
1082
1083        let line = s.render_compact();
1084        assert!(line.starts_with("✗ "), "failure icon: {:?}", line);
1085        assert!(line.contains("events"), "name present: {:?}", line);
1086        assert!(
1087            line.contains("--resume but no in-progress chunk checkpoint"),
1088            "cause kept: {:?}",
1089            line
1090        );
1091        assert!(
1092            !line.contains("rivet state reset-chunks"),
1093            "recovery hint should be stripped from per-export line: {:?}",
1094            line
1095        );
1096        assert!(!line.contains('\n'), "single line: {:?}", line);
1097    }
1098
1099    fn plan_for(export_name: &str) -> crate::plan::ResolvedRunPlan {
1100        use crate::plan::{
1101            CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
1102            MetaColumns, ResolvedRunPlan,
1103        };
1104        use crate::tuning::SourceTuning;
1105        ResolvedRunPlan {
1106            export_name: export_name.into(),
1107            base_query: "SELECT 1".into(),
1108            strategy: ExtractionStrategy::Snapshot,
1109            format: FormatType::Parquet,
1110            compression: CompressionType::default(),
1111            compression_level: None,
1112            max_file_size_bytes: None,
1113            skip_empty: false,
1114            meta_columns: MetaColumns::default(),
1115            destination: DestinationConfig {
1116                destination_type: DestinationType::Local,
1117                path: Some("./out".into()),
1118                ..Default::default()
1119            },
1120            quality: None,
1121            tuning: SourceTuning::from_config(None),
1122            tuning_profile_label: "balanced (default)".into(),
1123            validate: false,
1124            reconcile: false,
1125            resume: false,
1126            source: crate::config::SourceConfig {
1127                source_type: crate::config::SourceType::Postgres,
1128                url: Some("postgresql://localhost/test".into()),
1129                url_env: None,
1130                url_file: None,
1131                host: None,
1132                port: None,
1133                user: None,
1134                password: None,
1135                password_env: None,
1136                database: None,
1137                environment: None,
1138                tuning: None,
1139                tls: None,
1140            },
1141            column_overrides: Default::default(),
1142            verify: crate::config::VerifyMode::Size,
1143            schema_drift_policy: Default::default(),
1144            shape_drift_warn_factor: 2.0,
1145            parquet: None,
1146        }
1147    }
1148
1149    #[test]
1150    fn render_preserves_multiline_error_block() {
1151        // L19: a multi-line error (a quality failure here) must stay multi-line
1152        // in the detailed single-export block — not collapsed to `"; "`-joined
1153        // text the way the compact one-liner does.
1154        let mut s = RunSummary::new(&plan_for("orders"));
1155        s.status = "failed".into();
1156        s.error_message = Some(
1157            "export 'orders': 1 quality check(s) failed:\n  \
1158             - row_count 10 below minimum 999999\n  \
1159             Fix the source data, or adjust the thresholds under `quality:` in your config."
1160                .to_string(),
1161        );
1162
1163        let block = s.render();
1164        // The collapsed form joined lines with `"; "` — assert that flattening
1165        // is gone and the original newline structure survives.
1166        assert!(
1167            !block.contains("failed:;"),
1168            "error must not be '; '-flattened in the detailed block: {block}"
1169        );
1170        assert!(
1171            block.contains("- row_count 10 below minimum 999999"),
1172            "failing check line present: {block}"
1173        );
1174        // Each part of the multi-line error lands on its own line.
1175        let err_lines: Vec<&str> = block
1176            .lines()
1177            .filter(|l| {
1178                l.contains("quality check(s) failed")
1179                    || l.contains("row_count 10 below minimum")
1180                    || l.contains("Fix the source data")
1181            })
1182            .collect();
1183        assert_eq!(
1184            err_lines.len(),
1185            3,
1186            "all three error lines should render on separate lines: {block}"
1187        );
1188        // Continuation lines are indented under the value column, not at col 0.
1189        for l in &err_lines {
1190            assert!(l.starts_with(' '), "error line should be indented: {l:?}");
1191        }
1192    }
1193
1194    #[test]
1195    fn render_surfaces_cursor_position_on_zero_new_incremental() {
1196        // L27: a 0-new incremental run shows `0 rows  0 files`; without a
1197        // cursor line the operator can't tell the boundary held. Assert the
1198        // dedicated `cursor:` line appears, derived from `skip_reason`.
1199        let mut s = RunSummary::new(&plan_for("orders"));
1200        s.status = "skipped".into();
1201        s.skip_reason = Some("no new rows since cursor 'updated_at'".into());
1202
1203        let block = s.render();
1204        let cursor_line = block
1205            .lines()
1206            .find(|l| l.trim_start().starts_with("cursor:"))
1207            .unwrap_or_else(|| panic!("expected a cursor: line in block: {block}"));
1208        assert!(
1209            cursor_line.contains("'updated_at'"),
1210            "cursor line names the column: {cursor_line:?}"
1211        );
1212        assert!(
1213            cursor_line.contains("unchanged"),
1214            "cursor line reports the position held: {cursor_line:?}"
1215        );
1216    }
1217
1218    #[test]
1219    fn incremental_position_line_only_for_cursor_skips() {
1220        // The non-cursor 0-row skip and the no-skip case produce no cursor line.
1221        assert_eq!(
1222            incremental_position_line(Some("no new rows since cursor 'ts'")),
1223            Some("'ts' unchanged (no new rows this run)".into())
1224        );
1225        assert_eq!(
1226            incremental_position_line(Some("source returned 0 rows")),
1227            None
1228        );
1229        assert_eq!(incremental_position_line(None), None);
1230    }
1231
1232    #[test]
1233    fn render_surfaces_window_position_on_zero_row_time_window() {
1234        // L27 (time_window arm): a 0-row time_window run reports the generic
1235        // `"source returned 0 rows"` skip (the strategy has no cursor column),
1236        // so the `cursor:` branch never fires. Without a `window:` line the
1237        // operator can't tell an empty window from a wrong column/window —
1238        // assert the dedicated `window:` line appears for this mode.
1239        let mut s = RunSummary::new(&plan_for("events"));
1240        s.status = "skipped".into();
1241        s.mode = "timewindow".into();
1242        s.skip_reason = Some("source returned 0 rows".into());
1243
1244        let block = s.render();
1245        let window_line = block
1246            .lines()
1247            .find(|l| l.trim_start().starts_with("window:"))
1248            .unwrap_or_else(|| panic!("expected a window: line in block: {block}"));
1249        assert!(
1250            window_line.contains("matched no rows"),
1251            "window line reports the empty window: {window_line:?}"
1252        );
1253        assert!(
1254            window_line.contains("time_column") && window_line.contains("days_window"),
1255            "window line points at the window config to check: {window_line:?}"
1256        );
1257        // The generic 0-row skip must not also produce a `cursor:` line.
1258        assert!(
1259            !block.lines().any(|l| l.trim_start().starts_with("cursor:")),
1260            "no cursor line for a non-cursor strategy: {block}"
1261        );
1262    }
1263
1264    #[test]
1265    fn time_window_skip_line_only_for_skipped_time_window() {
1266        // Fires only when the run skipped AND the strategy is time_window.
1267        assert_eq!(
1268            time_window_skip_line("timewindow", Some("source returned 0 rows")),
1269            Some("rolling time window matched no rows — check `time_column`/`days_window`".into())
1270        );
1271        // Wrong mode → no window line (incremental/snapshot handle their own).
1272        assert_eq!(
1273            time_window_skip_line("incremental", Some("source returned 0 rows")),
1274            None
1275        );
1276        assert_eq!(
1277            time_window_skip_line("full", Some("source returned 0 rows")),
1278            None
1279        );
1280        // A time_window run that produced rows (no skip) gets no window line.
1281        assert_eq!(time_window_skip_line("timewindow", None), None);
1282    }
1283}