Skip to main content

rivet/pipeline/
summary.rs

1//! **Layer: Observability**
2//!
3//! `RunSummary` is the single observability artifact for a pipeline run.
4//! It accumulates operational data during execution and is consumed by:
5//! - the end-of-run terminal output (`print`)
6//! - the metrics store (`state::record_metric`)
7//! - the notification system (`notify::maybe_send`)
8//!
9//! `RunSummary` is written to by execution modules (row counts, byte counts, retries)
10//! but it makes no execution decisions itself — it is a pure data accumulator.
11//!
12//! It embeds a `RunJournal` so that all pipeline modules — which already hold
13//! `&mut RunSummary` — can record structured events via `summary.journal.record()`
14//! without any signature changes.  In a future epic the relationship will invert:
15//! `RunSummary` will be derived from `RunJournal`.
16
17use super::ipc::{self, ChildEvent};
18use super::{format_bytes, multi_export_mode, strip_chunked_recovery_hint};
19use crate::journal::{PlanSnapshot, RunEvent, RunJournal};
20use crate::manifest::ManifestPart;
21use crate::plan::ResolvedRunPlan;
22
23/// Build a `PlanSnapshot` from a `ResolvedRunPlan`.
24///
25/// Lives here rather than on `journal` itself so that the journal module
26/// stays free of plan/pipeline dependencies (avoids the state→pipeline cycle
27/// we used to have via `state::journal_store`).
28fn plan_snapshot_from(plan: &ResolvedRunPlan) -> PlanSnapshot {
29    PlanSnapshot {
30        export_name: plan.export_name.clone(),
31        base_query: plan.base_query.clone(),
32        strategy: plan.strategy.mode_label().to_string(),
33        format: plan.format.label().to_string(),
34        compression: plan.compression.label().to_string(),
35        destination_type: plan.destination.destination_type.label().to_string(),
36        tuning_profile: plan.tuning_profile_label.clone(),
37        batch_size: plan.tuning.batch_size,
38        validate: plan.validate,
39        reconcile: plan.reconcile,
40        resume: plan.resume,
41    }
42}
43
/// Context recorded when a run was launched via `rivet apply` rather than
/// `rivet run`.  Provides the audit trail for **F5**: which plan artifact
/// was applied, whether `--force` was passed, and which preflight checks
/// `--force` actually bypassed.
///
/// Stored as `RunSummary::apply_context`; `None` there means the run came
/// from `rivet run` (no plan artifact was applied).
///
/// The type is serde-serializable in both directions (see the derives).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ApplyContext {
    /// `plan_id` from the applied `PlanArtifact`.
    pub plan_id: String,
    /// Was `--force` passed to `rivet apply`?
    pub forced: bool,
    /// Names of the preflight checks that `--force` actually overrode for
    /// this run.  Possible values: `"staleness"`, `"cursor_drift"`.  Empty
    /// when `forced` is true but no check had to be bypassed (the plan was
    /// fresh and the cursor matched).
    pub force_bypassed: Vec<String>,
}
63
/// Accumulates operational data during a pipeline run for summary and metrics.
///
/// The embedded `journal` is the structured event log for this run.  Use
/// `summary.journal.record(event)` at any call site that already holds
/// `&mut RunSummary`.
#[derive(Debug, Clone)]
pub struct RunSummary {
    /// Identifier for this run.  `RunSummary::new` builds it as
    /// `<export_name>_<UTC timestamp>` with millisecond precision.
    pub run_id: String,
    /// Name of the export this summary belongs to.
    pub export_name: String,
    /// Run status string.  Starts as `"running"`; the rendering code in this
    /// file also recognises `"success"`, `"failed"` and `"skipped"`.
    pub status: String,
    /// Row count for the run; shown as `rows` in the summary block and as the
    /// "exported" figure of the reconcile line.
    pub total_rows: i64,
    /// File count shown as `files` in the summary block.
    pub files_produced: usize,
    /// Byte count shown as `bytes` in the summary block (omitted when zero).
    pub bytes_written: u64,
    /// Incremented after each successful `dest.write()`. Non-zero means a previous
    /// attempt already committed data — retrying from the same cursor would duplicate rows.
    pub files_committed: usize,
    /// Run duration in milliseconds.
    pub duration_ms: i64,
    /// Peak resident set size in MB; the summary block shows it (as "sampled
    /// during run") only when greater than zero.
    pub peak_rss_mb: i64,
    /// Retry count; shown in the summary block only when greater than zero.
    pub retries: u32,
    /// Validation outcome: `None` omits the row, `Some(true)` renders `pass`,
    /// `Some(false)` renders `FAIL`.
    pub validated: Option<bool>,
    /// Schema-change flag: `None` omits the row, `Some(true)` renders
    /// `CHANGED`, `Some(false)` renders `unchanged`.
    pub schema_changed: Option<bool>,
    /// Quality-check outcome: `None` omits the row, `Some(true)` renders
    /// `pass`, `Some(false)` renders `FAIL`.
    pub quality_passed: Option<bool>,
    /// Error text for the run, if any.  Rendered through `compact_error` so
    /// multi-line errors collapse to a single line.
    pub error_message: Option<String>,
    /// `profile` from YAML, or `balanced (default)` if omitted.
    pub tuning_profile: String,
    /// Configured `batch_size` from YAML/profile (FETCH cap before `batch_size_memory_mb` override).
    pub batch_size: usize,
    /// When set, actual FETCH size is derived from schema (see logs).
    pub batch_size_memory_mb: Option<usize>,
    /// Output format label (copied from the plan by `new`).
    pub format: String,
    /// Strategy mode label (copied from the plan by `new`).
    pub mode: String,
    /// Compression label (copied from the plan by `new`).
    pub compression: String,
    /// Postgres `pg_stat_database.temp_bytes` delta around the run. `None` for
    /// non-Postgres sources or when the snapshot probe failed (admin
    /// permissions are not required — the view is readable by any role).
    /// When set and large, indicates cursor / sort spill to `pgsql_tmp/` —
    /// the safe action is to shrink `tuning.batch_size` or set
    /// `tuning.batch_size_memory_mb` below PG's `work_mem`.
    pub pg_temp_bytes_delta: Option<i64>,
    /// Human-readable parenthetical attached to `status: skipped` so the
    /// operator knows *why* there was nothing to export this run (e.g.
    /// `"no new rows since cursor 'updated_at'"`). Always `None` when
    /// `status != "skipped"`. Surfaced in the console summary card as
    /// `status: skipped (<reason>)`.
    pub skip_reason: Option<String>,
    /// Source COUNT(*) result for reconciliation (None = not requested or not applicable).
    pub source_count: Option<i64>,
    /// Whether reconciliation passed (Some(true) = match, Some(false) = mismatch, None = skipped).
    pub reconciled: Option<bool>,
    /// Committed parts accumulated during the run, in commit order.  Populated by
    /// `pipeline::manifest_writer::record_committed_part` at each `dest.write`
    /// site (ADR-0012 M1 — Parts Before Manifest).  Drained at finalize into a
    /// `RunManifest` by [`crate::pipeline::manifest_writer::write_manifest`].
    pub manifest_parts: Vec<ManifestPart>,
    /// xxh3 fingerprint of the dest-facing column schema for this run, in the
    /// canonical `xxh3:<16-hex>` form produced by [`crate::state::schema_fingerprint`].
    ///
    /// Recorded by [`crate::pipeline::manifest_writer::record_run_schema_fingerprint`]
    /// the first time the sink has resolved a schema (i.e. on the first batch
    /// of any chunk).  Idempotent within a run — the schema is identical across
    /// chunks, so later writes are no-ops.
    ///
    /// `finalize_manifest` reads this directly so the manifest's
    /// `schema_fingerprint` no longer depends on the per-export schema row
    /// happening to land in `state` before the manifest write.  The state
    /// lookup remains a fallback for resume scenarios where the summary was
    /// reconstructed without ever seeing a live schema.
    pub schema_fingerprint: Option<String>,
    /// Result of the manifest-aware `--validate` pass (ADR-0012 M5/M6,
    /// ADR-0013).  Populated by `pipeline::job::finalize_validate_manifest`
    /// after `finalize_manifest` succeeds; `None` when the run targeted a
    /// streaming destination, when `--validate` was not requested, or when
    /// the run failed before any manifest could be written.
    pub manifest_verification: Option<crate::pipeline::ManifestVerification>,
    /// Apply-time context (plan_id, --force usage, bypassed checks).
    /// `None` when the run came from `rivet run` rather than `rivet apply`.
    /// See [`ApplyContext`] and finding **F5** of the 0.7.5 audit.
    pub apply_context: Option<ApplyContext>,
    /// Structured event log for this run.  Answers the four DoD observability questions.
    pub journal: RunJournal,
}
145
146impl RunSummary {
147    pub(super) fn new(plan: &ResolvedRunPlan) -> Self {
148        let run_id = format!(
149            "{}_{}",
150            plan.export_name,
151            chrono::Utc::now().format("%Y%m%dT%H%M%S%.3f"),
152        );
153        let mut journal = RunJournal::new(&run_id, &plan.export_name);
154        journal.record(RunEvent::PlanResolved(plan_snapshot_from(plan)));
155
156        ipc::emit_event(&ChildEvent::Started {
157            export_name: plan.export_name.clone(),
158            run_id: run_id.clone(),
159            mode: plan.strategy.mode_label().to_string(),
160            tuning_profile: plan.tuning_profile_label.clone(),
161            batch_size: plan.tuning.batch_size,
162        });
163
164        Self {
165            run_id,
166            export_name: plan.export_name.clone(),
167            status: "running".into(),
168            total_rows: 0,
169            files_produced: 0,
170            bytes_written: 0,
171            files_committed: 0,
172            duration_ms: 0,
173            peak_rss_mb: 0,
174            retries: 0,
175            validated: None,
176            schema_changed: None,
177            quality_passed: None,
178            error_message: None,
179            tuning_profile: plan.tuning_profile_label.clone(),
180            batch_size: plan.tuning.batch_size,
181            batch_size_memory_mb: plan.tuning.batch_size_memory_mb,
182            format: plan.format.label().to_string(),
183            mode: plan.strategy.mode_label().to_string(),
184            compression: plan.compression.label().to_string(),
185            pg_temp_bytes_delta: None,
186            skip_reason: None,
187            source_count: None,
188            reconciled: None,
189            manifest_parts: Vec::new(),
190            schema_fingerprint: None,
191            manifest_verification: None,
192            apply_context: None,
193            journal,
194        }
195    }
196
197    /// One canonical builder for tests across the crate + integration suite.
198    ///
199    /// Every field is filled with a sensible default; callers tweak only what
200    /// the test cares about via the chainable setters below.  This replaces
201    /// the seven copies of `stub_summary` / `fresh_summary` / `make_summary`
202    /// / `dummy_summary` / `empty_summary` that used to live in `notify.rs`,
203    /// `report.rs`, `chunked/{exec,mod}.rs`, `manifest_writer.rs`, and the
204    /// two integration-test files.  When `RunSummary` gains a field, it is
205    /// updated here once instead of across nine sites.
206    ///
207    /// Available outside `pipeline` (`pub` not `pub(crate)`) so integration
208    /// tests in `tests/` can use it via `RunSummary::stub_for_testing(...)`.
209    /// The `_for_testing` suffix is the convention from elsewhere in the
210    /// codebase (`destination_for_tests`, etc.) — production code should
211    /// never call it.
212    ///
213    /// `#[allow(dead_code)]` on each helper because the bin target's
214    /// dead-code analysis doesn't see uses from integration tests in `tests/`.
215    /// The lib's unit tests + the integration suite together exercise every
216    /// helper; the attribute is a no-op for them.
217    #[doc(hidden)]
218    #[allow(dead_code)]
219    pub fn stub_for_testing(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
220        let run_id = run_id.into();
221        let export_name = export_name.into();
222        let journal = RunJournal::new(&run_id, &export_name);
223        Self {
224            run_id,
225            export_name,
226            status: "running".into(),
227            total_rows: 0,
228            files_produced: 0,
229            bytes_written: 0,
230            files_committed: 0,
231            duration_ms: 0,
232            peak_rss_mb: 0,
233            retries: 0,
234            validated: None,
235            schema_changed: None,
236            quality_passed: None,
237            error_message: None,
238            tuning_profile: "balanced".into(),
239            batch_size: 1000,
240            batch_size_memory_mb: None,
241            format: "parquet".into(),
242            mode: "snapshot".into(),
243            compression: "zstd".into(),
244            pg_temp_bytes_delta: None,
245            skip_reason: None,
246            source_count: None,
247            reconciled: None,
248            manifest_parts: Vec::new(),
249            schema_fingerprint: None,
250            manifest_verification: None,
251            apply_context: None,
252            journal,
253        }
254    }
255
256    /// Test-only chainable setter for the run's status field.
257    ///
258    /// Used to build success/failed/running variants without re-listing every
259    /// field.  Keeps the journal in sync: terminal statuses get a matching
260    /// `RunCompleted` event recorded so consumers reading
261    /// `journal.final_outcome()` see the right shape.
262    #[doc(hidden)]
263    #[allow(dead_code)]
264    pub fn with_status(mut self, status: impl Into<String>) -> Self {
265        let s = status.into();
266        if (s == "success" || s == "failed") && self.journal.final_outcome().is_none() {
267            self.journal.record(RunEvent::RunCompleted {
268                status: s.clone(),
269                error_message: self.error_message.clone(),
270                duration_ms: self.duration_ms,
271            });
272        }
273        self.status = s;
274        self
275    }
276
277    /// Test-only setter — record `files_committed` so resume-hint logic
278    /// (`pipeline::report`) can detect the "failed run with committed files"
279    /// path that produces a resume command.
280    #[doc(hidden)]
281    #[allow(dead_code)]
282    pub fn with_files_committed(mut self, n: usize) -> Self {
283        self.files_committed = n;
284        self
285    }
286
287    /// Test-only setter — replace the recorded manifest parts (and adjust
288    /// `total_rows` / `bytes_written` / `files_produced` to keep them
289    /// consistent with the parts list, the way real production code does).
290    #[doc(hidden)]
291    #[allow(dead_code)]
292    pub fn with_manifest_parts(mut self, parts: Vec<crate::manifest::ManifestPart>) -> Self {
293        self.total_rows = parts.iter().map(|p| p.rows).sum();
294        self.bytes_written = parts.iter().map(|p| p.size_bytes).sum();
295        self.files_produced = parts.len();
296        self.files_committed = parts.len();
297        self.manifest_parts = parts;
298        self
299    }
300
301    /// Test-only setter — error_message + (optionally) status.  Common
302    /// shape for the "failed-run" fixtures that populated 4+ existing
303    /// stubs.
304    #[doc(hidden)]
305    #[allow(dead_code)]
306    pub fn with_error(mut self, msg: impl Into<String>) -> Self {
307        self.error_message = Some(msg.into());
308        self
309    }
310
311    /// Test-only setter — record a PlanResolved event in the journal so
312    /// downstream observability paths (`journal.plan_snapshot()`,
313    /// `RunReport::from_summary` plan_origin lookup) see the same shape
314    /// they would on a real run.
315    #[doc(hidden)]
316    #[allow(dead_code)]
317    pub fn with_plan_snapshot(mut self, snap: PlanSnapshot) -> Self {
318        self.journal.record(RunEvent::PlanResolved(snap));
319        self
320    }
321
322    pub(super) fn print(&self) {
323        // Capturing mode (IPC child or in-process channel): emit a
324        // `Finished` event and let the unified UI thread render the card.
325        // No stderr block here — the renderer owns the screen.
326        if ipc::capturing_events() {
327            ipc::emit_event(&ChildEvent::Finished {
328                export_name: self.export_name.clone(),
329                run_id: self.run_id.clone(),
330                status: self.status.clone(),
331                total_rows: self.total_rows,
332                files_produced: self.files_produced as u64,
333                bytes_written: self.bytes_written,
334                duration_ms: self.duration_ms,
335                peak_rss_mb: self.peak_rss_mb,
336                error_message: self.error_message.clone(),
337            });
338            return;
339        }
340
341        self.print_stderr_block();
342    }
343
344    /// Write the per-export summary block to stderr, bypassing IPC capture.
345    /// Used after the in-process card UI finishes on single-export sequential
346    /// runs so operators still get the detailed `── name ──` block below the
347    /// live card line.
348    pub(super) fn print_stderr_block(&self) {
349        let block = if multi_export_mode() {
350            self.render_compact()
351        } else {
352            // Render the whole block into a single buffer so the call site
353            // emits one `write_all` to stderr.  Without this, parallel
354            // exports could interleave individual lines from different
355            // `RunSummary::print()` calls — visible as garbled blocks in
356            // `--parallel-exports` runs.
357            self.render().trim_end_matches('\n').to_string()
358        };
359
360        use std::io::Write;
361        let mut buf = block;
362        buf.push('\n');
363        let stderr = std::io::stderr();
364        let mut handle = stderr.lock();
365        let _ = handle.write_all(buf.as_bytes());
366        let _ = handle.flush();
367    }
368
369    /// Compact one-line summary used when several exports run in the same
370    /// invocation.  Mirrors the parent_ui card line so `--parallel-exports`
371    /// (threads), sequential, and `--parallel-export-processes` (processes)
372    /// produce visually consistent per-export rows.
373    fn render_compact(&self) -> String {
374        const NAME_COL: usize = 22;
375        const MODE_COL: usize = 8;
376        let icon = match self.status.as_str() {
377            "success" => "✓",
378            "failed" => "✗",
379            _ => "•",
380        };
381        let body = if self.status == "failed" {
382            let err = self
383                .error_message
384                .as_deref()
385                .unwrap_or("(no error message recorded)");
386            let (cause, _) = strip_chunked_recovery_hint(err);
387            // Collapse multi-line / extremely long errors so the compact
388            // line stays one row tall.  Full payload lives in the stderr
389            // log above the run summary.
390            compact_error(cause)
391        } else {
392            let rss = if self.peak_rss_mb > 0 {
393                format!("  RSS {} MB", fmt_thousands(self.peak_rss_mb))
394            } else {
395                String::new()
396            };
397            format!(
398                "{} rows  {} files  {}  {}{}",
399                fmt_thousands(self.total_rows),
400                fmt_thousands(self.files_produced as i64),
401                format_bytes(self.bytes_written),
402                fmt_duration_ms(self.duration_ms),
403                rss
404            )
405        };
406        format!(
407            "{} {:<name$}  {:<mode$}  {}",
408            icon,
409            self.export_name,
410            self.mode,
411            body,
412            name = NAME_COL,
413            mode = MODE_COL,
414        )
415    }
416
417    /// Build the block as a string.  Public to the module so tests can assert
418    /// formatting without capturing stderr.
419    fn render(&self) -> String {
420        // Adaptive layout: collect (label, value) pairs that actually apply to
421        // this run, then pad labels to the longest one so columns line up
422        // *within* the block.  Header is a fixed width so consecutive blocks
423        // look uniform regardless of which optional fields are present.
424        let mut rows: Vec<(&'static str, String)> = Vec::with_capacity(16);
425        rows.push(("run_id", self.run_id.clone()));
426        let status_value = match (&self.status, &self.skip_reason) {
427            (s, Some(reason)) if s == "skipped" => format!("{s} ({reason})"),
428            (s, _) => s.clone(),
429        };
430        rows.push(("status", status_value));
431
432        let tuning_value = match self.batch_size_memory_mb {
433            Some(mem) => format!(
434                "profile={}, batch_size={} (batch_size_memory_mb={}MiB → effective FETCH in logs)",
435                self.tuning_profile,
436                fmt_thousands(self.batch_size as i64),
437                mem
438            ),
439            None => format!(
440                "profile={}, batch_size={}",
441                self.tuning_profile,
442                fmt_thousands(self.batch_size as i64)
443            ),
444        };
445        rows.push(("tuning", tuning_value));
446
447        rows.push(("rows", fmt_thousands(self.total_rows)));
448        rows.push(("files", fmt_thousands(self.files_produced as i64)));
449        if self.bytes_written > 0 {
450            rows.push(("bytes", format_bytes(self.bytes_written)));
451        }
452        rows.push(("duration", fmt_duration_ms(self.duration_ms)));
453
454        if self.peak_rss_mb > 0 {
455            rows.push((
456                "peak RSS",
457                format!(
458                    "{} MB (sampled during run)",
459                    fmt_thousands(self.peak_rss_mb)
460                ),
461            ));
462        }
463        if let Some(temp) = self.pg_temp_bytes_delta {
464            // Skip when the cluster reported no delta — only chatter when there
465            // was actual spill. > 100 MB is annotated with a tuning hint;
466            // smaller numbers are reported as plain info.
467            if temp > 0 {
468                let temp_mb = temp as f64 / (1024.0 * 1024.0);
469                let label = if temp > 100 * 1024 * 1024 {
470                    format!(
471                        "{:.1} MB ⚠ shrink tuning.batch_size or set batch_size_memory_mb",
472                        temp_mb
473                    )
474                } else {
475                    format!("{:.1} MB", temp_mb)
476                };
477                rows.push(("pg temp spill", label));
478            }
479        }
480        if self.format == "parquet" && self.compression != "zstd" {
481            rows.push(("compression", self.compression.clone()));
482        }
483        if self.retries > 0 {
484            rows.push(("retries", self.retries.to_string()));
485        }
486        if let Some(v) = self.validated {
487            rows.push(("validated", if v { "pass".into() } else { "FAIL".into() }));
488        }
489        if let Some(sc) = self.schema_changed {
490            rows.push((
491                "schema",
492                if sc {
493                    "CHANGED".into()
494                } else {
495                    "unchanged".into()
496                },
497            ));
498        }
499        if let Some(q) = self.quality_passed {
500            rows.push(("quality", if q { "pass".into() } else { "FAIL".into() }));
501        }
502        if let Some(reconciled) = self.reconciled {
503            let src = self
504                .source_count
505                .map(fmt_thousands)
506                .unwrap_or_else(|| "?".into());
507            let exported = fmt_thousands(self.total_rows);
508            let value = if reconciled {
509                format!("MATCH ({exported}/{src})")
510            } else {
511                format!("MISMATCH (exported {exported} vs source {src})")
512            };
513            rows.push(("reconcile", value));
514        }
515        if let Some(err) = &self.error_message {
516            // Multi-line errors (e.g. `parallel checkpoint worker errors:\n
517            // chunk 4: …\nchunk 5: …`) wreak havoc on the indented block
518            // because `format_block` only knows how to indent the first
519            // line.  Collapse them to a compact single-line cause; the full
520            // multi-line text is already in the structured logs above.
521            rows.push(("error", compact_error(err)));
522        }
523
524        format_block(&self.export_name, &rows)
525    }
526
527    /// Sanity-check the post-run summary ↔ manifest_parts coherence. Used as
528    /// a `debug_assert!`-style runtime gate from `finalize_manifest` so any
529    /// future runner that bumps `bytes_written` / `files_committed` /
530    /// `files_produced` without going through `pipeline::commit::record_part`
531    /// is caught the moment it finishes a real export. Compiled out in
532    /// release builds via the `cfg!(debug_assertions)` guard at the call site.
533    ///
534    /// **Resume-safe inequalities only**: on resume, `manifest_parts` carries
535    /// prior runs' parts via `chunked::resume_m8` while `bytes_written` /
536    /// `files_committed` reflect only the current invocation — so strict
537    /// equality is wrong across resume boundaries. Strict equality on the
538    /// non-resume path is pinned by `pipeline::commit::tests`.
539    ///
540    /// Returns `Ok(())` when the summary satisfies the invariants, else an
541    /// `Err(String)` naming which one was violated and by how much.
542    pub fn check_post_run_invariants(&self) -> Result<(), String> {
543        let parts_bytes: u64 = self.manifest_parts.iter().map(|p| p.size_bytes).sum();
544
545        if self.files_committed > self.manifest_parts.len() {
546            return Err(format!(
547                "summary.files_committed ({}) > manifest_parts.len() ({}) — \
548                 a runner bumped files_committed without commit::record_part",
549                self.files_committed,
550                self.manifest_parts.len()
551            ));
552        }
553        if self.files_produced > self.manifest_parts.len() {
554            return Err(format!(
555                "summary.files_produced ({}) > manifest_parts.len() ({}) — \
556                 a runner bumped files_produced without commit::record_part",
557                self.files_produced,
558                self.manifest_parts.len()
559            ));
560        }
561        if self.bytes_written > parts_bytes {
562            return Err(format!(
563                "summary.bytes_written ({}) > sum(manifest_parts.size_bytes) ({}) — \
564                 a runner bumped bytes_written without commit::record_part",
565                self.bytes_written, parts_bytes
566            ));
567        }
568        if self.status == "success" && self.files_committed > 0 && self.manifest_parts.is_empty() {
569            return Err(format!(
570                "success run with files_committed={} has empty manifest_parts — \
571                 cloud manifest (ADR-0012 M1) would ship with no part list \
572                 (this is the gap parallel_checkpoint had before commit e9b0796)",
573                self.files_committed
574            ));
575        }
576        // Invariant audit gap #1, weak form: a successful run that produced
577        // rows for THIS invocation must have committed at least one file.
578        // The strict form ("rows_written <= rows_read") would require a
579        // separate source-side row counter we do not track, and concurrent
580        // INSERTs on the source (live_oltp_load) make a source_count
581        // comparison brittle. This weak form catches a fabrication shape:
582        // total_rows accumulated but nothing reached the destination — a
583        // runner that fetched and silently dropped rows produces exactly
584        // this signature. Resume-safe: total_rows reflects only this
585        // invocation, so a resume with no work to do legitimately ends at
586        // total_rows=0 / files_committed=0 and the guard does not fire.
587        if self.status == "success" && self.total_rows > 0 && self.files_committed == 0 {
588            return Err(format!(
589                "summary.total_rows={} but files_committed=0 — rows extracted from \
590                 source but no files committed (no output reached the destination)",
591                self.total_rows
592            ));
593        }
594        Ok(())
595    }
596}
597
598/// Reduce a possibly-multi-line execution error to a single-line, bounded-
599/// length cause suitable for the per-export summary block and the compact
600/// one-liner.  Keeps the user-actionable bit and drops noisy diagnostic
601/// payloads (long URLs, query strings, repeated chunk errors).
602///
603/// Recognised shapes:
604/// - `parallel checkpoint worker errors:\nchunk N: <msg>\nchunk M: <msg>` →
605///   `parallel checkpoint workers failed: K chunk(s) (chunk N: <truncated>)`.
606///   The full per-chunk detail is already in stderr logs.
607/// - Generic multi-line: newlines are replaced with `; ` and the result is
608///   clamped to 240 characters with an ellipsis.
609fn compact_error(raw: &str) -> String {
610    const MAX_CHARS: usize = 240;
611    if let Some(summary) = summarize_parallel_chunk_errors(raw) {
612        return clamp_chars(&summary, MAX_CHARS);
613    }
614    let collapsed: String = raw
615        .lines()
616        .map(str::trim_end)
617        .filter(|s| !s.is_empty())
618        .collect::<Vec<_>>()
619        .join("; ");
620    clamp_chars(&collapsed, MAX_CHARS)
621}
622
623fn summarize_parallel_chunk_errors(raw: &str) -> Option<String> {
624    let header_pos = raw.find("parallel checkpoint worker errors:")?;
625    let prefix = raw[..header_pos].trim_end_matches(": ").trim_end();
626    let tail = &raw[header_pos + "parallel checkpoint worker errors:".len()..];
627
628    let chunk_lines: Vec<&str> = tail
629        .lines()
630        .map(str::trim)
631        .filter(|l| l.starts_with("chunk "))
632        .collect();
633    if chunk_lines.is_empty() {
634        return None;
635    }
636    let first_chunk_full = chunk_lines[0];
637    // Truncate the example chunk message; the URL/payload is in stderr logs.
638    let first_chunk_short = clamp_chars(first_chunk_full, 140);
639    let prefix = if prefix.is_empty() {
640        String::new()
641    } else {
642        format!("{}: ", prefix)
643    };
644    Some(format!(
645        "{}parallel checkpoint workers failed: {} chunk(s) ({}); see stderr for full payloads",
646        prefix,
647        chunk_lines.len(),
648        first_chunk_short
649    ))
650}
651
/// Limit `s` to at most `max_chars` characters (Unicode scalar values, not
/// bytes).
///
/// A string that already fits is returned unchanged.  A longer one is cut to
/// its first `max_chars - 1` characters followed by `…`, so the result is
/// exactly `max_chars` characters long.  `max_chars == 0` yields an empty
/// string.
fn clamp_chars(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }
    // No character at index `max_chars` means the string fits as it is.
    if s.chars().nth(max_chars).is_none() {
        return s.to_owned();
    }
    // Byte offset of the character at index `max_chars - 1`: everything
    // before it is kept, and the ellipsis takes the final slot.
    let cut = s
        .char_indices()
        .nth(max_chars - 1)
        .map_or(s.len(), |(offset, _)| offset);
    let mut clamped = String::with_capacity(cut + '…'.len_utf8());
    clamped.push_str(&s[..cut]);
    clamped.push('…');
    clamped
}
664
/// Render a `── name ─────…─` header plus one indented `label:  value` line
/// per row, all joined into a single string.
///
/// The output starts with a blank line and ends with `\n`.  The header is
/// padded with `─` to 60 characters; a longer `name` simply gets no padding.
/// Each label is followed by `:` and padded so that every value starts in
/// the same column, two spaces after the longest `label:`.
///
/// Widths are measured in characters, not bytes, for both the header and the
/// labels, so a non-ASCII label does not over-widen the label column.
fn format_block(name: &str, rows: &[(&str, String)]) -> String {
    const HEADER_WIDTH: usize = 60;

    // Character count of the longest label (0 when there are no rows).
    let label_w = rows
        .iter()
        .map(|(label, _)| label.chars().count())
        .max()
        .unwrap_or(0);

    let prefix = format!("── {} ", name);
    let dashes = HEADER_WIDTH.saturating_sub(prefix.chars().count());

    let mut out = String::with_capacity(HEADER_WIDTH * (rows.len() + 3));
    out.push('\n');
    out.push_str(&prefix);
    for _ in 0..dashes {
        out.push('─');
    }
    out.push('\n');

    for (label, value) in rows {
        // The colon stays attached to the label; the padding goes after it.
        let pad = label_w - label.chars().count();
        out.push_str("  ");
        out.push_str(label);
        out.push(':');
        for _ in 0..pad {
            out.push(' ');
        }
        out.push_str("  ");
        out.push_str(value);
        out.push('\n');
    }
    out
}
693
/// Format a millisecond duration for the summary output.
///
/// - below one second (including negative values): `"<ms>ms"`
/// - below one minute: `"S.Ts"`
/// - below one hour: `"Mm SS.Ts"`
/// - otherwise: `"Hh MMm SS.Ts"`
///
/// The duration is rounded once to the nearest tenth of a second (half rounds
/// up) *before* it is split into hours, minutes and seconds.  Rounding only
/// the seconds component at format time would print `1m 60.0s` for
/// 119_960 ms; rounding first lets the carry propagate, giving `2m 00.0s`.
fn fmt_duration_ms(ms: i64) -> String {
    if ms < 1000 {
        return format!("{}ms", ms);
    }

    // Whole tenths of a second, rounded half-up.  Written without `ms + 50`
    // so that values near `i64::MAX` cannot overflow.
    let total_tenths = ms / 100 + i64::from(ms % 100 >= 50);
    let total_secs = total_tenths / 10;
    let tenths = total_tenths % 10;

    let h = total_secs / 3600;
    let m = (total_secs % 3600) / 60;
    let s = total_secs % 60;

    if h > 0 {
        format!("{h}h {m:02}m {s:02}.{tenths}s")
    } else if m > 0 {
        format!("{m}m {s:02}.{tenths}s")
    } else {
        format!("{s}.{tenths}s")
    }
}
710
/// Render an integer with `,` between every group of three digits, counted
/// from the right: `39_990_376` → `39,990,376`.  A negative input keeps its
/// leading `-`.  Used for rows / files / batch_size so large counts stay
/// readable.
fn fmt_thousands(n: i64) -> String {
    // `unsigned_abs` keeps `i64::MIN` representable, which plain negation would not.
    let digits = n.unsigned_abs().to_string();

    let mut grouped = String::with_capacity(digits.len() + digits.len() / 3 + 1);
    if n < 0 {
        grouped.push('-');
    }

    // Chunking from the right puts any short group first once reversed.
    let groups = digits.as_bytes().rchunks(3).rev();
    for (idx, group) in groups.enumerate() {
        if idx != 0 {
            grouped.push(',');
        }
        // Decimal digits are ASCII, so each byte maps directly to a char.
        grouped.extend(group.iter().map(|&b| char::from(b)));
    }
    grouped
}
731
#[cfg(test)]
mod tests {
    use super::*;

