Skip to main content

rivet/pipeline/
cli.rs

1//! **Layer: Observability**
2//!
3//! CLI display commands for state, metrics, files, chunk checkpoints, and journal history.
4//! Reads from the state stores and formats output — no execution or persistence writes.
5
6use super::format_bytes;
7use crate::config::Config;
8use crate::error::Result;
9use crate::journal::RunEvent;
10use crate::state::StateStore;
11
12pub fn show_state(config_path: &str) -> Result<()> {
13    let state = StateStore::open(config_path)?;
14    let states = state.list_all()?;
15    if states.is_empty() {
16        println!("No export state recorded yet.");
17        return Ok(());
18    }
19    println!("{:<30} {:<40} LAST RUN", "EXPORT", "LAST CURSOR");
20    println!("{}", "-".repeat(90));
21    for s in &states {
22        println!(
23            "{:<30} {:<40} {}",
24            s.export_name,
25            s.last_cursor_value.as_deref().unwrap_or("-"),
26            s.last_run_at.as_deref().unwrap_or("-"),
27        );
28    }
29    Ok(())
30}
31
32/// Epic G / ADR-0008 — explicit committed / verified boundaries per export.
33pub fn show_progression(config_path: &str, export_name: Option<&str>) -> Result<()> {
34    let state = StateStore::open(config_path)?;
35    let entries = match export_name {
36        Some(name) => vec![state.get_progression(name)?],
37        None => state.list_progression()?,
38    };
39    let has_any = entries
40        .iter()
41        .any(|p| p.committed.is_some() || p.verified.is_some());
42    if !has_any {
43        println!("No progression boundaries recorded yet.");
44        return Ok(());
45    }
46
47    println!(
48        "{:<30} {:<12} {:<30} {:<25} {:<12} {:<30}",
49        "EXPORT", "COMM MODE", "COMMITTED", "COMMITTED AT", "VERI MODE", "VERIFIED"
50    );
51    println!("{}", "-".repeat(145));
52    for p in &entries {
53        let (c_mode, c_val, c_at) = match &p.committed {
54            Some(b) => (
55                b.strategy.as_str().to_string(),
56                boundary_value(b),
57                b.at.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
58            ),
59            None => ("-".into(), "-".into(), "-".into()),
60        };
61        let (v_mode, v_val) = match &p.verified {
62            Some(b) => (b.strategy.as_str().to_string(), boundary_value(b)),
63            None => ("-".into(), "-".into()),
64        };
65        println!(
66            "{:<30} {:<12} {:<30} {:<25} {:<12} {:<30}",
67            p.export_name, c_mode, c_val, c_at, v_mode, v_val
68        );
69    }
70    Ok(())
71}
72
73fn boundary_value(b: &crate::state::Boundary) -> String {
74    if let Some(c) = &b.cursor {
75        c.clone()
76    } else if let Some(idx) = b.chunk_index {
77        format!("chunk #{idx}")
78    } else {
79        "-".into()
80    }
81}
82
83pub fn reset_state(config_path: &str, export_name: &str) -> Result<()> {
84    // Validate the export name against the config BEFORE touching state, so a
85    // typo (`--export pa_audi` instead of `pa_audit`) produces a hint with the
86    // declared names instead of silently DELETE-ing zero rows and printing
87    // "State reset for export 'pa_audi'" as if it worked.  Without this guard
88    // a stray `--export <unknown>` looked like success but did nothing — a
89    // genuine ops footgun under "I reset it, why did the cursor not move".
90    let config = crate::config::Config::load(config_path)?;
91    if !config.exports.iter().any(|e| e.name == export_name) {
92        let known: Vec<String> = config.exports.iter().map(|e| e.name.clone()).collect();
93        anyhow::bail!(
94            "export '{}' not found in config '{}'.\n  Known exports: {}\n  Hint: check the spelling, or run `rivet state show -c {}` to see what is currently tracked.",
95            export_name,
96            config_path,
97            if known.is_empty() {
98                "(none defined)".to_string()
99            } else {
100                known.join(", ")
101            },
102            config_path,
103        );
104    }
105    let state = StateStore::open(config_path)?;
106    state.reset(export_name)?;
107    println!("State reset for export '{}'", export_name);
108    Ok(())
109}
110
111pub fn show_files(config_path: &str, export_name: Option<&str>, limit: usize) -> Result<()> {
112    let state = StateStore::open(config_path)?;
113    let files = state.get_files(export_name, limit)?;
114    if files.is_empty() {
115        println!("No files recorded yet.");
116        return Ok(());
117    }
118    println!(
119        "{:<35} {:<40} {:>8} {:>10} CREATED",
120        "RUN ID", "FILE", "ROWS", "BYTES"
121    );
122    println!("{}", "-".repeat(110));
123    for f in &files {
124        println!(
125            "{:<35} {:<40} {:>8} {:>10} {}",
126            f.run_id,
127            f.file_name,
128            f.row_count,
129            format_bytes(f.bytes as u64),
130            f.created_at,
131        );
132    }
133    Ok(())
134}
135
136pub fn show_metrics(config_path: &str, export_name: Option<&str>, limit: usize) -> Result<()> {
137    let state = StateStore::open(config_path)?;
138    let metrics = state.get_metrics(export_name, limit)?;
139    if metrics.is_empty() {
140        println!("No metrics recorded yet.");
141        return Ok(());
142    }
143    println!(
144        "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} RUN ID",
145        "EXPORT", "STATUS", "ROWS", "DURATION", "RSS", "FILES", "BYTES"
146    );
147    println!("{}", "-".repeat(110));
148    for m in &metrics {
149        let duration = if m.duration_ms >= 1000 {
150            format!("{:.1}s", m.duration_ms as f64 / 1000.0)
151        } else {
152            format!("{}ms", m.duration_ms)
153        };
154        let rss = m
155            .peak_rss_mb
156            .map(|r| format!("{}MB", r))
157            .unwrap_or_else(|| "-".into());
158        let bytes = if m.bytes_written > 0 {
159            format_bytes(m.bytes_written as u64)
160        } else {
161            "-".into()
162        };
163        let run_id = m.run_id.as_deref().unwrap_or(&m.run_at);
164        println!(
165            "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} {}",
166            m.export_name, m.status, m.total_rows, duration, rss, m.files_produced, bytes, run_id
167        );
168        if let Some(err) = &m.error_message {
169            println!("  Error: {}", err);
170        }
171        let mut flags = Vec::new();
172        if m.retries > 0 {
173            flags.push(format!("retries={}", m.retries));
174        }
175        if let Some(v) = m.validated {
176            flags.push(format!("validated={}", if v { "pass" } else { "FAIL" }));
177        }
178        if let Some(sc) = m.schema_changed {
179            flags.push(format!("schema={}", if sc { "CHANGED" } else { "ok" }));
180        }
181        if !flags.is_empty() {
182            println!("  {}", flags.join("  "));
183        }
184    }
185    Ok(())
186}
187
188pub fn reset_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
189    let state = StateStore::open(config_path)?;
190    let n = state.reset_chunk_checkpoint(export_name)?;
191    println!(
192        "Removed {} chunk run record(s) for export '{}'.",
193        n, export_name
194    );
195    Ok(())
196}
197
198/// Clear chunk checkpoints for every export **named in `config_path`'s YAML** that currently has an
199/// `in_progress` chunk run (`chunk_run.status = 'in_progress'`).
200///
201/// Names present only in state (removed from config) are skipped with a printed note.
202pub fn reset_chunk_checkpoints_stuck(config_path: &str) -> Result<()> {
203    let cfg = Config::load(config_path)?;
204    let allowed: std::collections::HashSet<&str> =
205        cfg.exports.iter().map(|e| e.name.as_str()).collect();
206    let state = StateStore::open(config_path)?;
207    let stuck = state.list_export_names_with_in_progress_chunk_runs()?;
208    if stuck.is_empty() {
209        println!("No exports have an in-progress chunk checkpoint run.");
210        println!(
211            "(Nothing with chunk_run.status = 'in_progress' in {}.)",
212            StateStore::state_db_path(config_path).display()
213        );
214        return Ok(());
215    }
216
217    let mut skipped_not_in_config = Vec::new();
218    let mut targets = Vec::new();
219    for name in stuck {
220        if allowed.contains(name.as_str()) {
221            targets.push(name);
222        } else {
223            skipped_not_in_config.push(name);
224        }
225    }
226
227    for name in &skipped_not_in_config {
228        println!(
229            "Skipping '{}' — chunk checkpoint still in_progress but this export is not in the config.",
230            name
231        );
232    }
233
234    if targets.is_empty() {
235        println!(
236            "No matching exports to reset (none of the in-progress runs belong to exports in this config)."
237        );
238        return Ok(());
239    }
240
241    println!(
242        "Resetting chunk checkpoints for {} export(s) with in_progress runs: {}",
243        targets.len(),
244        targets.join(", ")
245    );
246
247    for name in targets {
248        let n = state.reset_chunk_checkpoint(&name)?;
249        println!("Removed {} chunk run record(s) for export '{}'.", n, name);
250    }
251    Ok(())
252}
253
254pub fn show_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
255    let state = StateStore::open(config_path)?;
256    println!(
257        "database:   {}",
258        StateStore::state_db_path(config_path).display()
259    );
260    let Some((run_id, plan_hash, status, updated_at)) = state.get_latest_chunk_run(export_name)?
261    else {
262        println!("No chunk checkpoint data for export '{}'.", export_name);
263        return Ok(());
264    };
265    println!("export:     {}", export_name);
266    println!("run_id:     {}", run_id);
267    println!("plan_hash:  {}", plan_hash);
268    println!("status:     {}", status);
269    println!("updated_at: {}", updated_at);
270    println!();
271    println!(
272        "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} FILE",
273        "IDX", "STATUS", "START", "END", "ATT", "ROWS"
274    );
275    println!("{}", "-".repeat(90));
276    for t in state.list_chunk_tasks_for_run(&run_id)? {
277        let file = t.file_name.as_deref().unwrap_or("-");
278        let rows = t
279            .rows_written
280            .map(|r| r.to_string())
281            .unwrap_or_else(|| "-".into());
282        println!(
283            "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} {}",
284            t.chunk_index, t.status, t.start_key, t.end_key, t.attempts, rows, file
285        );
286        if let Some(e) = &t.last_error {
287            println!("       error: {}", e);
288        }
289    }
290    Ok(())
291}
292
293/// Display recent run journal entries for an export.
294///
295/// Shows up to `limit` most recent runs (newest first), each as a compact
296/// block: run header + per-event summary lines.
297pub fn show_journal(
298    config_path: &str,
299    export_name: &str,
300    limit: usize,
301    run_id: Option<&str>,
302) -> Result<()> {
303    let state = StateStore::open(config_path)?;
304
305    let journals = if let Some(rid) = run_id {
306        match state.load_journal(rid)? {
307            Some(j) => vec![j],
308            None => {
309                println!("No journal found for run_id '{rid}'.");
310                return Ok(());
311            }
312        }
313    } else {
314        state.recent_journals(export_name, limit)?
315    };
316
317    if journals.is_empty() {
318        println!("No journal entries for export '{export_name}' yet.");
319        println!("Journals are recorded after each `rivet run`.");
320        return Ok(());
321    }
322
323    for journal in &journals {
324        // ── run header ────────────────────────────────────────────────────
325        let outcome = journal.final_outcome().and_then(|e| {
326            if let RunEvent::RunCompleted {
327                status,
328                duration_ms,
329                ..
330            } = &e.event
331            {
332                Some((status.as_str(), *duration_ms))
333            } else {
334                None
335            }
336        });
337        let (status_str, duration_str) = match outcome {
338            Some((s, ms)) if ms >= 1000 => (s, format!("{:.1}s", ms as f64 / 1000.0)),
339            Some((s, ms)) => (s, format!("{ms}ms")),
340            None => ("(incomplete)", String::new()),
341        };
342        let icon = match status_str {
343            "success" => "✓",
344            "failed" => "✗",
345            _ => "•",
346        };
347        println!(
348            "\n{icon} {export}  {status}  {dur}",
349            export = journal.export_name,
350            status = status_str,
351            dur = duration_str,
352        );
353        println!("  run_id: {}", journal.run_id);
354
355        // ── event summary lines ────────────────────────────────────────────
356        let files = journal.files();
357        if !files.is_empty() {
358            let total_rows: i64 = files
359                .iter()
360                .filter_map(|e| {
361                    if let RunEvent::FileWritten { rows, .. } = &e.event {
362                        Some(*rows)
363                    } else {
364                        None
365                    }
366                })
367                .sum();
368            let total_bytes: u64 = files
369                .iter()
370                .filter_map(|e| {
371                    if let RunEvent::FileWritten { bytes, .. } = &e.event {
372                        Some(*bytes)
373                    } else {
374                        None
375                    }
376                })
377                .sum();
378            println!(
379                "  files:  {}  rows: {}  size: {}",
380                files.len(),
381                total_rows,
382                format_bytes(total_bytes),
383            );
384        }
385
386        let retries = journal.retries();
387        if !retries.is_empty() {
388            println!("  retries: {}", retries.len());
389        }
390
391        for e in journal.quality_issues() {
392            if let RunEvent::QualityIssue { severity, message } = &e.event {
393                println!("  quality [{severity}]: {message}");
394            }
395        }
396
397        for e in journal.schema_changes() {
398            if let RunEvent::SchemaChanged {
399                added,
400                removed,
401                type_changed,
402            } = &e.event
403            {
404                if !added.is_empty() {
405                    println!("  schema: +{}", added.join(", +"));
406                }
407                if !removed.is_empty() {
408                    println!("  schema: -{}", removed.join(", -"));
409                }
410                for (col, old, new) in type_changed {
411                    println!("  schema: {col} {old}→{new}");
412                }
413            }
414        }
415
416        if let Some(e) = journal.final_outcome()
417            && let RunEvent::RunCompleted {
418                error_message: Some(err),
419                ..
420            } = &e.event
421        {
422            let first_line = err.lines().next().unwrap_or(err);
423            println!("  error:  {first_line}");
424        }
425    }
426    println!();
427    Ok(())
428}
429
430#[cfg(test)]
431mod tests {
432    use super::*;
433    use crate::journal::{RunEvent, RunJournal};
434
435    // ── helpers ──────────────────────────────────────────────────────────────
436
437    fn setup_dir() -> (tempfile::TempDir, String) {
438        let dir = tempfile::TempDir::new().unwrap();
439        // config file itself does not need to exist; StateStore::open uses its parent dir
440        let config_path = dir.path().join("rivet.yaml").to_str().unwrap().to_string();
441        (dir, config_path)
442    }
443
444    fn write_two_export_config(config_path: &str) {
445        std::fs::write(
446            config_path,
447            br#"source:
448  type: postgres
449  url: postgresql://localhost/testdb
450exports:
451  - name: transactions
452    query: "SELECT 1"
453    mode: full
454    format: parquet
455    destination:
456      type: local
457      path: ./out
458  - name: orders
459    query: "SELECT 1"
460    mode: full
461    format: parquet
462    destination:
463      type: local
464      path: ./out
465"#,
466        )
467        .unwrap();
468    }
469
470    fn write_single_export_config(config_path: &str) {
471        std::fs::write(
472            config_path,
473            br#"source:
474  type: postgres
475  url: postgresql://localhost/testdb
476exports:
477  - name: transactions
478    query: "SELECT 1"
479    mode: full
480    format: parquet
481    destination:
482      type: local
483      path: ./out
484"#,
485        )
486        .unwrap();
487    }
488
489    fn open_state(dir: &tempfile::TempDir) -> StateStore {
490        let db_path = dir.path().join(".rivet_state.db");
491        StateStore::open_at_path(&db_path).unwrap()
492    }
493
494    fn make_journal(run_id: &str, export: &str) -> RunJournal {
495        let mut j = RunJournal::new(run_id, export);
496        j.record(RunEvent::FileWritten {
497            file_name: "part0.parquet".into(),
498            rows: 1_000,
499            bytes: 65_536,
500            part_index: 0,
501        });
502        j.record(RunEvent::RunCompleted {
503            status: "success".into(),
504            error_message: None,
505            duration_ms: 1_500,
506        });
507        j
508    }
509
510    // ── boundary_value ───────────────────────────────────────────────────────
511
512    fn make_boundary(cursor: Option<&str>, chunk_index: Option<i64>) -> crate::state::Boundary {
513        crate::state::Boundary {
514            strategy: "incremental".into(),
515            run_id: None,
516            cursor: cursor.map(|s| s.to_string()),
517            chunk_index,
518            at: chrono::Utc::now(),
519        }
520    }
521
522    #[test]
523    fn boundary_value_cursor_takes_precedence_over_chunk_index() {
524        let b = make_boundary(Some("2025-01-15"), Some(42));
525        assert_eq!(boundary_value(&b), "2025-01-15");
526    }
527
528    #[test]
529    fn boundary_value_chunk_index_used_when_no_cursor() {
530        let b = make_boundary(None, Some(7));
531        assert_eq!(boundary_value(&b), "chunk #7");
532    }
533
534    #[test]
535    fn boundary_value_dash_when_neither_set() {
536        let b = make_boundary(None, None);
537        assert_eq!(boundary_value(&b), "-");
538    }
539
540    // ── show_state ───────────────────────────────────────────────────────────
541
542    #[test]
543    fn show_state_empty_db_returns_ok() {
544        let (dir, config_path) = setup_dir();
545        let _ = open_state(&dir); // create the DB file
546        assert!(show_state(&config_path).is_ok());
547    }
548
549    #[test]
550    fn show_state_with_cursor_record_returns_ok() {
551        let (dir, config_path) = setup_dir();
552        let state = open_state(&dir);
553        state.update("orders", "2025-01-15").unwrap();
554        drop(state);
555        assert!(show_state(&config_path).is_ok());
556    }
557
558    // ── show_files ───────────────────────────────────────────────────────────
559
560    #[test]
561    fn show_files_empty_returns_ok() {
562        let (dir, config_path) = setup_dir();
563        let _ = open_state(&dir);
564        assert!(show_files(&config_path, None, 10).is_ok());
565    }
566
567    #[test]
568    fn show_files_with_record_returns_ok() {
569        let (dir, config_path) = setup_dir();
570        let state = open_state(&dir);
571        state
572            .record_file(
573                "r1",
574                "orders",
575                "orders_001.parquet",
576                50_000,
577                4096,
578                "parquet",
579                Some("zstd"),
580            )
581            .unwrap();
582        drop(state);
583        assert!(show_files(&config_path, Some("orders"), 10).is_ok());
584    }
585
586    // ── show_metrics ─────────────────────────────────────────────────────────
587
588    #[test]
589    fn show_metrics_empty_returns_ok() {
590        let (dir, config_path) = setup_dir();
591        let _ = open_state(&dir);
592        assert!(show_metrics(&config_path, None, 10).is_ok());
593    }
594
595    #[test]
596    fn show_metrics_exercises_flag_and_duration_paths() {
597        // Covers retries / validated / schema_changed flag lines and
598        // the ms-vs-seconds duration branch in the formatter.
599        let (dir, config_path) = setup_dir();
600        let state = open_state(&dir);
601        // ≥1000 ms → "X.Xs" branch; retries + validated + schema_changed flags
602        state
603            .record_metric(
604                "orders",
605                "r1",
606                1_500,
607                50_000,
608                Some(42),
609                "success",
610                None,
611                Some("balanced"),
612                Some("parquet"),
613                Some("full"),
614                1,
615                4096,
616                3,
617                Some(true),
618                Some(true),
619            )
620            .unwrap();
621        // <1000 ms → "Xms" branch; error_message line
622        state
623            .record_metric(
624                "orders",
625                "r2",
626                800,
627                0,
628                None,
629                "failed",
630                Some("timeout"),
631                None,
632                None,
633                None,
634                0,
635                0,
636                0,
637                Some(false),
638                None,
639            )
640            .unwrap();
641        drop(state);
642        assert!(show_metrics(&config_path, Some("orders"), 10).is_ok());
643    }
644
645    // ── show_journal ─────────────────────────────────────────────────────────
646
647    #[test]
648    fn show_journal_empty_returns_ok() {
649        let (dir, config_path) = setup_dir();
650        let _ = open_state(&dir);
651        assert!(show_journal(&config_path, "orders", 5, None).is_ok());
652    }
653
654    #[test]
655    fn show_journal_with_entry_returns_ok() {
656        let (dir, config_path) = setup_dir();
657        let state = open_state(&dir);
658        state
659            .store_journal(&make_journal("run_001", "orders"))
660            .unwrap();
661        drop(state);
662        assert!(show_journal(&config_path, "orders", 5, None).is_ok());
663    }
664
665    #[test]
666    fn show_journal_by_run_id_not_found_returns_ok() {
667        let (dir, config_path) = setup_dir();
668        let _ = open_state(&dir);
669        assert!(show_journal(&config_path, "orders", 5, Some("no_such_run")).is_ok());
670    }
671
672    #[test]
673    fn show_journal_by_run_id_found_returns_ok() {
674        let (dir, config_path) = setup_dir();
675        let state = open_state(&dir);
676        state
677            .store_journal(&make_journal("run_xyz", "orders"))
678            .unwrap();
679        drop(state);
680        assert!(show_journal(&config_path, "orders", 5, Some("run_xyz")).is_ok());
681    }
682
683    // ── reset_state ──────────────────────────────────────────────────────────
684
685    #[test]
686    fn reset_state_returns_ok() {
687        let (dir, config_path) = setup_dir();
688        write_two_export_config(&config_path);
689        let state = open_state(&dir);
690        state.update("orders", "100").unwrap();
691        drop(state);
692        assert!(reset_state(&config_path, "orders").is_ok());
693    }
694
695    // F-NEW (0.7.7 audit): `state reset` on an export that is not declared
696    // in the config used to silently succeed (DELETE WHERE export_name = X
697    // affects 0 rows; "State reset for export 'X'" printed; rc=0). A typo'd
698    // `--export pa_audi` looked like success but did nothing. This pins the
699    // hint-emitting bail.
700    #[test]
701    fn reset_state_unknown_export_bails_with_hint() {
702        let (_dir, config_path) = setup_dir();
703        write_two_export_config(&config_path);
704        let err = reset_state(&config_path, "ghost").unwrap_err();
705        let msg = format!("{err:#}");
706        assert!(
707            msg.contains("export 'ghost' not found"),
708            "must name the missing export: {msg}"
709        );
710        assert!(
711            msg.contains("orders") && msg.contains("transactions"),
712            "must list the declared exports so the user can spot the typo: {msg}"
713        );
714        assert!(
715            msg.contains("rivet state show"),
716            "must point at a follow-up command: {msg}"
717        );
718    }
719
720    // ── reset_chunk_checkpoint ───────────────────────────────────────────────
721
722    #[test]
723    fn reset_chunk_checkpoint_on_empty_db_returns_ok() {
724        let (dir, config_path) = setup_dir();
725        let _ = open_state(&dir);
726        assert!(reset_chunk_checkpoint(&config_path, "orders").is_ok());
727    }
728
729    #[test]
730    fn reset_chunk_checkpoints_stuck_no_rows_returns_ok() {
731        let (dir, config_path) = setup_dir();
732        write_two_export_config(&config_path);
733        let _ = open_state(&dir);
734        assert!(reset_chunk_checkpoints_stuck(&config_path).is_ok());
735    }
736
737    #[test]
738    fn reset_chunk_checkpoints_stuck_clears_matching_exports_only() {
739        let (dir, config_path) = setup_dir();
740        write_two_export_config(&config_path);
741        let state = open_state(&dir);
742        state
743            .create_chunk_run("r_tx", "transactions", "plan", 3)
744            .unwrap();
745        state.create_chunk_run("r_g", "ghost", "plan", 3).unwrap();
746        drop(state);
747
748        reset_chunk_checkpoints_stuck(&config_path).unwrap();
749
750        let state = StateStore::open(&config_path).unwrap();
751        assert!(
752            state
753                .find_in_progress_chunk_run("transactions")
754                .unwrap()
755                .is_none()
756        );
757        assert!(state.find_in_progress_chunk_run("ghost").unwrap().is_some());
758        assert_eq!(
759            state.reset_chunk_checkpoint("ghost").unwrap(),
760            1,
761            "cleanup ghost row"
762        );
763    }
764
765    #[test]
766    fn reset_chunk_checkpoints_stuck_skips_when_only_unknown_exports_stuck() {
767        let (dir, config_path) = setup_dir();
768        write_single_export_config(&config_path);
769        let state = open_state(&dir);
770        state.create_chunk_run("r_g", "ghost", "plan", 3).unwrap();
771        drop(state);
772
773        reset_chunk_checkpoints_stuck(&config_path).unwrap();
774
775        let state = StateStore::open(&config_path).unwrap();
776        assert!(state.find_in_progress_chunk_run("ghost").unwrap().is_some());
777        assert_eq!(state.reset_chunk_checkpoint("ghost").unwrap(), 1);
778    }
779
780    // ── show_progression ─────────────────────────────────────────────────────
781
782    #[test]
783    fn show_progression_empty_returns_ok() {
784        let (dir, config_path) = setup_dir();
785        let _ = open_state(&dir);
786        assert!(show_progression(&config_path, None).is_ok());
787    }
788
789    #[test]
790    fn show_progression_with_incremental_boundary_returns_ok() {
791        let (dir, config_path) = setup_dir();
792        let state = open_state(&dir);
793        state
794            .record_committed_incremental("orders", "2025-06-01", "run_001")
795            .unwrap();
796        drop(state);
797        assert!(show_progression(&config_path, Some("orders")).is_ok());
798    }
799
800    // ── show_chunk_checkpoint ────────────────────────────────────────────────
801
802    #[test]
803    fn show_chunk_checkpoint_no_data_returns_ok() {
804        let (dir, config_path) = setup_dir();
805        let _ = open_state(&dir);
806        assert!(show_chunk_checkpoint(&config_path, "orders").is_ok());
807    }
808}