Skip to main content

rivet/pipeline/
summary.rs

1//! **Layer: Observability**
2//!
3//! `RunSummary` is the single observability artifact for a pipeline run.
4//! It accumulates operational data during execution and is consumed by:
5//! - the end-of-run terminal output (`print`)
6//! - the metrics store (`state::record_metric`)
7//! - the notification system (`notify::maybe_send`)
8//!
9//! `RunSummary` is written to by execution modules (row counts, byte counts, retries)
10//! but it makes no execution decisions itself — it is a pure data accumulator.
11//!
12//! It embeds a `RunJournal` so that all pipeline modules — which already hold
13//! `&mut RunSummary` — can record structured events via `summary.journal.record()`
14//! without any signature changes.  In a future epic the relationship will invert:
15//! `RunSummary` will be derived from `RunJournal`.
16
17use super::ipc::{self, ChildEvent};
18use super::{format_bytes, multi_export_mode, strip_chunked_recovery_hint};
19use crate::journal::{PlanSnapshot, RunEvent, RunJournal};
20use crate::manifest::ManifestPart;
21use crate::plan::ResolvedRunPlan;
22
23/// Build a `PlanSnapshot` from a `ResolvedRunPlan`.
24///
25/// Lives here rather than on `journal` itself so that the journal module
26/// stays free of plan/pipeline dependencies (avoids the state→pipeline cycle
27/// we used to have via `state::journal_store`).
28fn plan_snapshot_from(plan: &ResolvedRunPlan) -> PlanSnapshot {
29    PlanSnapshot {
30        export_name: plan.export_name.clone(),
31        base_query: plan.base_query.clone(),
32        strategy: plan.strategy.mode_label().to_string(),
33        format: plan.format.label().to_string(),
34        compression: plan.compression.label().to_string(),
35        destination_type: plan.destination.destination_type.label().to_string(),
36        tuning_profile: plan.tuning_profile_label.clone(),
37        batch_size: plan.tuning.batch_size,
38        validate: plan.validate,
39        reconcile: plan.reconcile,
40        resume: plan.resume,
41    }
42}
43
/// Audit context for a run launched through `rivet apply` instead of
/// `rivet run`.
///
/// Captures the audit trail for **F5**: which plan artifact was applied,
/// whether `--force` was passed, and which preflight checks `--force`
/// actually bypassed.
///
/// `None` on `RunSummary` means the run came from `rivet run` (no plan
/// artifact was applied).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ApplyContext {
    /// `plan_id` from the applied `PlanArtifact`.
    pub plan_id: String,
    /// Whether `--force` was passed to `rivet apply`.
    pub forced: bool,
    /// Names of the preflight checks that `--force` actually overrode for
    /// this run.  Possible values: `"staleness"`, `"cursor_drift"`.  Empty
    /// when `forced` is true but no check had to be bypassed (the plan was
    /// fresh and the cursor matched).
    pub force_bypassed: Vec<String>,
}
63
/// Accumulates operational data during a pipeline run for summary and metrics.
///
/// The embedded `journal` is the structured event log for this run.  Use
/// `summary.journal.record(event)` at any call site that already holds
/// `&mut RunSummary`.
#[derive(Debug, Clone)]
pub struct RunSummary {
    /// Run identifier.  `RunSummary::new` builds it as
    /// `<export_name>_<UTC timestamp>`.
    pub run_id: String,
    /// Name of the export this summary describes.
    pub export_name: String,
    /// Run status label.  Starts as `"running"`; the renderers in this file
    /// special-case `"success"`, `"failed"`, and `"skipped"`.
    pub status: String,
    /// Row count shown as `rows` in the summary output.
    pub total_rows: i64,
    /// File count shown as `files` in the summary output.
    pub files_produced: usize,
    /// Bytes written; the detailed block shows the `bytes` row only when
    /// this is non-zero.
    pub bytes_written: u64,
    /// Incremented after each successful `dest.write()`. Non-zero means a previous
    /// attempt already committed data — retrying from the same cursor would duplicate rows.
    pub files_committed: usize,
    /// Run duration in milliseconds.
    pub duration_ms: i64,
    /// Peak resident set size in MB; rendered only when greater than zero.
    pub peak_rss_mb: i64,
    /// Retry count; rendered only when greater than zero.
    pub retries: u32,
    /// Validation outcome, rendered as `pass` / `FAIL`; `None` omits the row.
    pub validated: Option<bool>,
    /// Schema-change flag, rendered as `CHANGED` / `unchanged`; `None` omits
    /// the row.
    pub schema_changed: Option<bool>,
    /// Quality-check outcome, rendered as `pass` / `FAIL`; `None` omits the row.
    pub quality_passed: Option<bool>,
    /// Error text for the run, if any; rendered as the `error` row.
    pub error_message: Option<String>,
    /// `profile` from YAML, or `balanced (default)` if omitted.
    pub tuning_profile: String,
    /// Configured `batch_size` from YAML/profile (FETCH cap before `batch_size_memory_mb` override).
    pub batch_size: usize,
    /// When set, actual FETCH size is derived from schema (see logs).
    pub batch_size_memory_mb: Option<usize>,
    /// Output format label, copied from the plan at construction.
    pub format: String,
    /// Extraction strategy label (`mode_label()`), copied from the plan at
    /// construction.
    pub mode: String,
    /// Compression label, copied from the plan at construction.
    pub compression: String,
    /// Postgres `pg_stat_database.temp_bytes` delta around the run. `None` for
    /// non-Postgres sources or when the snapshot probe failed (admin
    /// permissions are not required — the view is readable by any role).
    /// When set and large, indicates cursor / sort spill to `pgsql_tmp/` —
    /// the safe action is to shrink `tuning.batch_size` or set
    /// `tuning.batch_size_memory_mb` below PG's `work_mem`.
    pub pg_temp_bytes_delta: Option<i64>,
    /// Human-readable parenthetical attached to `status: skipped` so the
    /// operator knows *why* there was nothing to export this run (e.g.
    /// `"no new rows since cursor 'updated_at'"`). Always `None` when
    /// `status != "skipped"`. Surfaced in the console summary card as
    /// `status: skipped (<reason>)`.
    pub skip_reason: Option<String>,
    /// Source COUNT(*) result for reconciliation (None = not requested or not applicable).
    pub source_count: Option<i64>,
    /// Whether reconciliation passed (Some(true) = match, Some(false) = mismatch, None = skipped).
    pub reconciled: Option<bool>,
    /// Committed parts accumulated during the run, in commit order.  Populated by
    /// `pipeline::manifest_writer::record_committed_part` at each `dest.write`
    /// site (ADR-0012 M1 — Parts Before Manifest).  Drained at finalize into a
    /// `RunManifest` by [`crate::pipeline::manifest_writer::write_manifest`].
    pub manifest_parts: Vec<ManifestPart>,
    /// xxh3 fingerprint of the dest-facing column schema for this run, in the
    /// canonical `xxh3:<16-hex>` form produced by [`crate::state::schema_fingerprint`].
    ///
    /// Recorded by [`crate::pipeline::manifest_writer::record_run_schema_fingerprint`]
    /// the first time the sink has resolved a schema (i.e. on the first batch
    /// of any chunk).  Idempotent within a run — the schema is identical across
    /// chunks, so later writes are no-ops.
    ///
    /// `finalize_manifest` reads this directly so the manifest's
    /// `schema_fingerprint` no longer depends on the per-export schema row
    /// happening to land in `state` before the manifest write.  The state
    /// lookup remains a fallback for resume scenarios where the summary was
    /// reconstructed without ever seeing a live schema.
    pub schema_fingerprint: Option<String>,
    /// Result of the manifest-aware `--validate` pass (ADR-0012 M5/M6,
    /// ADR-0013).  Populated by `pipeline::job::finalize_validate_manifest`
    /// after `finalize_manifest` succeeds; `None` when the run targeted a
    /// streaming destination, when `--validate` was not requested, or when
    /// the run failed before any manifest could be written.
    pub manifest_verification: Option<crate::pipeline::ManifestVerification>,
    /// Apply-time context (plan_id, --force usage, bypassed checks).
    /// `None` when the run came from `rivet run` rather than `rivet apply`.
    /// See [`ApplyContext`] and finding **F5** of the 0.7.5 audit.
    pub apply_context: Option<ApplyContext>,
    /// Structured event log for this run.  Answers the four DoD observability questions.
    pub journal: RunJournal,
}
145
146impl RunSummary {
147    pub(super) fn new(plan: &ResolvedRunPlan) -> Self {
148        let run_id = format!(
149            "{}_{}",
150            plan.export_name,
151            chrono::Utc::now().format("%Y%m%dT%H%M%S%.3f"),
152        );
153        let mut journal = RunJournal::new(&run_id, &plan.export_name);
154        journal.record(RunEvent::PlanResolved(plan_snapshot_from(plan)));
155
156        ipc::emit_event(&ChildEvent::Started {
157            export_name: plan.export_name.clone(),
158            run_id: run_id.clone(),
159            mode: plan.strategy.mode_label().to_string(),
160            tuning_profile: plan.tuning_profile_label.clone(),
161            batch_size: plan.tuning.batch_size,
162        });
163
164        Self {
165            run_id,
166            export_name: plan.export_name.clone(),
167            status: "running".into(),
168            total_rows: 0,
169            files_produced: 0,
170            bytes_written: 0,
171            files_committed: 0,
172            duration_ms: 0,
173            peak_rss_mb: 0,
174            retries: 0,
175            validated: None,
176            schema_changed: None,
177            quality_passed: None,
178            error_message: None,
179            tuning_profile: plan.tuning_profile_label.clone(),
180            batch_size: plan.tuning.batch_size,
181            batch_size_memory_mb: plan.tuning.batch_size_memory_mb,
182            format: plan.format.label().to_string(),
183            mode: plan.strategy.mode_label().to_string(),
184            compression: plan.compression.label().to_string(),
185            pg_temp_bytes_delta: None,
186            skip_reason: None,
187            source_count: None,
188            reconciled: None,
189            manifest_parts: Vec::new(),
190            schema_fingerprint: None,
191            manifest_verification: None,
192            apply_context: None,
193            journal,
194        }
195    }
196
197    /// One canonical builder for tests across the crate + integration suite.
198    ///
199    /// Every field is filled with a sensible default; callers tweak only what
200    /// the test cares about via the chainable setters below.  This replaces
201    /// the seven copies of `stub_summary` / `fresh_summary` / `make_summary`
202    /// / `dummy_summary` / `empty_summary` that used to live in `notify.rs`,
203    /// `report.rs`, `chunked/{exec,mod}.rs`, `manifest_writer.rs`, and the
204    /// two integration-test files.  When `RunSummary` gains a field, it is
205    /// updated here once instead of across nine sites.
206    ///
207    /// Available outside `pipeline` (`pub` not `pub(crate)`) so integration
208    /// tests in `tests/` can use it via `RunSummary::stub_for_testing(...)`.
209    /// The `_for_testing` suffix is the convention from elsewhere in the
210    /// codebase (`destination_for_tests`, etc.) — production code should
211    /// never call it.
212    ///
213    /// `#[allow(dead_code)]` on each helper because the bin target's
214    /// dead-code analysis doesn't see uses from integration tests in `tests/`.
215    /// The lib's unit tests + the integration suite together exercise every
216    /// helper; the attribute is a no-op for them.
217    #[doc(hidden)]
218    #[allow(dead_code)]
219    pub fn stub_for_testing(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
220        let run_id = run_id.into();
221        let export_name = export_name.into();
222        let journal = RunJournal::new(&run_id, &export_name);
223        Self {
224            run_id,
225            export_name,
226            status: "running".into(),
227            total_rows: 0,
228            files_produced: 0,
229            bytes_written: 0,
230            files_committed: 0,
231            duration_ms: 0,
232            peak_rss_mb: 0,
233            retries: 0,
234            validated: None,
235            schema_changed: None,
236            quality_passed: None,
237            error_message: None,
238            tuning_profile: "balanced".into(),
239            batch_size: 1000,
240            batch_size_memory_mb: None,
241            format: "parquet".into(),
242            mode: "snapshot".into(),
243            compression: "zstd".into(),
244            pg_temp_bytes_delta: None,
245            skip_reason: None,
246            source_count: None,
247            reconciled: None,
248            manifest_parts: Vec::new(),
249            schema_fingerprint: None,
250            manifest_verification: None,
251            apply_context: None,
252            journal,
253        }
254    }
255
256    /// Test-only chainable setter for the run's status field.
257    ///
258    /// Used to build success/failed/running variants without re-listing every
259    /// field.  Keeps the journal in sync: terminal statuses get a matching
260    /// `RunCompleted` event recorded so consumers reading
261    /// `journal.final_outcome()` see the right shape.
262    #[doc(hidden)]
263    #[allow(dead_code)]
264    pub fn with_status(mut self, status: impl Into<String>) -> Self {
265        let s = status.into();
266        if (s == "success" || s == "failed") && self.journal.final_outcome().is_none() {
267            self.journal.record(RunEvent::RunCompleted {
268                status: s.clone(),
269                error_message: self.error_message.clone(),
270                duration_ms: self.duration_ms,
271            });
272        }
273        self.status = s;
274        self
275    }
276
277    /// Test-only setter — record `files_committed` so resume-hint logic
278    /// (`pipeline::report`) can detect the "failed run with committed files"
279    /// path that produces a resume command.
280    #[doc(hidden)]
281    #[allow(dead_code)]
282    pub fn with_files_committed(mut self, n: usize) -> Self {
283        self.files_committed = n;
284        self
285    }
286
287    /// Test-only setter — replace the recorded manifest parts (and adjust
288    /// `total_rows` / `bytes_written` / `files_produced` to keep them
289    /// consistent with the parts list, the way real production code does).
290    #[doc(hidden)]
291    #[allow(dead_code)]
292    pub fn with_manifest_parts(mut self, parts: Vec<crate::manifest::ManifestPart>) -> Self {
293        self.total_rows = parts.iter().map(|p| p.rows).sum();
294        self.bytes_written = parts.iter().map(|p| p.size_bytes).sum();
295        self.files_produced = parts.len();
296        self.files_committed = parts.len();
297        self.manifest_parts = parts;
298        self
299    }
300
301    /// Test-only setter — error_message + (optionally) status.  Common
302    /// shape for the "failed-run" fixtures that populated 4+ existing
303    /// stubs.
304    #[doc(hidden)]
305    #[allow(dead_code)]
306    pub fn with_error(mut self, msg: impl Into<String>) -> Self {
307        self.error_message = Some(msg.into());
308        self
309    }
310
311    /// Test-only setter — record a PlanResolved event in the journal so
312    /// downstream observability paths (`journal.plan_snapshot()`,
313    /// `RunReport::from_summary` plan_origin lookup) see the same shape
314    /// they would on a real run.
315    #[doc(hidden)]
316    #[allow(dead_code)]
317    pub fn with_plan_snapshot(mut self, snap: PlanSnapshot) -> Self {
318        self.journal.record(RunEvent::PlanResolved(snap));
319        self
320    }
321
322    pub(super) fn print(&self) {
323        // Capturing mode (IPC child or in-process channel): emit a
324        // `Finished` event and let the unified UI thread render the card.
325        // No stderr block here — the renderer owns the screen.
326        if ipc::capturing_events() {
327            ipc::emit_event(&ChildEvent::Finished {
328                export_name: self.export_name.clone(),
329                run_id: self.run_id.clone(),
330                status: self.status.clone(),
331                total_rows: self.total_rows,
332                files_produced: self.files_produced as u64,
333                bytes_written: self.bytes_written,
334                duration_ms: self.duration_ms,
335                peak_rss_mb: self.peak_rss_mb,
336                error_message: self.error_message.clone(),
337            });
338            return;
339        }
340
341        self.print_stderr_block();
342    }
343
344    /// Write the per-export summary block to stderr, bypassing IPC capture.
345    /// Used after the in-process card UI finishes on single-export sequential
346    /// runs so operators still get the detailed `── name ──` block below the
347    /// live card line.
348    pub(super) fn print_stderr_block(&self) {
349        let block = if multi_export_mode() {
350            self.render_compact()
351        } else {
352            // Render the whole block into a single buffer so the call site
353            // emits one `write_all` to stderr.  Without this, parallel
354            // exports could interleave individual lines from different
355            // `RunSummary::print()` calls — visible as garbled blocks in
356            // `--parallel-exports` runs.
357            self.render().trim_end_matches('\n').to_string()
358        };
359
360        use std::io::Write;
361        // V9 (CWE-150): the block embeds error_message, which can carry
362        // attacker-controlled ANSI/OSC escapes from a malicious source DB. The
363        // single-export path reaches the operator terminal here (the parallel
364        // renderer sanitises separately). Funnel the whole block through the
365        // shared sanitiser before write — it preserves the renderer's own
366        // multi-byte glyphs (✓/✗/──) and strips only C0/C1/DEL control bytes.
367        let mut buf = super::parent_ui::sanitize_terminal(&block);
368        buf.push('\n');
369        let stderr = std::io::stderr();
370        let mut handle = stderr.lock();
371        let _ = handle.write_all(buf.as_bytes());
372        let _ = handle.flush();
373    }
374
375    /// Compact one-line summary used when several exports run in the same
376    /// invocation.  Mirrors the parent_ui card line so `--parallel-exports`
377    /// (threads), sequential, and `--parallel-export-processes` (processes)
378    /// produce visually consistent per-export rows.
379    fn render_compact(&self) -> String {
380        const NAME_COL: usize = 22;
381        const MODE_COL: usize = 8;
382        let icon = match self.status.as_str() {
383            "success" => "✓",
384            "failed" => "✗",
385            _ => "•",
386        };
387        let body = if self.status == "failed" {
388            let err = self
389                .error_message
390                .as_deref()
391                .unwrap_or("(no error message recorded)");
392            let (cause, _) = strip_chunked_recovery_hint(err);
393            // Collapse multi-line / extremely long errors so the compact
394            // line stays one row tall.  Full payload lives in the stderr
395            // log above the run summary.
396            compact_error(cause)
397        } else {
398            let rss = if self.peak_rss_mb > 0 {
399                format!("  RSS {} MB", fmt_thousands(self.peak_rss_mb))
400            } else {
401                String::new()
402            };
403            format!(
404                "{} rows  {} files  {}  {}{}",
405                fmt_thousands(self.total_rows),
406                fmt_thousands(self.files_produced as i64),
407                format_bytes(self.bytes_written),
408                fmt_duration_ms(self.duration_ms),
409                rss
410            )
411        };
412        format!(
413            "{} {:<name$}  {:<mode$}  {}",
414            icon,
415            self.export_name,
416            self.mode,
417            body,
418            name = NAME_COL,
419            mode = MODE_COL,
420        )
421    }
422
423    /// Build the block as a string.  Public to the module so tests can assert
424    /// formatting without capturing stderr.
425    fn render(&self) -> String {
426        // Adaptive layout: collect (label, value) pairs that actually apply to
427        // this run, then pad labels to the longest one so columns line up
428        // *within* the block.  Header is a fixed width so consecutive blocks
429        // look uniform regardless of which optional fields are present.
430        let mut rows: Vec<(&'static str, String)> = Vec::with_capacity(16);
431        rows.push(("run_id", self.run_id.clone()));
432        let status_value = match (&self.status, &self.skip_reason) {
433            (s, Some(reason)) if s == "skipped" => format!("{s} ({reason})"),
434            (s, _) => s.clone(),
435        };
436        rows.push(("status", status_value));
437
438        let tuning_value = match self.batch_size_memory_mb {
439            Some(mem) => format!(
440                "profile={}, batch_size={} (batch_size_memory_mb={}MiB → effective FETCH in logs)",
441                self.tuning_profile,
442                fmt_thousands(self.batch_size as i64),
443                mem
444            ),
445            None => format!(
446                "profile={}, batch_size={}",
447                self.tuning_profile,
448                fmt_thousands(self.batch_size as i64)
449            ),
450        };
451        rows.push(("tuning", tuning_value));
452
453        rows.push(("rows", fmt_thousands(self.total_rows)));
454        rows.push(("files", fmt_thousands(self.files_produced as i64)));
455        // On a 0-new incremental run, `0 rows  0 files` alone hides *why*
456        // nothing moved. Surface the cursor position as its own line so the
457        // operator sees the incremental boundary held — not just an empty run.
458        // The cursor *value* lives in the runner and isn't plumbed onto the
459        // summary yet, so this reports the column-level position derived from
460        // `skip_reason` (the value is a follow-up once it reaches here).
461        if let Some(pos) = incremental_position_line(self.skip_reason.as_deref()) {
462            rows.push(("cursor", pos));
463        } else if let Some(window) = time_window_skip_line(&self.mode, self.skip_reason.as_deref())
464        {
465            // A time_window run that returned 0 rows reports the generic
466            // `"source returned 0 rows"` skip (`cursor_column()` is `None` for
467            // this strategy), so the incremental branch above never fires. Add
468            // an explicit `window:` line so the operator can tell an *empty
469            // window* apart from a *wrong column / window* — otherwise the
470            // summary is indistinguishable from any other empty run. The window
471            // column / days / computed bound are not plumbed onto `RunSummary`,
472            // so this is the strategy-level signal reachable here (the concrete
473            // bound is a follow-up once the runner records it on the summary).
474            rows.push(("window", window));
475        }
476        if self.bytes_written > 0 {
477            rows.push(("bytes", format_bytes(self.bytes_written)));
478        }
479        rows.push(("duration", fmt_duration_ms(self.duration_ms)));
480
481        if self.peak_rss_mb > 0 {
482            rows.push((
483                "peak RSS",
484                format!(
485                    "{} MB (sampled during run)",
486                    fmt_thousands(self.peak_rss_mb)
487                ),
488            ));
489        }
490        if let Some(temp) = self.pg_temp_bytes_delta {
491            // Skip when the cluster reported no delta — only chatter when there
492            // was actual spill. > 100 MB is annotated with a tuning hint;
493            // smaller numbers are reported as plain info.
494            if temp > 0 {
495                let temp_mb = temp as f64 / (1024.0 * 1024.0);
496                let label = if temp > 100 * 1024 * 1024 {
497                    format!(
498                        "{:.1} MB ⚠ shrink tuning.batch_size or set batch_size_memory_mb",
499                        temp_mb
500                    )
501                } else {
502                    format!("{:.1} MB", temp_mb)
503                };
504                rows.push(("pg temp spill", label));
505            }
506        }
507        if self.format == "parquet" && self.compression != "zstd" {
508            rows.push(("compression", self.compression.clone()));
509        }
510        if self.retries > 0 {
511            rows.push(("retries", self.retries.to_string()));
512        }
513        if let Some(v) = self.validated {
514            rows.push(("validated", if v { "pass".into() } else { "FAIL".into() }));
515        }
516        if let Some(sc) = self.schema_changed {
517            rows.push((
518                "schema",
519                if sc {
520                    "CHANGED".into()
521                } else {
522                    "unchanged".into()
523                },
524            ));
525        }
526        if let Some(q) = self.quality_passed {
527            rows.push(("quality", if q { "pass".into() } else { "FAIL".into() }));
528        }
529        if let Some(reconciled) = self.reconciled {
530            let src = self
531                .source_count
532                .map(fmt_thousands)
533                .unwrap_or_else(|| "?".into());
534            let exported = fmt_thousands(self.total_rows);
535            let value = if reconciled {
536                format!("MATCH ({exported}/{src})")
537            } else {
538                format!("MISMATCH (exported {exported} vs source {src})")
539            };
540            rows.push(("reconcile", value));
541        }
542        if let Some(err) = &self.error_message {
543            // Preserve the error's own line structure: the detailed block has
544            // room for it and `format_block` now indents continuation lines
545            // under the value column. Flattening to `"; "`-joined text (the
546            // compact one-liner's job) made multi-line errors — e.g. a quality
547            // failure's `failed:\n  - <check>\n  Fix …` — hard to read here.
548            rows.push(("error", err.trim_end().to_string()));
549        }
550
551        format_block(&self.export_name, &rows)
552    }
553
554    /// Sanity-check the post-run summary ↔ manifest_parts coherence. Used as
555    /// a `debug_assert!`-style runtime gate from `finalize_manifest` so any
556    /// future runner that bumps `bytes_written` / `files_committed` /
557    /// `files_produced` without going through `pipeline::commit::record_part`
558    /// is caught the moment it finishes a real export. Compiled out in
559    /// release builds via the `cfg!(debug_assertions)` guard at the call site.
560    ///
561    /// **Resume-safe inequalities only**: on resume, `manifest_parts` carries
562    /// prior runs' parts via `chunked::resume_m8` while `bytes_written` /
563    /// `files_committed` reflect only the current invocation — so strict
564    /// equality is wrong across resume boundaries. Strict equality on the
565    /// non-resume path is pinned by `pipeline::commit::tests`.
566    ///
567    /// Returns `Ok(())` when the summary satisfies the invariants, else an
568    /// `Err(String)` naming which one was violated and by how much.
569    pub fn check_post_run_invariants(&self) -> Result<(), String> {
570        let parts_bytes: u64 = self.manifest_parts.iter().map(|p| p.size_bytes).sum();
571
572        if self.files_committed > self.manifest_parts.len() {
573            return Err(format!(
574                "summary.files_committed ({}) > manifest_parts.len() ({}) — \
575                 a runner bumped files_committed without commit::record_part",
576                self.files_committed,
577                self.manifest_parts.len()
578            ));
579        }
580        if self.files_produced > self.manifest_parts.len() {
581            return Err(format!(
582                "summary.files_produced ({}) > manifest_parts.len() ({}) — \
583                 a runner bumped files_produced without commit::record_part",
584                self.files_produced,
585                self.manifest_parts.len()
586            ));
587        }
588        if self.bytes_written > parts_bytes {
589            return Err(format!(
590                "summary.bytes_written ({}) > sum(manifest_parts.size_bytes) ({}) — \
591                 a runner bumped bytes_written without commit::record_part",
592                self.bytes_written, parts_bytes
593            ));
594        }
595        if self.status == "success" && self.files_committed > 0 && self.manifest_parts.is_empty() {
596            return Err(format!(
597                "success run with files_committed={} has empty manifest_parts — \
598                 cloud manifest (ADR-0012 M1) would ship with no part list \
599                 (this is the gap parallel_checkpoint had before commit e9b0796)",
600                self.files_committed
601            ));
602        }
603        // Invariant audit gap #1, weak form: a successful run that produced
604        // rows for THIS invocation must have committed at least one file.
605        // The strict form ("rows_written <= rows_read") would require a
606        // separate source-side row counter we do not track, and concurrent
607        // INSERTs on the source (live_oltp_load) make a source_count
608        // comparison brittle. This weak form catches a fabrication shape:
609        // total_rows accumulated but nothing reached the destination — a
610        // runner that fetched and silently dropped rows produces exactly
611        // this signature. Resume-safe: total_rows reflects only this
612        // invocation, so a resume with no work to do legitimately ends at
613        // total_rows=0 / files_committed=0 and the guard does not fire.
614        if self.status == "success" && self.total_rows > 0 && self.files_committed == 0 {
615            return Err(format!(
616                "summary.total_rows={} but files_committed=0 — rows extracted from \
617                 source but no files committed (no output reached the destination)",
618                self.total_rows
619            ));
620        }
621        Ok(())
622    }
623}
624
625/// Reduce a possibly-multi-line execution error to a single-line, bounded-
626/// length cause suitable for the per-export summary block and the compact
627/// one-liner.  Keeps the user-actionable bit and drops noisy diagnostic
628/// payloads (long URLs, query strings, repeated chunk errors).
629///
630/// Recognised shapes:
631/// - `parallel checkpoint worker errors:\nchunk N: <msg>\nchunk M: <msg>` →
632///   `parallel checkpoint workers failed: K chunk(s) (chunk N: <truncated>)`.
633///   The full per-chunk detail is already in stderr logs.
634/// - Generic multi-line: newlines are replaced with `; ` and the result is
635///   clamped to 240 characters with an ellipsis.
636fn compact_error(raw: &str) -> String {
637    const MAX_CHARS: usize = 240;
638    if let Some(summary) = summarize_parallel_chunk_errors(raw) {
639        return clamp_chars(&summary, MAX_CHARS);
640    }
641    let collapsed: String = raw
642        .lines()
643        .map(str::trim_end)
644        .filter(|s| !s.is_empty())
645        .collect::<Vec<_>>()
646        .join("; ");
647    clamp_chars(&collapsed, MAX_CHARS)
648}
649
/// Build the summary block's `cursor:` value from a `skip_reason`.
///
/// The runner sets `skip_reason` to `"no new rows since cursor '<col>'"` for
/// an incremental no-op; the column name is extracted and reported as held.
/// Any other reason (such as `"source returned 0 rows"`) and `None` yield
/// `None`.  Only the column is reported — the cursor *value* is not carried
/// on the summary yet.
fn incremental_position_line(skip_reason: Option<&str>) -> Option<String> {
    const PREFIX: &str = "no new rows since cursor '";

    skip_reason
        .and_then(|reason| reason.strip_prefix(PREFIX))
        .and_then(|rest| rest.strip_suffix('\''))
        .map(|col| format!("'{col}' unchanged (no new rows this run)"))
}
663
/// Build the summary block's `window:` value for a time_window run that
/// returned nothing.
///
/// A `TimeWindow` strategy has no cursor column, so its 0-row run reports the
/// generic `"source returned 0 rows"` skip and `incremental_position_line`
/// never matches; without this row an empty time window would look like any
/// other empty export.  Surfacing it lets the operator tell an *empty window*
/// (data outside the rolling range) from a *misconfigured window* (wrong
/// `time_column` / `days_window`).
///
/// Returns `Some` only when a skip reason is set **and** `mode` is
/// `"timewindow"` (the label set from `ExtractionStrategy::mode_label`), so
/// it never fires for incremental/snapshot/chunked/keyset.  The window
/// column, days, and computed lower bound are not carried on `RunSummary`, so
/// the message states the strategy-level fact and where to look.
fn time_window_skip_line(mode: &str, skip_reason: Option<&str>) -> Option<String> {
    let applies = skip_reason.is_some() && mode == "timewindow";
    applies.then(|| {
        "rolling time window matched no rows — check `time_column`/`days_window`".to_string()
    })
}
687
688fn summarize_parallel_chunk_errors(raw: &str) -> Option<String> {
689    let header_pos = raw.find("parallel checkpoint worker errors:")?;
690    let prefix = raw[..header_pos].trim_end_matches(": ").trim_end();
691    let tail = &raw[header_pos + "parallel checkpoint worker errors:".len()..];
692
693    let chunk_lines: Vec<&str> = tail
694        .lines()
695        .map(str::trim)
696        .filter(|l| l.starts_with("chunk "))
697        .collect();
698    if chunk_lines.is_empty() {
699        return None;
700    }
701    let first_chunk_full = chunk_lines[0];
702    // Truncate the example chunk message; the URL/payload is in stderr logs.
703    let first_chunk_short = clamp_chars(first_chunk_full, 140);
704    let prefix = if prefix.is_empty() {
705        String::new()
706    } else {
707        format!("{}: ", prefix)
708    };
709    Some(format!(
710        "{}parallel checkpoint workers failed: {} chunk(s) ({}); see stderr for full payloads",
711        prefix,
712        chunk_lines.len(),
713        first_chunk_short
714    ))
715}
716
/// Limit `s` to at most `max_chars` characters (Unicode scalar values, not
/// bytes).
///
/// - `max_chars == 0` yields an empty string.
/// - A string that already fits is returned unchanged.
/// - Otherwise the first `max_chars - 1` characters are kept and `…` is
///   appended, so the result is exactly `max_chars` characters long.
fn clamp_chars(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }
    // Walk to the last character that could be kept, then peek one further:
    // `cut` is the byte offset of character `max_chars - 1`, and `overflow`
    // is present only when the string has more than `max_chars` characters.
    let mut boundaries = s.char_indices().skip(max_chars - 1);
    let cut = boundaries.next();
    let overflow = boundaries.next();
    match (cut, overflow) {
        (Some((cut_at, _)), Some(_)) => {
            let mut clamped = String::with_capacity(cut_at + '…'.len_utf8());
            clamped.push_str(&s[..cut_at]);
            clamped.push('…');
            clamped
        }
        _ => s.to_string(),
    }
}
729
/// Render a summary block as one string: a blank line, a `── name ─────…─`
/// header, then one indented `label:  value` line per row. The result always
/// ends with `\n`.
///
/// Layout rules:
/// - The header is padded with `─` up to 60 characters; a longer name simply
///   gets no padding.
/// - Labels are left-aligned with the colon attached, and every value starts
///   in the same column, two spaces after the widest `label:`.
/// - A value containing `\n` keeps its line structure: continuation lines are
///   indented to the value column instead of being flattened onto one line.
fn format_block(name: &str, rows: &[(&str, String)]) -> String {
    const HEADER_WIDTH: usize = 60;
    let widest_label = rows
        .iter()
        .map(|(label, _)| label.len())
        .max()
        .unwrap_or(0);

    let title = format!("── {} ", name);
    let rule = "─".repeat(HEADER_WIDTH.saturating_sub(title.chars().count()));

    let mut block = String::with_capacity(HEADER_WIDTH * (rows.len() + 3));
    block.push('\n');
    block.push_str(&title);
    block.push_str(&rule);
    block.push('\n');

    // Value column = 2 (indent) + widest label + 1 (colon) + 2 (gap).
    let value_column = " ".repeat(widest_label + 5);
    for (label, value) in rows {
        // Pad after the colon so shorter labels reach the shared value column.
        let padding = widest_label.saturating_sub(label.chars().count()) + 2;
        block.push_str("  ");
        block.push_str(label);
        block.push(':');
        block.push_str(&" ".repeat(padding));
        for (line_no, line) in value.split('\n').enumerate() {
            if line_no > 0 {
                block.push_str(&value_column);
            }
            block.push_str(line);
            block.push('\n');
        }
    }
    block
}
769
/// Format a millisecond duration for display, picking the unit by magnitude:
///
/// - below one second (including negative input): `"{ms}ms"`, e.g. `800ms`
/// - below one minute: seconds with one decimal, e.g. `1.5s`
/// - below one hour: `"{m}m {ss.s}s"`, e.g. `1m 08.4s`
/// - otherwise: `"{h}h {mm}m {ss.s}s"`, e.g. `1h 02m 05.3s`
///
/// The duration is rounded to the nearest tenth of a second (half up) *once*,
/// in integer arithmetic, and every field is derived from that rounded value.
/// Rounding only the seconds field — while the minute/hour fields come from
/// the truncated duration — produces impossible readings such as `1m 60.0s`
/// for 119 960 ms instead of `2m 00.0s`.
fn fmt_duration_ms(ms: i64) -> String {
    if ms < 1000 {
        return format!("{}ms", ms);
    }
    // Round half up to tenths of a second. Written as div + carry rather than
    // `(ms + 50) / 100` so values near `i64::MAX` cannot overflow.
    let tenths = ms / 100 + i64::from(ms % 100 >= 50);
    let hours = tenths / 36_000;
    let minutes = (tenths % 36_000) / 600;
    let secs = (tenths % 600) / 10;
    let frac = tenths % 10;
    if hours > 0 {
        format!("{}h {:02}m {:02}.{}s", hours, minutes, secs, frac)
    } else if minutes > 0 {
        format!("{}m {:02}.{}s", minutes, secs, frac)
    } else {
        format!("{}.{}s", secs, frac)
    }
}
786
/// Render an integer with a comma between every group of three digits,
/// counted from the right. A negative value keeps its leading `-`.
///
/// Used for rows / files / batch_size so large counts stay readable:
/// `39_990_376` → `39,990,376`.
fn fmt_thousands(n: i64) -> String {
    // `unsigned_abs` keeps `i64::MIN` representable.
    let digits = n.unsigned_abs().to_string();
    let mut grouped = String::with_capacity(digits.len() + digits.len() / 3 + 1);
    if n < 0 {
        grouped.push('-');
    }
    // The leading group takes whatever is left over after full groups of
    // three; when the length divides evenly it is a full group itself.
    let head_len = match digits.len() % 3 {
        0 => 3,
        partial => partial,
    };
    let (head, mut rest) = digits.split_at(head_len);
    grouped.push_str(head);
    while !rest.is_empty() {
        let (group, remainder) = rest.split_at(3);
        grouped.push(',');
        grouped.push_str(group);
        rest = remainder;
    }
    grouped
}
807
#[cfg(test)]
mod tests {
    use super::*;