    /// Build the plan fixture shared by the rendering tests: a snapshot
    /// export written as Parquet to a local `./out` directory from a Postgres
    /// source.  Only the export name varies between callers.
    fn sample_plan(export_name: &str) -> crate::plan::ResolvedRunPlan {
        use crate::plan::{
            CompressionType, DestinationConfig, DestinationType, ExtractionStrategy, FormatType,
            MetaColumns, ResolvedRunPlan,
        };
        use crate::tuning::SourceTuning;

        ResolvedRunPlan {
            export_name: export_name.into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: crate::config::SourceConfig {
                source_type: crate::config::SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    #[test]
    fn fmt_thousands_handles_small_and_large() {
        assert_eq!(fmt_thousands(0), "0");
        assert_eq!(fmt_thousands(7), "7");
        assert_eq!(fmt_thousands(999), "999");
        assert_eq!(fmt_thousands(1_000), "1,000");
        assert_eq!(fmt_thousands(1_000_908), "1,000,908");
        assert_eq!(fmt_thousands(39_990_376), "39,990,376");
        assert_eq!(fmt_thousands(-1_234), "-1,234");
        assert_eq!(fmt_thousands(i64::MAX), "9,223,372,036,854,775,807");
    }

    #[test]
    fn fmt_duration_picks_unit() {
        assert_eq!(fmt_duration_ms(0), "0ms");
        assert_eq!(fmt_duration_ms(800), "800ms");
        assert_eq!(fmt_duration_ms(1_500), "1.5s");
        assert_eq!(fmt_duration_ms(68_400), "1m 08.4s");
        assert_eq!(fmt_duration_ms(3_725_300), "1h 02m 05.3s");
    }

    #[test]
    fn format_block_pads_labels_uniformly() {
        let rows = vec![
            ("run_id", "abc".to_string()),
            ("rows", "42".to_string()),
            ("compression", "zstd".to_string()),
        ];
        let out = format_block("orders", &rows);

        // Only the row lines contain a colon; the header does not.
        let lines: Vec<&str> = out.lines().filter(|l| l.contains(':')).collect();
        assert_eq!(lines.len(), 3);

        // The value (after `label:` plus padding plus two spaces) must start
        // at the same byte offset on every row.  All fixture text is ASCII,
        // so byte offsets and display columns coincide.
        let value_col = lines[0].rfind("abc").unwrap();
        assert_eq!(lines[1].rfind("42").unwrap(), value_col);
        assert_eq!(lines[2].rfind("zstd").unwrap(), value_col);
    }

    #[test]
    fn format_block_header_has_consistent_width() {
        let block_a = format_block("a", &[("rows", "1".into())]);
        let block_b = format_block("orders_table_xyz", &[("rows", "1".into())]);
        // Line 0 is the leading blank line; line 1 is the header.
        let header_a = block_a.lines().nth(1).unwrap();
        let header_b = block_b.lines().nth(1).unwrap();
        assert_eq!(
            header_a.chars().count(),
            header_b.chars().count(),
            "headers must be the same width regardless of name length: {:?} vs {:?}",
            header_a,
            header_b
        );
    }

    #[test]
    fn render_produces_a_single_string_with_trailing_newline() {
        let plan = sample_plan("orders");
        let mut s = RunSummary::new(&plan);
        s.status = "success".into();
        s.total_rows = 1_000_908;
        s.files_produced = 11;
        s.bytes_written = 32 * 1024 * 1024 + 400 * 1024;
        s.duration_ms = 68_400;
        s.peak_rss_mb = 884;

        let block = s.render();
        assert!(
            block.starts_with('\n'),
            "block should start with a blank line"
        );
        assert!(block.ends_with('\n'), "block should end with a newline");
        assert!(block.contains("── orders "));
        assert!(
            block.contains("1,000,908"),
            "rows should be formatted with thousands separator: {}",
            block
        );
        assert!(block.contains("1m 08.4s"), "duration formatting: {}", block);
        // No raw progress-bar bleed: header dashes still present, no carriage
        // returns or escape sequences.
        assert!(!block.contains('\r'));

        // Compact one-liner used in multi-export runs.
        let line = s.render_compact();
        assert!(line.starts_with("✓ "), "success icon present: {:?}", line);
        assert!(line.contains("orders"), "export name present: {:?}", line);
        assert!(line.contains("1,000,908 rows"), "rows present: {:?}", line);
        assert!(line.contains("32.4 MB"), "bytes present: {:?}", line);
        assert!(line.contains("1m 08.4s"), "duration present: {:?}", line);
        assert!(line.contains("RSS 884 MB"), "rss present: {:?}", line);
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }

    #[test]
    fn compact_error_summarises_parallel_chunk_errors() {
        let raw = "export 'page_views': parallel checkpoint worker errors:\n\
                   chunk 4: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202442_chunk4.parquet?partNumber=1&uploadId=ABPnzm7RqplA, called: http_util::Client::send } => send http request, source: error sending request: client error (SendRequest): dispatch task is gone\n\
                   chunk 5: Unexpected (temporary) at write, context: { url: https://storage.googleapis.com/rivet_data_test/exports%2Fpage_views%2Fpage_views_20260430_202443_chunk5.parquet?partNumber=1&uploadId=ABPnzm6q, called: http_util::Client::send } => send http request, source: dispatch task is gone";
        let out = compact_error(raw);
        assert!(
            out.contains("2 chunk(s)"),
            "should report number of failed chunks: {:?}",
            out
        );
        assert!(
            out.starts_with("export 'page_views': parallel checkpoint workers failed:"),
            "should keep export prefix and use compact phrasing: {:?}",
            out
        );
        assert!(
            out.contains("chunk 4:"),
            "should include the first chunk as an example: {:?}",
            out
        );
        assert!(!out.contains('\n'), "single line output: {:?}", out);
        assert!(
            out.chars().count() <= 240,
            "must be clamped to <=240 chars, got {}: {:?}",
            out.chars().count(),
            out
        );
    }

    #[test]
    fn compact_error_collapses_generic_multiline() {
        let raw = "first line of trouble\nsecond line with detail\n\nthird line\n";
        let out = compact_error(raw);
        assert_eq!(
            out, "first line of trouble; second line with detail; third line",
            "newlines should collapse to '; ' and blanks dropped"
        );
    }

    #[test]
    fn compact_error_clamps_excessively_long_lines() {
        let raw = "x".repeat(1_000);
        let out = compact_error(&raw);
        assert_eq!(out.chars().count(), 240);
        assert!(out.ends_with('…'));
    }

    #[test]
    fn render_compact_strips_chunked_recovery_hint_for_failed() {
        let plan = sample_plan("events");
        let mut s = RunSummary::new(&plan);
        s.status = "failed".into();
        s.error_message = Some(
            "export 'events': --resume but no in-progress chunk checkpoint; \
             run without --resume first or `rivet state reset-chunks --config x.yaml --export events`"
                .to_string(),
        );

        let line = s.render_compact();
        assert!(line.starts_with("✗ "), "failure icon: {:?}", line);
        assert!(line.contains("events"), "name present: {:?}", line);
        assert!(
            line.contains("--resume but no in-progress chunk checkpoint"),
            "cause kept: {:?}",
            line
        );
        assert!(
            !line.contains("rivet state reset-chunks"),
            "recovery hint should be stripped from per-export line: {:?}",
            line
        );
        assert!(!line.contains('\n'), "single line: {:?}", line);
    }
}