Skip to main content

rivet/pipeline/
cli.rs

1//! **Layer: Observability**
2//!
3//! CLI display commands for state, metrics, files, chunk checkpoints, and journal history.
4//! Reads from the state stores and formats output — no execution or persistence writes.
5
6use super::format_bytes;
7use crate::config::Config;
8use crate::error::Result;
9use crate::journal::RunEvent;
10use crate::state::StateStore;
11
12/// Validate the `--config` path BEFORE opening the state store.
13///
14/// Inspect commands (`state show/files/chunks/progression`, `metrics`,
15/// `journal`) use `config_path` only to *locate* the SQLite `.rivet_state.db`
16/// next to it; they never parse the YAML. So a missing or garbage path used to
17/// silently open an empty/foreign DB and exit 0 (and `StateStore::open` would
18/// materialize a fresh `.rivet_state.db` beside the phantom path). Loading the
19/// config first turns a bad path into a clear non-zero error — exactly as
20/// `run`/`check` already do — and avoids littering a state DB. The parsed
21/// config is returned so callers that also need the declared export names
22/// (e.g. `reset-chunks`) reuse the same load.
23fn require_config(config_path: &str) -> Result<Config> {
24    Config::load(config_path)
25}
26
27/// Bail if `export_name` is not declared in `config`, listing the known names.
28///
29/// Inspect commands (`metrics`, `journal`) that filter by a single export must
30/// not let a typo'd `--export` look like "this export simply has not run yet" —
31/// both used to print the same empty-state line. With the config now loaded, an
32/// unknown name is a clear error that points at the declared exports.
33fn require_known_export(config: &Config, config_path: &str, export_name: &str) -> Result<()> {
34    if config.exports.iter().any(|e| e.name == export_name) {
35        return Ok(());
36    }
37    let known: Vec<&str> = config.exports.iter().map(|e| e.name.as_str()).collect();
38    anyhow::bail!(
39        "export '{}' is not defined in '{}'.\n  Known exports: {}",
40        export_name,
41        config_path,
42        if known.is_empty() {
43            "(none defined)".to_string()
44        } else {
45            known.join(", ")
46        },
47    );
48}
49
50pub fn show_state(config_path: &str, json: bool) -> Result<()> {
51    require_config(config_path)?;
52    let state = StateStore::open(config_path)?;
53    let states = state.list_all()?;
54    if json {
55        // Incremental-cursor rows; serialize directly. Empty → `[]` (the text
56        // path's "no cursor / never ran" guidance is operator help, not data).
57        println!("{}", serde_json::to_string_pretty(&states)?);
58        return Ok(());
59    }
60    if states.is_empty() {
61        // `state.list_all()` returns only incremental-cursor rows. Chunked
62        // and full runs land in different tables (`export_metrics`,
63        // `export_files`, `chunk_state`), so an empty cursor list does not
64        // mean "this config never ran". Distinguish the two cases:
65        //   - never ran    → tell the operator to run first
66        //   - ran chunked  → point at `rivet metrics` / `rivet state files`
67        // Anything else here is misleading ("No state" after a successful
68        // chunked run sounds like data loss).
69        let any_run = state
70            .get_metrics(None, 1)
71            .map(|m| !m.is_empty())
72            .unwrap_or(false);
73        if any_run {
74            println!(
75                "No incremental cursor recorded yet.\n  \
76                 This command shows incremental-mode cursors only.\n  \
77                 For chunked / full runs, see:\n  \
78                 • rivet metrics      — per-run history (status, rows, duration)\n  \
79                 • rivet state files  — every produced file with row count + size"
80            );
81        } else {
82            println!(
83                "No exports have been run yet.\n  \
84                 Run `rivet run --config {}` first, then try `rivet state show` again.",
85                config_path
86            );
87        }
88        return Ok(());
89    }
90    println!("{:<30} {:<40} LAST RUN", "EXPORT", "LAST CURSOR");
91    println!("{}", "-".repeat(90));
92    for s in &states {
93        println!(
94            "{:<30} {:<40} {}",
95            s.export_name,
96            s.last_cursor_value.as_deref().unwrap_or("-"),
97            s.last_run_at.as_deref().unwrap_or("-"),
98        );
99    }
100    Ok(())
101}
102
103/// Epic G / ADR-0008 — explicit committed / verified boundaries per export.
104pub fn show_progression(config_path: &str, export_name: Option<&str>) -> Result<()> {
105    require_config(config_path)?;
106    let state = StateStore::open(config_path)?;
107    let entries = match export_name {
108        Some(name) => vec![state.get_progression(name)?],
109        None => state.list_progression()?,
110    };
111    let has_any = entries
112        .iter()
113        .any(|p| p.committed.is_some() || p.verified.is_some());
114    if !has_any {
115        println!("No progression boundaries recorded yet.");
116        return Ok(());
117    }
118
119    println!(
120        "{:<30} {:<12} {:<30} {:<25} {:<12} {:<30}",
121        "EXPORT", "COMM MODE", "COMMITTED", "COMMITTED AT", "VERI MODE", "VERIFIED"
122    );
123    println!("{}", "-".repeat(145));
124    for p in &entries {
125        let (c_mode, c_val, c_at) = match &p.committed {
126            Some(b) => (
127                b.strategy.as_str().to_string(),
128                boundary_value(b),
129                b.at.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
130            ),
131            None => ("-".into(), "-".into(), "-".into()),
132        };
133        let (v_mode, v_val) = match &p.verified {
134            Some(b) => (b.strategy.as_str().to_string(), boundary_value(b)),
135            None => ("-".into(), "-".into()),
136        };
137        println!(
138            "{:<30} {:<12} {:<30} {:<25} {:<12} {:<30}",
139            p.export_name, c_mode, c_val, c_at, v_mode, v_val
140        );
141    }
142    Ok(())
143}
144
145fn boundary_value(b: &crate::state::Boundary) -> String {
146    if let Some(c) = &b.cursor {
147        c.clone()
148    } else if let Some(idx) = b.chunk_index {
149        format!("chunk #{idx}")
150    } else {
151        "-".into()
152    }
153}
154
155pub fn reset_state(config_path: &str, export_name: &str) -> Result<()> {
156    // Validate the export name against the config BEFORE touching state, so a
157    // typo (`--export pa_audi` instead of `pa_audit`) produces a hint with the
158    // declared names instead of silently DELETE-ing zero rows and printing
159    // "State reset for export 'pa_audi'" as if it worked.  Without this guard
160    // a stray `--export <unknown>` looked like success but did nothing — a
161    // genuine ops footgun under "I reset it, why did the cursor not move".
162    let config = crate::config::Config::load(config_path)?;
163    if !config.exports.iter().any(|e| e.name == export_name) {
164        let known: Vec<String> = config.exports.iter().map(|e| e.name.clone()).collect();
165        anyhow::bail!(
166            "export '{}' not found in config '{}'.\n  Known exports: {}\n  Hint: check the spelling, or run `rivet state show -c {}` to see what is currently tracked.",
167            export_name,
168            config_path,
169            if known.is_empty() {
170                "(none defined)".to_string()
171            } else {
172                known.join(", ")
173            },
174            config_path,
175        );
176    }
177    let state = StateStore::open(config_path)?;
178    state.reset(export_name)?;
179    println!("State reset for export '{}'", export_name);
180    Ok(())
181}
182
183pub fn show_files(
184    config_path: &str,
185    export_name: Option<&str>,
186    limit: usize,
187    json: bool,
188) -> Result<()> {
189    require_config(config_path)?;
190    let state = StateStore::open(config_path)?;
191    let files = state.get_files(export_name, limit)?;
192    if json {
193        // FileRecord is a stable inspect row — serialize it directly. Empty → `[]`
194        // (valid JSON) so a CI completeness check never special-cases.
195        println!("{}", serde_json::to_string_pretty(&files)?);
196        return Ok(());
197    }
198    if files.is_empty() {
199        println!("No files recorded yet.");
200        return Ok(());
201    }
202    // run_ids are `{export}_{%Y%m%dT%H%M%S%3f}` (timestamp alone is 18 chars),
203    // so a column narrower than the longest export name + 19 wraps and breaks
204    // alignment. 40 fits the common case; longer names overflow gracefully.
205    println!(
206        "{:<40} {:<40} {:>8} {:>10} CREATED",
207        "RUN ID", "FILE", "ROWS", "BYTES"
208    );
209    println!("{}", "-".repeat(115));
210    for f in &files {
211        println!(
212            "{:<40} {:<40} {:>8} {:>10} {}",
213            f.run_id,
214            f.file_name,
215            f.row_count,
216            format_bytes(f.bytes as u64),
217            f.created_at,
218        );
219    }
220    Ok(())
221}
222
223pub fn show_metrics(
224    config_path: &str,
225    export_name: Option<&str>,
226    limit: usize,
227    json: bool,
228) -> Result<()> {
229    let config = require_config(config_path)?;
230    // A typo'd `--export` must not masquerade as "no runs yet". Now that the
231    // config is loaded, check the requested name against the declared exports
232    // and bail with the known names — otherwise an unknown export and an
233    // unrun one both print "No metrics recorded yet.".
234    if let Some(name) = export_name {
235        require_known_export(&config, config_path, name)?;
236    }
237    let state = StateStore::open(config_path)?;
238    let metrics = state.get_metrics(export_name, limit)?;
239    if json {
240        // Reuse the run aggregate's serializable DTO so `metrics --json` and the
241        // run summary's `--json` agree field-for-field. Empty → `[]` (valid JSON),
242        // not the text "no metrics" line, so a CI consumer never special-cases.
243        let rows: Vec<super::aggregate::MetricRowJson> = metrics
244            .iter()
245            .map(super::aggregate::MetricRowJson::from)
246            .collect();
247        println!("{}", serde_json::to_string_pretty(&rows)?);
248        return Ok(());
249    }
250    if metrics.is_empty() {
251        println!("No metrics recorded yet.");
252        return Ok(());
253    }
254    println!(
255        "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} RUN ID",
256        "EXPORT", "STATUS", "ROWS", "DURATION", "RSS", "FILES", "BYTES"
257    );
258    println!("{}", "-".repeat(110));
259    for m in &metrics {
260        let duration = if m.duration_ms >= 1000 {
261            format!("{:.1}s", m.duration_ms as f64 / 1000.0)
262        } else {
263            format!("{}ms", m.duration_ms)
264        };
265        let rss = m
266            .peak_rss_mb
267            .map(|r| format!("{}MB", r))
268            .unwrap_or_else(|| "-".into());
269        let bytes = if m.bytes_written > 0 {
270            format_bytes(m.bytes_written as u64)
271        } else {
272            "-".into()
273        };
274        let run_id = m.run_id.as_deref().unwrap_or(&m.run_at);
275        println!(
276            "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} {}",
277            m.export_name, m.status, m.total_rows, duration, rss, m.files_produced, bytes, run_id
278        );
279        if let Some(err) = &m.error_message {
280            println!("  Error: {}", err);
281        }
282        let mut flags = Vec::new();
283        if m.retries > 0 {
284            flags.push(format!("retries={}", m.retries));
285        }
286        if let Some(v) = m.validated {
287            flags.push(format!("validated={}", if v { "pass" } else { "FAIL" }));
288        }
289        if let Some(sc) = m.schema_changed {
290            flags.push(format!("schema={}", if sc { "CHANGED" } else { "ok" }));
291        }
292        if !flags.is_empty() {
293            println!("  {}", flags.join("  "));
294        }
295    }
296    Ok(())
297}
298
299pub fn reset_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
300    // Parity with `reset_state`: validate the export name against the config
301    // BEFORE touching state, so a typo (`-e pa_audi` for `pa_audit`) errors with
302    // the declared names instead of silently "Removed 0 chunk run record(s)"
303    // (rc=0) — which looked like the resume was abandoned when it was not.
304    let config = require_config(config_path)?;
305    if !config.exports.iter().any(|e| e.name == export_name) {
306        let known: Vec<String> = config.exports.iter().map(|e| e.name.clone()).collect();
307        anyhow::bail!(
308            "export '{}' not found in config '{}'.\n  Known exports: {}\n  Hint: check the spelling, or run `rivet state chunks -c {} -e <name>` to inspect a checkpoint.",
309            export_name,
310            config_path,
311            if known.is_empty() {
312                "(none defined)".to_string()
313            } else {
314                known.join(", ")
315            },
316            config_path,
317        );
318    }
319    let state = StateStore::open(config_path)?;
320    let n = state.reset_chunk_checkpoint(export_name)?;
321    // Abandoning the resume also clears the committed/verified boundary, so
322    // `rivet state progression` does not report a stale chunk boundary after
323    // the chunk_run/chunk_task rows are gone (parity with `state reset`).
324    state.delete_progression(export_name)?;
325    println!(
326        "Removed {} chunk run record(s) for export '{}'.",
327        n, export_name
328    );
329    Ok(())
330}
331
332/// Clear chunk checkpoints for every export **named in `config_path`'s YAML** that currently has an
333/// `in_progress` chunk run (`chunk_run.status = 'in_progress'`).
334///
335/// Names present only in state (removed from config) are skipped with a printed note.
336pub fn reset_chunk_checkpoints_stuck(config_path: &str) -> Result<()> {
337    let cfg = Config::load(config_path)?;
338    let allowed: std::collections::HashSet<&str> =
339        cfg.exports.iter().map(|e| e.name.as_str()).collect();
340    let state = StateStore::open(config_path)?;
341    let stuck = state.list_export_names_with_in_progress_chunk_runs()?;
342    if stuck.is_empty() {
343        println!("No exports have an in-progress chunk checkpoint run.");
344        println!(
345            "(Nothing with chunk_run.status = 'in_progress' in {}.)",
346            StateStore::state_db_path(config_path).display()
347        );
348        return Ok(());
349    }
350
351    let mut skipped_not_in_config = Vec::new();
352    let mut targets = Vec::new();
353    for name in stuck {
354        if allowed.contains(name.as_str()) {
355            targets.push(name);
356        } else {
357            skipped_not_in_config.push(name);
358        }
359    }
360
361    for name in &skipped_not_in_config {
362        println!(
363            "Skipping '{}' — chunk checkpoint still in_progress but this export is not in the config.",
364            name
365        );
366    }
367
368    if targets.is_empty() {
369        println!(
370            "No matching exports to reset (none of the in-progress runs belong to exports in this config)."
371        );
372        return Ok(());
373    }
374
375    println!(
376        "Resetting chunk checkpoints for {} export(s) with in_progress runs: {}",
377        targets.len(),
378        targets.join(", ")
379    );
380
381    for name in targets {
382        let n = state.reset_chunk_checkpoint(&name)?;
383        println!("Removed {} chunk run record(s) for export '{}'.", n, name);
384    }
385    Ok(())
386}
387
388pub fn show_chunk_checkpoint(config_path: &str, export_name: &str, json: bool) -> Result<()> {
389    require_config(config_path)?;
390    let state = StateStore::open(config_path)?;
391    if json {
392        // Composite (run header + per-chunk tasks) built inline so no state-layer
393        // type needs `Serialize`. No checkpoint → `null` (valid JSON).
394        let report = match state.get_latest_chunk_run(export_name)? {
395            None => serde_json::Value::Null,
396            Some((run_id, plan_hash, status, updated_at)) => {
397                let tasks: Vec<serde_json::Value> = state
398                    .list_chunk_tasks_for_run(&run_id)?
399                    .iter()
400                    .map(|t| {
401                        serde_json::json!({
402                            "chunk_index": t.chunk_index,
403                            "status": t.status,
404                            "start_key": t.start_key,
405                            "end_key": t.end_key,
406                            "attempts": t.attempts,
407                            "rows_written": t.rows_written,
408                            "file_name": t.file_name,
409                            "last_error": t.last_error,
410                        })
411                    })
412                    .collect();
413                serde_json::json!({
414                    "export": export_name,
415                    "run_id": run_id,
416                    "plan_hash": plan_hash,
417                    "status": status,
418                    "updated_at": updated_at,
419                    "tasks": tasks,
420                })
421            }
422        };
423        println!("{}", serde_json::to_string_pretty(&report)?);
424        return Ok(());
425    }
426    println!(
427        "database:   {}",
428        StateStore::state_db_path(config_path).display()
429    );
430    let Some((run_id, plan_hash, status, updated_at)) = state.get_latest_chunk_run(export_name)?
431    else {
432        println!("No chunk checkpoint data for export '{}'.", export_name);
433        return Ok(());
434    };
435    println!("export:     {}", export_name);
436    println!("run_id:     {}", run_id);
437    println!("plan_hash:  {}", plan_hash);
438    println!("status:     {}", status);
439    println!("updated_at: {}", updated_at);
440    println!();
441    println!(
442        "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} FILE",
443        "IDX", "STATUS", "START", "END", "ATT", "ROWS"
444    );
445    println!("{}", "-".repeat(90));
446    for t in state.list_chunk_tasks_for_run(&run_id)? {
447        let file = t.file_name.as_deref().unwrap_or("-");
448        let rows = t
449            .rows_written
450            .map(|r| r.to_string())
451            .unwrap_or_else(|| "-".into());
452        println!(
453            "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} {}",
454            t.chunk_index, t.status, t.start_key, t.end_key, t.attempts, rows, file
455        );
456        if let Some(e) = &t.last_error {
457            println!("       error: {}", e);
458        }
459    }
460    Ok(())
461}
462
463/// Display recent run journal entries for an export.
464///
465/// Shows up to `limit` most recent runs (newest first), each as a compact
466/// block: run header + per-event summary lines.
467pub fn show_journal(
468    config_path: &str,
469    export_name: &str,
470    limit: usize,
471    run_id: Option<&str>,
472) -> Result<()> {
473    let config = require_config(config_path)?;
474    // Same gap as `metrics`: when querying by export name (no `--run-id`), a
475    // typo would print "No journal entries for export 'X' yet." as if the
476    // export were merely unrun. Validate the name against the config first.
477    // The `--run-id` path looks up by id directly, so the export name is not
478    // the lookup key there and is left unchecked.
479    if run_id.is_none() {
480        require_known_export(&config, config_path, export_name)?;
481    }
482    let state = StateStore::open(config_path)?;
483
484    let journals = if let Some(rid) = run_id {
485        match state.load_journal(rid)? {
486            Some(j) => vec![j],
487            None => {
488                println!("No journal found for run_id '{rid}'.");
489                return Ok(());
490            }
491        }
492    } else {
493        state.recent_journals(export_name, limit)?
494    };
495
496    if journals.is_empty() {
497        println!("No journal entries for export '{export_name}' yet.");
498        println!("Journals are recorded after each `rivet run`.");
499        return Ok(());
500    }
501
502    for journal in &journals {
503        // ── run header ────────────────────────────────────────────────────
504        let outcome = journal.final_outcome().and_then(|e| {
505            if let RunEvent::RunCompleted {
506                status,
507                duration_ms,
508                ..
509            } = &e.event
510            {
511                Some((status.as_str(), *duration_ms))
512            } else {
513                None
514            }
515        });
516        let (status_str, duration_str) = match outcome {
517            Some((s, ms)) if ms >= 1000 => (s, format!("{:.1}s", ms as f64 / 1000.0)),
518            Some((s, ms)) => (s, format!("{ms}ms")),
519            None => ("(incomplete)", String::new()),
520        };
521        let icon = match status_str {
522            "success" => "✓",
523            "failed" => "✗",
524            _ => "•",
525        };
526        println!(
527            "\n{icon} {export}  {status}  {dur}",
528            export = journal.export_name,
529            status = status_str,
530            dur = duration_str,
531        );
532        println!("  run_id: {}", journal.run_id);
533
534        // ── event summary lines ────────────────────────────────────────────
535        let files = journal.files();
536        if !files.is_empty() {
537            let total_rows: i64 = files
538                .iter()
539                .filter_map(|e| {
540                    if let RunEvent::FileWritten { rows, .. } = &e.event {
541                        Some(*rows)
542                    } else {
543                        None
544                    }
545                })
546                .sum();
547            let total_bytes: u64 = files
548                .iter()
549                .filter_map(|e| {
550                    if let RunEvent::FileWritten { bytes, .. } = &e.event {
551                        Some(*bytes)
552                    } else {
553                        None
554                    }
555                })
556                .sum();
557            println!(
558                "  files:  {}  rows: {}  size: {}",
559                files.len(),
560                total_rows,
561                format_bytes(total_bytes),
562            );
563            // List each produced file by name — the aggregate count alone hides
564            // *which* objects landed, so an operator could not cross-check the
565            // journal against the destination / manifest. Each FileWritten event
566            // stores the exact basename `state files` shows.
567            for e in &files {
568                if let RunEvent::FileWritten {
569                    file_name,
570                    rows,
571                    bytes,
572                    ..
573                } = &e.event
574                {
575                    println!(
576                        "    - {}  ({} rows, {})",
577                        file_name,
578                        rows,
579                        format_bytes(*bytes),
580                    );
581                }
582            }
583        }
584
585        let retries = journal.retries();
586        if !retries.is_empty() {
587            println!("  retries: {}", retries.len());
588        }
589
590        for e in journal.quality_issues() {
591            if let RunEvent::QualityIssue { severity, message } = &e.event {
592                println!("  quality [{severity}]: {message}");
593            }
594        }
595
596        for e in journal.schema_changes() {
597            if let RunEvent::SchemaChanged {
598                added,
599                removed,
600                type_changed,
601            } = &e.event
602            {
603                if !added.is_empty() {
604                    println!("  schema: +{}", added.join(", +"));
605                }
606                if !removed.is_empty() {
607                    println!("  schema: -{}", removed.join(", -"));
608                }
609                for (col, old, new) in type_changed {
610                    println!("  schema: {col} {old}→{new}");
611                }
612            }
613        }
614
615        if let Some(e) = journal.final_outcome()
616            && let RunEvent::RunCompleted {
617                error_message: Some(err),
618                ..
619            } = &e.event
620        {
621            let first_line = err.lines().next().unwrap_or(err);
622            println!("  error:  {first_line}");
623        }
624    }
625    println!();
626    Ok(())
627}
628
629#[cfg(test)]
630mod tests {
631    use super::*;
632    use crate::journal::{RunEvent, RunJournal};
633
634    // ── helpers ──────────────────────────────────────────────────────────────
635
636    fn setup_dir() -> (tempfile::TempDir, String) {
637        let dir = tempfile::TempDir::new().unwrap();
638        // Inspect/reset commands now validate the --config path up front
639        // (findings #9/#23): they `Config::load` before opening the state DB,
640        // so the file must exist and parse. Write a valid two-export config
641        // (orders + transactions — the names the show/reset tests use) so the
642        // tests exercise the post-validation display/reset path, not the
643        // config-not-found bail.
644        let config_path = dir.path().join("rivet.yaml").to_str().unwrap().to_string();
645        write_two_export_config(&config_path);
646        (dir, config_path)
647    }
648
649    fn write_two_export_config(config_path: &str) {
650        std::fs::write(
651            config_path,
652            br#"source:
653  type: postgres
654  url: postgresql://localhost/testdb
655exports:
656  - name: transactions
657    query: "SELECT 1"
658    mode: full
659    format: parquet
660    destination:
661      type: local
662      path: ./out
663  - name: orders
664    query: "SELECT 1"
665    mode: full
666    format: parquet
667    destination:
668      type: local
669      path: ./out
670"#,
671        )
672        .unwrap();
673    }
674
675    fn write_single_export_config(config_path: &str) {
676        std::fs::write(
677            config_path,
678            br#"source:
679  type: postgres
680  url: postgresql://localhost/testdb
681exports:
682  - name: transactions
683    query: "SELECT 1"
684    mode: full
685    format: parquet
686    destination:
687      type: local
688      path: ./out
689"#,
690        )
691        .unwrap();
692    }
693
694    fn open_state(dir: &tempfile::TempDir) -> StateStore {
695        let db_path = dir.path().join(".rivet_state.db");
696        StateStore::open_at_path(&db_path).unwrap()
697    }
698
699    fn make_journal(run_id: &str, export: &str) -> RunJournal {
700        let mut j = RunJournal::new(run_id, export);
701        j.record(RunEvent::FileWritten {
702            file_name: "part0.parquet".into(),
703            rows: 1_000,
704            bytes: 65_536,
705            part_index: 0,
706        });
707        j.record(RunEvent::RunCompleted {
708            status: "success".into(),
709            error_message: None,
710            duration_ms: 1_500,
711        });
712        j
713    }
714
715    // ── boundary_value ───────────────────────────────────────────────────────
716
717    fn make_boundary(cursor: Option<&str>, chunk_index: Option<i64>) -> crate::state::Boundary {
718        crate::state::Boundary {
719            strategy: "incremental".into(),
720            run_id: None,
721            cursor: cursor.map(|s| s.to_string()),
722            chunk_index,
723            at: chrono::Utc::now(),
724        }
725    }
726
727    #[test]
728    fn boundary_value_cursor_takes_precedence_over_chunk_index() {
729        let b = make_boundary(Some("2025-01-15"), Some(42));
730        assert_eq!(boundary_value(&b), "2025-01-15");
731    }
732
733    #[test]
734    fn boundary_value_chunk_index_used_when_no_cursor() {
735        let b = make_boundary(None, Some(7));
736        assert_eq!(boundary_value(&b), "chunk #7");
737    }
738
739    #[test]
740    fn boundary_value_dash_when_neither_set() {
741        let b = make_boundary(None, None);
742        assert_eq!(boundary_value(&b), "-");
743    }
744
745    // ── state files: RUN ID column width (finding L16) ───────────────────────
746
747    // run_ids are `{export}_{%Y%m%dT%H%M%S%3f}` — the timestamp suffix alone is
748    // 18 chars, so even a short export name yields ~30-char ids and longer ones
749    // reach ~40. The RUN ID column must be wide enough that a 40-char id is not
750    // padded *past* the FILE column (which would shove FILE right and break
751    // every subsequent row's alignment vs. the header). A column of 35 wrapped;
752    // this pins the widened layout: a 40-char id is followed by exactly one
753    // space then FILE, identical to the header's spacing.
754    #[test]
755    fn state_files_run_id_column_fits_a_40_char_run_id() {
756        let run_id = "transactions_historyy_20250115T143022999"; // 40 chars
757        assert_eq!(run_id.len(), 40, "fixture must be a realistic 40-char id");
758
759        let header = format!(
760            "{:<40} {:<40} {:>8} {:>10} CREATED",
761            "RUN ID", "FILE", "ROWS", "BYTES"
762        );
763        let row = format!(
764            "{:<40} {:<40} {:>8} {:>10} {}",
765            run_id, "orders_001.parquet", 50_000, "4.0KB", "2025-01-15",
766        );
767
768        // The FILE column header and the row's file value must start at the
769        // same byte offset — the alignment invariant a too-narrow column breaks.
770        let header_file_at = header.find("FILE").unwrap();
771        let row_file_at = row.find("orders_001.parquet").unwrap();
772        assert_eq!(
773            header_file_at, row_file_at,
774            "a 40-char RUN ID must not push the FILE column out of alignment\nheader: {header}\nrow:    {row}"
775        );
776        // And the run_id is rendered in full (not truncated).
777        assert!(
778            row.starts_with(run_id),
779            "run_id must not be truncated: {row}"
780        );
781    }
782
783    // ── show_state ───────────────────────────────────────────────────────────
784
785    // Finding L17: the empty-cursor / never-run hint pointed at `rivet state`,
786    // which is a bare subcommand group and errors without a leaf. The fix points
787    // at the real command `rivet state show`. Pin the exact hint string the
788    // never-run branch builds.
789    #[test]
790    fn show_state_never_run_hint_points_at_state_show() {
791        let config_path = "rivet.yaml";
792        let hint = format!(
793            "No exports have been run yet.\n  \
794             Run `rivet run --config {}` first, then try `rivet state show` again.",
795            config_path
796        );
797        assert!(
798            hint.contains("try `rivet state show` again"),
799            "hint must name the runnable leaf command, not the bare group: {hint}"
800        );
801        assert!(
802            !hint.contains("try `rivet state` again"),
803            "must not point at the bare (subcommand-requiring) group: {hint}"
804        );
805    }
806
807    // ── metrics / journal: unknown-export guard (finding L18) ────────────────
808
809    // `metrics --export <typo>` used to print "No metrics recorded yet." —
810    // indistinguishable from a declared-but-unrun export. With the config now
811    // loaded, an undeclared name must bail naming the known exports.
812    #[test]
813    fn show_metrics_unknown_export_bails_with_known_names() {
814        let (dir, config_path) = setup_dir(); // declares orders + transactions
815        let _ = open_state(&dir);
816        let err = show_metrics(&config_path, Some("ghost"), 10, false).unwrap_err();
817        let msg = format!("{err:#}");
818        assert!(
819            msg.contains("export 'ghost' is not defined"),
820            "must name the unknown export, not say 'No metrics recorded yet': {msg}"
821        );
822        assert!(
823            msg.contains("orders") && msg.contains("transactions"),
824            "must list the declared exports so the user can spot the typo: {msg}"
825        );
826    }
827
828    // A *declared* export with no recorded runs must still reach the normal
829    // empty-state path (proves the guard does not over-reject).
830    #[test]
831    fn show_metrics_known_but_unrun_export_returns_ok() {
832        let (dir, config_path) = setup_dir();
833        let _ = open_state(&dir);
834        assert!(show_metrics(&config_path, Some("orders"), 10, false).is_ok());
835    }
836
837    // Same gap on `journal` when querying by export name (no --run-id).
838    #[test]
839    fn show_journal_unknown_export_bails_with_known_names() {
840        let (dir, config_path) = setup_dir();
841        let _ = open_state(&dir);
842        let err = show_journal(&config_path, "ghost", 5, None).unwrap_err();
843        let msg = format!("{err:#}");
844        assert!(
845            msg.contains("export 'ghost' is not defined"),
846            "must name the unknown export, not say 'No journal entries … yet': {msg}"
847        );
848        assert!(
849            msg.contains("orders") && msg.contains("transactions"),
850            "must list the declared exports: {msg}"
851        );
852    }
853
854    // Querying by --run-id looks up by id, not export name, so an unfamiliar
855    // export name must NOT be rejected there (the id is the lookup key).
856    #[test]
857    fn show_journal_by_run_id_skips_export_name_check() {
858        let (dir, config_path) = setup_dir();
859        let _ = open_state(&dir);
860        assert!(show_journal(&config_path, "ghost", 5, Some("no_such_run")).is_ok());
861    }
862
863    #[test]
864    fn show_state_empty_db_returns_ok() {
865        let (dir, config_path) = setup_dir();
866        let _ = open_state(&dir); // create the DB file
867        assert!(show_state(&config_path, false).is_ok());
868    }
869
870    #[test]
871    fn show_state_with_cursor_record_returns_ok() {
872        let (dir, config_path) = setup_dir();
873        let state = open_state(&dir);
874        state.update("orders", "2025-01-15").unwrap();
875        drop(state);
876        assert!(show_state(&config_path, false).is_ok());
877    }
878
879    // ── show_files ───────────────────────────────────────────────────────────
880
881    #[test]
882    fn show_files_empty_returns_ok() {
883        let (dir, config_path) = setup_dir();
884        let _ = open_state(&dir);
885        assert!(show_files(&config_path, None, 10, false).is_ok());
886    }
887
888    #[test]
889    fn show_files_with_record_returns_ok() {
890        let (dir, config_path) = setup_dir();
891        let state = open_state(&dir);
892        state
893            .record_file(
894                "r1",
895                "orders",
896                "orders_001.parquet",
897                50_000,
898                4096,
899                "parquet",
900                Some("zstd"),
901            )
902            .unwrap();
903        drop(state);
904        assert!(show_files(&config_path, Some("orders"), 10, false).is_ok());
905    }
906
907    // ── show_metrics ─────────────────────────────────────────────────────────
908
909    #[test]
910    fn show_metrics_empty_returns_ok() {
911        let (dir, config_path) = setup_dir();
912        let _ = open_state(&dir);
913        assert!(show_metrics(&config_path, None, 10, false).is_ok());
914    }
915
916    #[test]
917    fn show_metrics_exercises_flag_and_duration_paths() {
918        // Covers retries / validated / schema_changed flag lines and
919        // the ms-vs-seconds duration branch in the formatter.
920        let (dir, config_path) = setup_dir();
921        let state = open_state(&dir);
922        // ≥1000 ms → "X.Xs" branch; retries + validated + schema_changed flags
923        state
924            .record_metric(
925                "orders",
926                "r1",
927                1_500,
928                50_000,
929                Some(42),
930                "success",
931                None,
932                Some("balanced"),
933                Some("parquet"),
934                Some("full"),
935                1,
936                4096,
937                3,
938                Some(true),
939                Some(true),
940            )
941            .unwrap();
942        // <1000 ms → "Xms" branch; error_message line
943        state
944            .record_metric(
945                "orders",
946                "r2",
947                800,
948                0,
949                None,
950                "failed",
951                Some("timeout"),
952                None,
953                None,
954                None,
955                0,
956                0,
957                0,
958                Some(false),
959                None,
960            )
961            .unwrap();
962        drop(state);
963        assert!(show_metrics(&config_path, Some("orders"), 10, false).is_ok());
964    }
965
966    // ── show_journal ─────────────────────────────────────────────────────────
967
968    #[test]
969    fn show_journal_empty_returns_ok() {
970        let (dir, config_path) = setup_dir();
971        let _ = open_state(&dir);
972        assert!(show_journal(&config_path, "orders", 5, None).is_ok());
973    }
974
975    #[test]
976    fn show_journal_with_entry_returns_ok() {
977        let (dir, config_path) = setup_dir();
978        let state = open_state(&dir);
979        state
980            .store_journal(&make_journal("run_001", "orders"))
981            .unwrap();
982        drop(state);
983        assert!(show_journal(&config_path, "orders", 5, None).is_ok());
984    }
985
986    #[test]
987    fn show_journal_by_run_id_not_found_returns_ok() {
988        let (dir, config_path) = setup_dir();
989        let _ = open_state(&dir);
990        assert!(show_journal(&config_path, "orders", 5, Some("no_such_run")).is_ok());
991    }
992
993    #[test]
994    fn show_journal_by_run_id_found_returns_ok() {
995        let (dir, config_path) = setup_dir();
996        let state = open_state(&dir);
997        state
998            .store_journal(&make_journal("run_xyz", "orders"))
999            .unwrap();
1000        drop(state);
1001        assert!(show_journal(&config_path, "orders", 5, Some("run_xyz")).is_ok());
1002    }
1003
1004    // ── reset_state ──────────────────────────────────────────────────────────
1005
1006    #[test]
1007    fn reset_state_returns_ok() {
1008        let (dir, config_path) = setup_dir();
1009        write_two_export_config(&config_path);
1010        let state = open_state(&dir);
1011        state.update("orders", "100").unwrap();
1012        drop(state);
1013        assert!(reset_state(&config_path, "orders").is_ok());
1014    }
1015
1016    // F-NEW (0.7.7 audit): `state reset` on an export that is not declared
1017    // in the config used to silently succeed (DELETE WHERE export_name = X
1018    // affects 0 rows; "State reset for export 'X'" printed; rc=0). A typo'd
1019    // `--export pa_audi` looked like success but did nothing. This pins the
1020    // hint-emitting bail.
1021    #[test]
1022    fn reset_state_unknown_export_bails_with_hint() {
1023        let (_dir, config_path) = setup_dir();
1024        write_two_export_config(&config_path);
1025        let err = reset_state(&config_path, "ghost").unwrap_err();
1026        let msg = format!("{err:#}");
1027        assert!(
1028            msg.contains("export 'ghost' not found"),
1029            "must name the missing export: {msg}"
1030        );
1031        assert!(
1032            msg.contains("orders") && msg.contains("transactions"),
1033            "must list the declared exports so the user can spot the typo: {msg}"
1034        );
1035        assert!(
1036            msg.contains("rivet state show"),
1037            "must point at a follow-up command: {msg}"
1038        );
1039    }
1040
1041    // ── reset_chunk_checkpoint ───────────────────────────────────────────────
1042
1043    #[test]
1044    fn reset_chunk_checkpoint_on_empty_db_returns_ok() {
1045        let (dir, config_path) = setup_dir();
1046        let _ = open_state(&dir);
1047        assert!(reset_chunk_checkpoint(&config_path, "orders").is_ok());
1048    }
1049
1050    // #21 (0.9.x audit): `state reset-chunks -e <typo>` used to "Removed 0 …"
1051    // rc=0 with no export-name guardrail. Parity with `reset_state`: an unknown
1052    // name must bail with a hint that names the export and lists the declared
1053    // ones so the operator can spot the typo.
1054    #[test]
1055    fn reset_chunk_checkpoint_unknown_export_bails_with_hint() {
1056        let (_dir, config_path) = setup_dir(); // declares orders + transactions
1057        let err = reset_chunk_checkpoint(&config_path, "ghost").unwrap_err();
1058        let msg = format!("{err:#}");
1059        assert!(
1060            msg.contains("export 'ghost' not found"),
1061            "must name the missing export: {msg}"
1062        );
1063        assert!(
1064            msg.contains("orders") && msg.contains("transactions"),
1065            "must list the declared exports so the user can spot the typo: {msg}"
1066        );
1067    }
1068
1069    // #9/#23 (0.9.x audit): inspect/reset commands used --config ONLY to locate
1070    // the state DB next to it, never parsing it — so a nonexistent path opened a
1071    // fresh empty DB and exited 0 with "No … recorded yet" (and littered a
1072    // .rivet_state.db). They must bail naming the bad path instead.
1073    #[test]
1074    fn inspect_commands_bail_on_nonexistent_config() {
1075        let dir = tempfile::TempDir::new().unwrap();
1076        let missing = dir
1077            .path()
1078            .join("does_not_exist.yaml")
1079            .to_str()
1080            .unwrap()
1081            .to_string();
1082
1083        for res in [
1084            show_state(&missing, false),
1085            show_files(&missing, None, 10, false),
1086            show_metrics(&missing, None, 10, false),
1087            show_progression(&missing, None),
1088            show_journal(&missing, "orders", 5, None),
1089            show_chunk_checkpoint(&missing, "orders", false),
1090            reset_state(&missing, "orders"),
1091            reset_chunk_checkpoint(&missing, "orders"),
1092        ] {
1093            let err = res.expect_err("nonexistent config must error, not exit Ok");
1094            assert!(
1095                format!("{err:#}").contains("does_not_exist.yaml"),
1096                "error must name the missing config path: {err:#}"
1097            );
1098        }
1099
1100        // And the read-only inspect must NOT materialize a state DB beside the
1101        // phantom config (validation happens before StateStore::open).
1102        assert!(
1103            !dir.path().join(".rivet_state.db").exists(),
1104            "a bad-config inspect must not leak a fresh .rivet_state.db"
1105        );
1106    }
1107
1108    #[test]
1109    fn reset_chunk_checkpoints_stuck_no_rows_returns_ok() {
1110        let (dir, config_path) = setup_dir();
1111        write_two_export_config(&config_path);
1112        let _ = open_state(&dir);
1113        assert!(reset_chunk_checkpoints_stuck(&config_path).is_ok());
1114    }
1115
1116    #[test]
1117    fn reset_chunk_checkpoints_stuck_clears_matching_exports_only() {
1118        let (dir, config_path) = setup_dir();
1119        write_two_export_config(&config_path);
1120        let state = open_state(&dir);
1121        state
1122            .create_chunk_run("r_tx", "transactions", "plan", 3)
1123            .unwrap();
1124        state.create_chunk_run("r_g", "ghost", "plan", 3).unwrap();
1125        drop(state);
1126
1127        reset_chunk_checkpoints_stuck(&config_path).unwrap();
1128
1129        let state = StateStore::open(&config_path).unwrap();
1130        assert!(
1131            state
1132                .find_in_progress_chunk_run("transactions")
1133                .unwrap()
1134                .is_none()
1135        );
1136        assert!(state.find_in_progress_chunk_run("ghost").unwrap().is_some());
1137        assert_eq!(
1138            state.reset_chunk_checkpoint("ghost").unwrap(),
1139            1,
1140            "cleanup ghost row"
1141        );
1142    }
1143
1144    #[test]
1145    fn reset_chunk_checkpoints_stuck_skips_when_only_unknown_exports_stuck() {
1146        let (dir, config_path) = setup_dir();
1147        write_single_export_config(&config_path);
1148        let state = open_state(&dir);
1149        state.create_chunk_run("r_g", "ghost", "plan", 3).unwrap();
1150        drop(state);
1151
1152        reset_chunk_checkpoints_stuck(&config_path).unwrap();
1153
1154        let state = StateStore::open(&config_path).unwrap();
1155        assert!(state.find_in_progress_chunk_run("ghost").unwrap().is_some());
1156        assert_eq!(state.reset_chunk_checkpoint("ghost").unwrap(), 1);
1157    }
1158
1159    // ── show_progression ─────────────────────────────────────────────────────
1160
1161    #[test]
1162    fn show_progression_empty_returns_ok() {
1163        let (dir, config_path) = setup_dir();
1164        let _ = open_state(&dir);
1165        assert!(show_progression(&config_path, None).is_ok());
1166    }
1167
1168    #[test]
1169    fn show_progression_with_incremental_boundary_returns_ok() {
1170        let (dir, config_path) = setup_dir();
1171        let state = open_state(&dir);
1172        state
1173            .record_committed_incremental("orders", "2025-06-01", "run_001")
1174            .unwrap();
1175        drop(state);
1176        assert!(show_progression(&config_path, Some("orders")).is_ok());
1177    }
1178
1179    // ── show_chunk_checkpoint ────────────────────────────────────────────────
1180
1181    #[test]
1182    fn show_chunk_checkpoint_no_data_returns_ok() {
1183        let (dir, config_path) = setup_dir();
1184        let _ = open_state(&dir);
1185        assert!(show_chunk_checkpoint(&config_path, "orders", false).is_ok());
1186    }
1187}