    /// Build the minimal plan every rendering test needs: a snapshot export
    /// named `export_name`, written as Parquet to a local `./out` directory
    /// from a Postgres source, with all optional features off.
    ///
    /// Shared by all tests that construct a `RunSummary`, so a new
    /// `ResolvedRunPlan` field only has to be filled in here.
    fn plan_for(export_name: &str) -> crate::plan::ResolvedRunPlan {
        use crate::plan::{
            CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
            MetaColumns, ResolvedRunPlan,
        };
        use crate::tuning::SourceTuning;
        ResolvedRunPlan {
            export_name: export_name.into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: crate::config::SourceConfig {
                source_type: crate::config::SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    #[test]
    fn fmt_thousands_handles_small_and_large() {
        assert_eq!(fmt_thousands(0), "0");
        assert_eq!(fmt_thousands(7), "7");
        assert_eq!(fmt_thousands(999), "999");
        assert_eq!(fmt_thousands(1_000), "1,000");
        assert_eq!(fmt_thousands(1_000_908), "1,000,908");
        assert_eq!(fmt_thousands(39_990_376), "39,990,376");
        assert_eq!(fmt_thousands(-1_234), "-1,234");
        assert_eq!(fmt_thousands(i64::MAX), "9,223,372,036,854,775,807");
    }

    #[test]
    fn fmt_duration_picks_unit() {
        assert_eq!(fmt_duration_ms(0), "0ms");
        assert_eq!(fmt_duration_ms(800), "800ms");
        assert_eq!(fmt_duration_ms(1_500), "1.5s");
        assert_eq!(fmt_duration_ms(68_400), "1m 08.4s");
        assert_eq!(fmt_duration_ms(3_725_300), "1h 02m 05.3s");
    }

    #[test]
    fn format_block_pads_labels_uniformly() {
        let rows = vec![
            ("run_id", "abc".to_string()),
            ("rows", "42".to_string()),
            ("compression", "zstd".to_string()),
        ];
        let out = format_block("orders", &rows);

        let lines: Vec<&str> = out.lines().filter(|l| l.contains(':')).collect();
        assert_eq!(lines.len(), 3);
        // The value (after `label:` plus padding plus two spaces) starts at the
        // same column for every row.  We verify by checking all lines have the
        // value substring at the same byte offset.
        let value_col = lines[0].rfind("abc").unwrap();
        assert_eq!(lines[1].rfind("42").unwrap(), value_col);
        assert_eq!(lines[2].rfind("zstd").unwrap(), value_col);
    }

    #[test]
    fn format_block_header_has_consistent_width() {
        let block_a = format_block("a", &[("rows", "1".into())]);
        let block_b = format_block("orders_table_xyz", &[("rows", "1".into())]);
        let header_a = block_a.lines().nth(1).unwrap();
        let header_b = block_b.lines().nth(1).unwrap();
        assert_eq!(
            header_a.chars().count(),
            header_b.chars().count(),
            "headers must be the same width regardless of name length: {:?} vs {:?}",
            header_a,
            header_b
        );
    }

    #[test]
    fn render_produces_a_single_string_with_trailing_newline() {
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "success".into();
        s.total_rows = 1_000_908;
        s.files_produced = 11;
        s.bytes_written = 32 * 1024 * 1024 + 400 * 1024;
        s.duration_ms = 68_400;
        s.peak_rss_mb = 884;

        let block = s.render();
        assert!(
            block.starts_with('\n'),
            "block should start with a blank line"
        );
        assert!(block.ends_with('\n'), "block should end with a newline");
        assert!(block.contains("── orders "));
        assert!(
            block.contains("1,000,908"),
            "rows should be formatted with thousands separator: {}",
            block
        );
        assert!(block.contains("1m 08.4s"), "duration formatting: {}", block);
        // No raw progress-bar bleed: header dashes still present, no carriage
        // returns or escape sequences.
        assert!(!block.contains('\r'));

        // Compact one-liner used in multi-export runs.
        let line = s.render_compact();
        assert!(line.starts_with("✓ "), "success icon present: {:?}", line);
        assert!(line.contains("orders"), "export name present: {:?}", line);
        assert!(line.contains("1,000,908 rows"), "rows present: {:?}", line);
        assert!(line.contains("32.4 MB"), "bytes present: {:?}", line);
        assert!(line.contains("1m 08.4s"), "duration present: {:?}", line);
        assert!(line.contains("RSS 884 MB"), "rss present: {:?}", line);
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    #[test]
    fn compact_error_summarises_parallel_chunk_errors() {
        let raw = "export 'page_views': parallel checkpoint worker errors:\n\
                   chunk 4: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202442_chunk4.parquet?partNumber=1&uploadId=ABPnzm7RqplA, called: http_util::Client::send } => send http request, source: error sending request: client error (SendRequest): dispatch task is gone\n\
                   chunk 5: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202443_chunk5.parquet?partNumber=1&uploadId=ABPnzm6q, called: http_util::Client::send } => send http request, source: dispatch task is gone";
        let out = compact_error(raw);
        assert!(
            out.contains("2 chunk(s)"),
            "should report number of failed chunks: {:?}",
            out
        );
        assert!(
            out.starts_with("export 'page_views': parallel checkpoint workers failed:"),
            "should keep export prefix and use compact phrasing: {:?}",
            out
        );
        assert!(
            out.contains("chunk 4:"),
            "should include the first chunk as an example: {:?}",
            out
        );
        assert!(!out.contains('\n'), "single line output: {:?}", out);
        assert!(
            out.chars().count() <= 240,
            "must be clamped to <=240 chars, got {}: {:?}",
            out.chars().count(),
            out
        );
    }

    #[test]
    fn compact_error_collapses_generic_multiline() {
        let raw = "first line of trouble\nsecond line with detail\n\nthird line\n";
        let out = compact_error(raw);
        assert_eq!(
            out, "first line of trouble; second line with detail; third line",
            "newlines should collapse to '; ' and blanks dropped"
        );
    }

    #[test]
    fn compact_error_clamps_excessively_long_lines() {
        let raw = "x".repeat(1_000);
        let out = compact_error(&raw);
        assert_eq!(out.chars().count(), 240);
        assert!(out.ends_with('…'));
    }

    #[test]
    fn render_compact_strips_chunked_recovery_hint_for_failed() {
        let mut s = RunSummary::new(&plan_for("events"));
        s.status = "failed".into();
        s.error_message = Some(
            "export 'events': --resume but no in-progress chunk checkpoint; \
             run without --resume first or `rivet state reset-chunks --config x.yaml --export events`"
                .to_string(),
        );

        let line = s.render_compact();
        assert!(line.starts_with("✗ "), "failure icon: {:?}", line);
        assert!(line.contains("events"), "name present: {:?}", line);
        assert!(
            line.contains("--resume but no in-progress chunk checkpoint"),
            "cause kept: {:?}",
            line
        );
        assert!(
            !line.contains("rivet state reset-chunks"),
            "recovery hint should be stripped from per-export line: {:?}",
            line
        );
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    #[test]
    fn render_preserves_multiline_error_block() {
        // L19: a multi-line error (a quality failure here) must stay multi-line
        // in the detailed single-export block — not collapsed to `"; "`-joined
        // text the way the compact one-liner does.
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "failed".into();
        s.error_message = Some(
            "export 'orders': 1 quality check(s) failed:\n  \
             - row_count 10 below minimum 999999\n  \
             Fix the source data, or adjust the thresholds under `quality:` in your config."
                .to_string(),
        );

        let block = s.render();
        // The collapsed form joined lines with `"; "` — assert that flattening
        // is gone and the original newline structure survives.
        assert!(
            !block.contains("failed:;"),
            "error must not be '; '-flattened in the detailed block: {block}"
        );
        assert!(
            block.contains("- row_count 10 below minimum 999999"),
            "failing check line present: {block}"
        );
        // Each part of the multi-line error lands on its own line.
        let err_lines: Vec<&str> = block
            .lines()
            .filter(|l| {
                l.contains("quality check(s) failed")
                    || l.contains("row_count 10 below minimum")
                    || l.contains("Fix the source data")
            })
            .collect();
        assert_eq!(
            err_lines.len(),
            3,
            "all three error lines should render on separate lines: {block}"
        );
        // Continuation lines are indented under the value column, not at col 0.
        for l in &err_lines {
            assert!(l.starts_with(' '), "error line should be indented: {l:?}");
        }
    }

    #[test]
    fn render_surfaces_cursor_position_on_zero_new_incremental() {
        // L27: a 0-new incremental run shows `0 rows  0 files`; without a
        // cursor line the operator can't tell the boundary held. Assert the
        // dedicated `cursor:` line appears, derived from `skip_reason`.
        let mut s = RunSummary::new(&plan_for("orders"));
        s.status = "skipped".into();
        s.skip_reason = Some("no new rows since cursor 'updated_at'".into());

        let block = s.render();
        let cursor_line = block
            .lines()
            .find(|l| l.trim_start().starts_with("cursor:"))
            .unwrap_or_else(|| panic!("expected a cursor: line in block: {block}"));
        assert!(
            cursor_line.contains("'updated_at'"),
            "cursor line names the column: {cursor_line:?}"
        );
        assert!(
            cursor_line.contains("unchanged"),
            "cursor line reports the position held: {cursor_line:?}"
        );
    }

    #[test]
    fn incremental_position_line_only_for_cursor_skips() {
        // The non-cursor 0-row skip and the no-skip case produce no cursor line.
        assert_eq!(
            incremental_position_line(Some("no new rows since cursor 'ts'")),
            Some("'ts' unchanged (no new rows this run)".into())
        );
        assert_eq!(
            incremental_position_line(Some("source returned 0 rows")),
            None
        );
        assert_eq!(incremental_position_line(None), None);
    }

    #[test]
    fn render_surfaces_window_position_on_zero_row_time_window() {
        // L27 (time_window arm): a 0-row time_window run reports the generic
        // `"source returned 0 rows"` skip (the strategy has no cursor column),
        // so the `cursor:` branch never fires. Without a `window:` line the
        // operator can't tell an empty window from a wrong column/window —
        // assert the dedicated `window:` line appears for this mode.
        let mut s = RunSummary::new(&plan_for("events"));
        s.status = "skipped".into();
        s.mode = "timewindow".into();
        s.skip_reason = Some("source returned 0 rows".into());

        let block = s.render();
        let window_line = block
            .lines()
            .find(|l| l.trim_start().starts_with("window:"))
            .unwrap_or_else(|| panic!("expected a window: line in block: {block}"));
        assert!(
            window_line.contains("matched no rows"),
            "window line reports the empty window: {window_line:?}"
        );
        assert!(
            window_line.contains("time_column") && window_line.contains("days_window"),
            "window line points at the window config to check: {window_line:?}"
        );
        // The generic 0-row skip must not also produce a `cursor:` line.
        assert!(
            !block.lines().any(|l| l.trim_start().starts_with("cursor:")),
            "no cursor line for a non-cursor strategy: {block}"
        );
    }

    #[test]
    fn time_window_skip_line_only_for_skipped_time_window() {
        // Fires only when the run skipped AND the strategy is time_window.
        assert_eq!(
            time_window_skip_line("timewindow", Some("source returned 0 rows")),
            Some("rolling time window matched no rows — check `time_column`/`days_window`".into())
        );
        // Wrong mode → no window line (incremental/snapshot handle their own).
        assert_eq!(
            time_window_skip_line("incremental", Some("source returned 0 rows")),
            None
        );
        assert_eq!(
            time_window_skip_line("full", Some("source returned 0 rows")),
            None
        );
        // A time_window run that produced rows (no skip) gets no window line.
        assert_eq!(time_window_skip_line("timewindow", None), None);
    }
}