Skip to main content

rivet/pipeline/
cli.rs

//! **Layer: Observability**
//!
//! CLI commands for state, metrics, files, chunk checkpoints, and journal history.
//! The `show_*` commands read from the state store and format output; the `reset_*`
//! commands additionally delete cursor or chunk-checkpoint records. Nothing here runs an export.
5
6use super::format_bytes;
7use crate::config::Config;
8use crate::error::Result;
9use crate::journal::RunEvent;
10use crate::state::StateStore;
11
12pub fn show_state(config_path: &str) -> Result<()> {
13    let state = StateStore::open(config_path)?;
14    let states = state.list_all()?;
15    if states.is_empty() {
16        // `state.list_all()` returns only incremental-cursor rows. Chunked
17        // and full runs land in different tables (`export_metrics`,
18        // `export_files`, `chunk_state`), so an empty cursor list does not
19        // mean "this config never ran". Distinguish the two cases:
20        //   - never ran    → tell the operator to run first
21        //   - ran chunked  → point at `rivet metrics` / `rivet state files`
22        // Anything else here is misleading ("No state" after a successful
23        // chunked run sounds like data loss).
24        let any_run = state
25            .get_metrics(None, 1)
26            .map(|m| !m.is_empty())
27            .unwrap_or(false);
28        if any_run {
29            println!(
30                "No incremental cursor recorded yet.\n  \
31                 This command shows incremental-mode cursors only.\n  \
32                 For chunked / full runs, see:\n  \
33                 • rivet metrics      — per-run history (status, rows, duration)\n  \
34                 • rivet state files  — every produced file with row count + size"
35            );
36        } else {
37            println!(
38                "No exports have been run yet.\n  \
39                 Run `rivet run --config {}` first, then try `rivet state` again.",
40                config_path
41            );
42        }
43        return Ok(());
44    }
45    println!("{:<30} {:<40} LAST RUN", "EXPORT", "LAST CURSOR");
46    println!("{}", "-".repeat(90));
47    for s in &states {
48        println!(
49            "{:<30} {:<40} {}",
50            s.export_name,
51            s.last_cursor_value.as_deref().unwrap_or("-"),
52            s.last_run_at.as_deref().unwrap_or("-"),
53        );
54    }
55    Ok(())
56}
57
58/// Epic G / ADR-0008 — explicit committed / verified boundaries per export.
59pub fn show_progression(config_path: &str, export_name: Option<&str>) -> Result<()> {
60    let state = StateStore::open(config_path)?;
61    let entries = match export_name {
62        Some(name) => vec![state.get_progression(name)?],
63        None => state.list_progression()?,
64    };
65    let has_any = entries
66        .iter()
67        .any(|p| p.committed.is_some() || p.verified.is_some());
68    if !has_any {
69        println!("No progression boundaries recorded yet.");
70        return Ok(());
71    }
72
73    println!(
74        "{:<30} {:<12} {:<30} {:<25} {:<12} {:<30}",
75        "EXPORT", "COMM MODE", "COMMITTED", "COMMITTED AT", "VERI MODE", "VERIFIED"
76    );
77    println!("{}", "-".repeat(145));
78    for p in &entries {
79        let (c_mode, c_val, c_at) = match &p.committed {
80            Some(b) => (
81                b.strategy.as_str().to_string(),
82                boundary_value(b),
83                b.at.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
84            ),
85            None => ("-".into(), "-".into(), "-".into()),
86        };
87        let (v_mode, v_val) = match &p.verified {
88            Some(b) => (b.strategy.as_str().to_string(), boundary_value(b)),
89            None => ("-".into(), "-".into()),
90        };
91        println!(
92            "{:<30} {:<12} {:<30} {:<25} {:<12} {:<30}",
93            p.export_name, c_mode, c_val, c_at, v_mode, v_val
94        );
95    }
96    Ok(())
97}
98
99fn boundary_value(b: &crate::state::Boundary) -> String {
100    if let Some(c) = &b.cursor {
101        c.clone()
102    } else if let Some(idx) = b.chunk_index {
103        format!("chunk #{idx}")
104    } else {
105        "-".into()
106    }
107}
108
109pub fn reset_state(config_path: &str, export_name: &str) -> Result<()> {
110    // Validate the export name against the config BEFORE touching state, so a
111    // typo (`--export pa_audi` instead of `pa_audit`) produces a hint with the
112    // declared names instead of silently DELETE-ing zero rows and printing
113    // "State reset for export 'pa_audi'" as if it worked.  Without this guard
114    // a stray `--export <unknown>` looked like success but did nothing — a
115    // genuine ops footgun under "I reset it, why did the cursor not move".
116    let config = crate::config::Config::load(config_path)?;
117    if !config.exports.iter().any(|e| e.name == export_name) {
118        let known: Vec<String> = config.exports.iter().map(|e| e.name.clone()).collect();
119        anyhow::bail!(
120            "export '{}' not found in config '{}'.\n  Known exports: {}\n  Hint: check the spelling, or run `rivet state show -c {}` to see what is currently tracked.",
121            export_name,
122            config_path,
123            if known.is_empty() {
124                "(none defined)".to_string()
125            } else {
126                known.join(", ")
127            },
128            config_path,
129        );
130    }
131    let state = StateStore::open(config_path)?;
132    state.reset(export_name)?;
133    println!("State reset for export '{}'", export_name);
134    Ok(())
135}
136
137pub fn show_files(config_path: &str, export_name: Option<&str>, limit: usize) -> Result<()> {
138    let state = StateStore::open(config_path)?;
139    let files = state.get_files(export_name, limit)?;
140    if files.is_empty() {
141        println!("No files recorded yet.");
142        return Ok(());
143    }
144    println!(
145        "{:<35} {:<40} {:>8} {:>10} CREATED",
146        "RUN ID", "FILE", "ROWS", "BYTES"
147    );
148    println!("{}", "-".repeat(110));
149    for f in &files {
150        println!(
151            "{:<35} {:<40} {:>8} {:>10} {}",
152            f.run_id,
153            f.file_name,
154            f.row_count,
155            format_bytes(f.bytes as u64),
156            f.created_at,
157        );
158    }
159    Ok(())
160}
161
162pub fn show_metrics(config_path: &str, export_name: Option<&str>, limit: usize) -> Result<()> {
163    let state = StateStore::open(config_path)?;
164    let metrics = state.get_metrics(export_name, limit)?;
165    if metrics.is_empty() {
166        println!("No metrics recorded yet.");
167        return Ok(());
168    }
169    println!(
170        "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} RUN ID",
171        "EXPORT", "STATUS", "ROWS", "DURATION", "RSS", "FILES", "BYTES"
172    );
173    println!("{}", "-".repeat(110));
174    for m in &metrics {
175        let duration = if m.duration_ms >= 1000 {
176            format!("{:.1}s", m.duration_ms as f64 / 1000.0)
177        } else {
178            format!("{}ms", m.duration_ms)
179        };
180        let rss = m
181            .peak_rss_mb
182            .map(|r| format!("{}MB", r))
183            .unwrap_or_else(|| "-".into());
184        let bytes = if m.bytes_written > 0 {
185            format_bytes(m.bytes_written as u64)
186        } else {
187            "-".into()
188        };
189        let run_id = m.run_id.as_deref().unwrap_or(&m.run_at);
190        println!(
191            "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} {}",
192            m.export_name, m.status, m.total_rows, duration, rss, m.files_produced, bytes, run_id
193        );
194        if let Some(err) = &m.error_message {
195            println!("  Error: {}", err);
196        }
197        let mut flags = Vec::new();
198        if m.retries > 0 {
199            flags.push(format!("retries={}", m.retries));
200        }
201        if let Some(v) = m.validated {
202            flags.push(format!("validated={}", if v { "pass" } else { "FAIL" }));
203        }
204        if let Some(sc) = m.schema_changed {
205            flags.push(format!("schema={}", if sc { "CHANGED" } else { "ok" }));
206        }
207        if !flags.is_empty() {
208            println!("  {}", flags.join("  "));
209        }
210    }
211    Ok(())
212}
213
214pub fn reset_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
215    let state = StateStore::open(config_path)?;
216    let n = state.reset_chunk_checkpoint(export_name)?;
217    println!(
218        "Removed {} chunk run record(s) for export '{}'.",
219        n, export_name
220    );
221    Ok(())
222}
223
224/// Clear chunk checkpoints for every export **named in `config_path`'s YAML** that currently has an
225/// `in_progress` chunk run (`chunk_run.status = 'in_progress'`).
226///
227/// Names present only in state (removed from config) are skipped with a printed note.
228pub fn reset_chunk_checkpoints_stuck(config_path: &str) -> Result<()> {
229    let cfg = Config::load(config_path)?;
230    let allowed: std::collections::HashSet<&str> =
231        cfg.exports.iter().map(|e| e.name.as_str()).collect();
232    let state = StateStore::open(config_path)?;
233    let stuck = state.list_export_names_with_in_progress_chunk_runs()?;
234    if stuck.is_empty() {
235        println!("No exports have an in-progress chunk checkpoint run.");
236        println!(
237            "(Nothing with chunk_run.status = 'in_progress' in {}.)",
238            StateStore::state_db_path(config_path).display()
239        );
240        return Ok(());
241    }
242
243    let mut skipped_not_in_config = Vec::new();
244    let mut targets = Vec::new();
245    for name in stuck {
246        if allowed.contains(name.as_str()) {
247            targets.push(name);
248        } else {
249            skipped_not_in_config.push(name);
250        }
251    }
252
253    for name in &skipped_not_in_config {
254        println!(
255            "Skipping '{}' — chunk checkpoint still in_progress but this export is not in the config.",
256            name
257        );
258    }
259
260    if targets.is_empty() {
261        println!(
262            "No matching exports to reset (none of the in-progress runs belong to exports in this config)."
263        );
264        return Ok(());
265    }
266
267    println!(
268        "Resetting chunk checkpoints for {} export(s) with in_progress runs: {}",
269        targets.len(),
270        targets.join(", ")
271    );
272
273    for name in targets {
274        let n = state.reset_chunk_checkpoint(&name)?;
275        println!("Removed {} chunk run record(s) for export '{}'.", n, name);
276    }
277    Ok(())
278}
279
280pub fn show_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
281    let state = StateStore::open(config_path)?;
282    println!(
283        "database:   {}",
284        StateStore::state_db_path(config_path).display()
285    );
286    let Some((run_id, plan_hash, status, updated_at)) = state.get_latest_chunk_run(export_name)?
287    else {
288        println!("No chunk checkpoint data for export '{}'.", export_name);
289        return Ok(());
290    };
291    println!("export:     {}", export_name);
292    println!("run_id:     {}", run_id);
293    println!("plan_hash:  {}", plan_hash);
294    println!("status:     {}", status);
295    println!("updated_at: {}", updated_at);
296    println!();
297    println!(
298        "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} FILE",
299        "IDX", "STATUS", "START", "END", "ATT", "ROWS"
300    );
301    println!("{}", "-".repeat(90));
302    for t in state.list_chunk_tasks_for_run(&run_id)? {
303        let file = t.file_name.as_deref().unwrap_or("-");
304        let rows = t
305            .rows_written
306            .map(|r| r.to_string())
307            .unwrap_or_else(|| "-".into());
308        println!(
309            "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} {}",
310            t.chunk_index, t.status, t.start_key, t.end_key, t.attempts, rows, file
311        );
312        if let Some(e) = &t.last_error {
313            println!("       error: {}", e);
314        }
315    }
316    Ok(())
317}
318
319/// Display recent run journal entries for an export.
320///
321/// Shows up to `limit` most recent runs (newest first), each as a compact
322/// block: run header + per-event summary lines.
323pub fn show_journal(
324    config_path: &str,
325    export_name: &str,
326    limit: usize,
327    run_id: Option<&str>,
328) -> Result<()> {
329    let state = StateStore::open(config_path)?;
330
331    let journals = if let Some(rid) = run_id {
332        match state.load_journal(rid)? {
333            Some(j) => vec![j],
334            None => {
335                println!("No journal found for run_id '{rid}'.");
336                return Ok(());
337            }
338        }
339    } else {
340        state.recent_journals(export_name, limit)?
341    };
342
343    if journals.is_empty() {
344        println!("No journal entries for export '{export_name}' yet.");
345        println!("Journals are recorded after each `rivet run`.");
346        return Ok(());
347    }
348
349    for journal in &journals {
350        // ── run header ────────────────────────────────────────────────────
351        let outcome = journal.final_outcome().and_then(|e| {
352            if let RunEvent::RunCompleted {
353                status,
354                duration_ms,
355                ..
356            } = &e.event
357            {
358                Some((status.as_str(), *duration_ms))
359            } else {
360                None
361            }
362        });
363        let (status_str, duration_str) = match outcome {
364            Some((s, ms)) if ms >= 1000 => (s, format!("{:.1}s", ms as f64 / 1000.0)),
365            Some((s, ms)) => (s, format!("{ms}ms")),
366            None => ("(incomplete)", String::new()),
367        };
368        let icon = match status_str {
369            "success" => "✓",
370            "failed" => "✗",
371            _ => "•",
372        };
373        println!(
374            "\n{icon} {export}  {status}  {dur}",
375            export = journal.export_name,
376            status = status_str,
377            dur = duration_str,
378        );
379        println!("  run_id: {}", journal.run_id);
380
381        // ── event summary lines ────────────────────────────────────────────
382        let files = journal.files();
383        if !files.is_empty() {
384            let total_rows: i64 = files
385                .iter()
386                .filter_map(|e| {
387                    if let RunEvent::FileWritten { rows, .. } = &e.event {
388                        Some(*rows)
389                    } else {
390                        None
391                    }
392                })
393                .sum();
394            let total_bytes: u64 = files
395                .iter()
396                .filter_map(|e| {
397                    if let RunEvent::FileWritten { bytes, .. } = &e.event {
398                        Some(*bytes)
399                    } else {
400                        None
401                    }
402                })
403                .sum();
404            println!(
405                "  files:  {}  rows: {}  size: {}",
406                files.len(),
407                total_rows,
408                format_bytes(total_bytes),
409            );
410        }
411
412        let retries = journal.retries();
413        if !retries.is_empty() {
414            println!("  retries: {}", retries.len());
415        }
416
417        for e in journal.quality_issues() {
418            if let RunEvent::QualityIssue { severity, message } = &e.event {
419                println!("  quality [{severity}]: {message}");
420            }
421        }
422
423        for e in journal.schema_changes() {
424            if let RunEvent::SchemaChanged {
425                added,
426                removed,
427                type_changed,
428            } = &e.event
429            {
430                if !added.is_empty() {
431                    println!("  schema: +{}", added.join(", +"));
432                }
433                if !removed.is_empty() {
434                    println!("  schema: -{}", removed.join(", -"));
435                }
436                for (col, old, new) in type_changed {
437                    println!("  schema: {col} {old}→{new}");
438                }
439            }
440        }
441
442        if let Some(e) = journal.final_outcome()
443            && let RunEvent::RunCompleted {
444                error_message: Some(err),
445                ..
446            } = &e.event
447        {
448            let first_line = err.lines().next().unwrap_or(err);
449            println!("  error:  {first_line}");
450        }
451    }
452    println!();
453    Ok(())
454}
455
#[cfg(test)]
mod tests {
    use super::*;
    use crate::journal::{RunEvent, RunJournal};

    // ── fixtures ─────────────────────────────────────────────────────────────

    /// Config declaring two full-mode exports: `transactions` and `orders`.
    const TWO_EXPORT_YAML: &[u8] = br#"source:
  type: postgres
  url: postgresql://localhost/testdb
exports:
  - name: transactions
    query: "SELECT 1"
    mode: full
    format: parquet
    destination:
      type: local
      path: ./out
  - name: orders
    query: "SELECT 1"
    mode: full
    format: parquet
    destination:
      type: local
      path: ./out
"#;

    /// Config declaring a single full-mode export: `transactions`.
    const SINGLE_EXPORT_YAML: &[u8] = br#"source:
  type: postgres
  url: postgresql://localhost/testdb
exports:
  - name: transactions
    query: "SELECT 1"
    mode: full
    format: parquet
    destination:
      type: local
      path: ./out
"#;

    // ── helpers ──────────────────────────────────────────────────────────────

    /// Temp dir plus the path of a (not yet written) `rivet.yaml` inside it.
    fn setup_dir() -> (tempfile::TempDir, String) {
        let tmp = tempfile::TempDir::new().unwrap();
        // The config file itself does not need to exist; StateStore::open uses its parent dir.
        let cfg = tmp.path().join("rivet.yaml");
        let cfg = cfg.to_str().unwrap().to_string();
        (tmp, cfg)
    }

    fn write_two_export_config(config_path: &str) {
        std::fs::write(config_path, TWO_EXPORT_YAML).unwrap();
    }

    fn write_single_export_config(config_path: &str) {
        std::fs::write(config_path, SINGLE_EXPORT_YAML).unwrap();
    }

    /// Open (creating if needed) the state DB that sits next to the config path.
    fn open_state(dir: &tempfile::TempDir) -> StateStore {
        StateStore::open_at_path(&dir.path().join(".rivet_state.db")).unwrap()
    }

    /// Temp dir whose state DB file has been created and closed again.
    fn fresh_db() -> (tempfile::TempDir, String) {
        let (tmp, cfg) = setup_dir();
        drop(open_state(&tmp));
        (tmp, cfg)
    }

    /// Assert that a CLI command returned `Ok`, reporting the caller's location on failure.
    #[track_caller]
    fn assert_ok(outcome: Result<()>) {
        assert!(outcome.is_ok());
    }

    /// Journal with one written file and a successful 1.5 s completion.
    fn make_journal(run_id: &str, export: &str) -> RunJournal {
        let mut journal = RunJournal::new(run_id, export);
        let events = [
            RunEvent::FileWritten {
                file_name: "part0.parquet".into(),
                rows: 1_000,
                bytes: 65_536,
                part_index: 0,
            },
            RunEvent::RunCompleted {
                status: "success".into(),
                error_message: None,
                duration_ms: 1_500,
            },
        ];
        for event in events {
            journal.record(event);
        }
        journal
    }

    fn make_boundary(cursor: Option<&str>, chunk_index: Option<i64>) -> crate::state::Boundary {
        crate::state::Boundary {
            strategy: "incremental".into(),
            run_id: None,
            cursor: cursor.map(str::to_string),
            chunk_index,
            at: chrono::Utc::now(),
        }
    }

    // ── boundary_value ───────────────────────────────────────────────────────

    #[test]
    fn boundary_value_cursor_takes_precedence_over_chunk_index() {
        let both = make_boundary(Some("2025-01-15"), Some(42));
        assert_eq!(boundary_value(&both), "2025-01-15");
    }

    #[test]
    fn boundary_value_chunk_index_used_when_no_cursor() {
        let chunk_only = make_boundary(None, Some(7));
        assert_eq!(boundary_value(&chunk_only), "chunk #7");
    }

    #[test]
    fn boundary_value_dash_when_neither_set() {
        let empty = make_boundary(None, None);
        assert_eq!(boundary_value(&empty), "-");
    }

    // ── show_state ───────────────────────────────────────────────────────────

    #[test]
    fn show_state_empty_db_returns_ok() {
        let (_tmp, cfg) = fresh_db();
        assert_ok(show_state(&cfg));
    }

    #[test]
    fn show_state_with_cursor_record_returns_ok() {
        let (tmp, cfg) = setup_dir();
        {
            let store = open_state(&tmp);
            store.update("orders", "2025-01-15").unwrap();
        }
        assert_ok(show_state(&cfg));
    }

    // ── show_files ───────────────────────────────────────────────────────────

    #[test]
    fn show_files_empty_returns_ok() {
        let (_tmp, cfg) = fresh_db();
        assert_ok(show_files(&cfg, None, 10));
    }

    #[test]
    fn show_files_with_record_returns_ok() {
        let (tmp, cfg) = setup_dir();
        {
            let store = open_state(&tmp);
            store
                .record_file(
                    "r1",
                    "orders",
                    "orders_001.parquet",
                    50_000,
                    4096,
                    "parquet",
                    Some("zstd"),
                )
                .unwrap();
        }
        assert_ok(show_files(&cfg, Some("orders"), 10));
    }

    // ── show_metrics ─────────────────────────────────────────────────────────

    #[test]
    fn show_metrics_empty_returns_ok() {
        let (_tmp, cfg) = fresh_db();
        assert_ok(show_metrics(&cfg, None, 10));
    }

    #[test]
    fn show_metrics_exercises_flag_and_duration_paths() {
        // Exercises the retries / validated / schema_changed flag lines and
        // both sides of the ms-vs-seconds duration branch in the formatter.
        let (tmp, cfg) = setup_dir();
        {
            let store = open_state(&tmp);
            // ≥1000 ms → "X.Xs" branch; retries + validated + schema_changed flags
            store
                .record_metric(
                    "orders",
                    "r1",
                    1_500,
                    50_000,
                    Some(42),
                    "success",
                    None,
                    Some("balanced"),
                    Some("parquet"),
                    Some("full"),
                    1,
                    4096,
                    3,
                    Some(true),
                    Some(true),
                )
                .unwrap();
            // <1000 ms → "Xms" branch; error_message line
            store
                .record_metric(
                    "orders",
                    "r2",
                    800,
                    0,
                    None,
                    "failed",
                    Some("timeout"),
                    None,
                    None,
                    None,
                    0,
                    0,
                    0,
                    Some(false),
                    None,
                )
                .unwrap();
        }
        assert_ok(show_metrics(&cfg, Some("orders"), 10));
    }

    // ── show_journal ─────────────────────────────────────────────────────────

    #[test]
    fn show_journal_empty_returns_ok() {
        let (_tmp, cfg) = fresh_db();
        assert_ok(show_journal(&cfg, "orders", 5, None));
    }

    #[test]
    fn show_journal_with_entry_returns_ok() {
        let (tmp, cfg) = setup_dir();
        {
            let store = open_state(&tmp);
            store
                .store_journal(&make_journal("run_001", "orders"))
                .unwrap();
        }
        assert_ok(show_journal(&cfg, "orders", 5, None));
    }

    #[test]
    fn show_journal_by_run_id_not_found_returns_ok() {
        let (_tmp, cfg) = fresh_db();
        assert_ok(show_journal(&cfg, "orders", 5, Some("no_such_run")));
    }

    #[test]
    fn show_journal_by_run_id_found_returns_ok() {
        let (tmp, cfg) = setup_dir();
        {
            let store = open_state(&tmp);
            store
                .store_journal(&make_journal("run_xyz", "orders"))
                .unwrap();
        }
        assert_ok(show_journal(&cfg, "orders", 5, Some("run_xyz")));
    }

    // ── reset_state ──────────────────────────────────────────────────────────

    #[test]
    fn reset_state_returns_ok() {
        let (tmp, cfg) = setup_dir();
        write_two_export_config(&cfg);
        {
            let store = open_state(&tmp);
            store.update("orders", "100").unwrap();
        }
        assert_ok(reset_state(&cfg, "orders"));
    }

    // F-NEW (0.7.7 audit): `state reset` on an export that is not declared
    // in the config used to silently succeed (DELETE WHERE export_name = X
    // affects 0 rows; "State reset for export 'X'" printed; rc=0). A typo'd
    // `--export pa_audi` looked like success but did nothing. This pins the
    // hint-emitting bail.
    #[test]
    fn reset_state_unknown_export_bails_with_hint() {
        let (_tmp, cfg) = setup_dir();
        write_two_export_config(&cfg);

        let failure = reset_state(&cfg, "ghost").unwrap_err();
        let msg = format!("{failure:#}");

        assert!(
            msg.contains("export 'ghost' not found"),
            "must name the missing export: {msg}"
        );
        assert!(
            msg.contains("orders") && msg.contains("transactions"),
            "must list the declared exports so the user can spot the typo: {msg}"
        );
        assert!(
            msg.contains("rivet state show"),
            "must point at a follow-up command: {msg}"
        );
    }

    // ── reset_chunk_checkpoint ───────────────────────────────────────────────

    #[test]
    fn reset_chunk_checkpoint_on_empty_db_returns_ok() {
        let (_tmp, cfg) = fresh_db();
        assert_ok(reset_chunk_checkpoint(&cfg, "orders"));
    }

    #[test]
    fn reset_chunk_checkpoints_stuck_no_rows_returns_ok() {
        let (tmp, cfg) = setup_dir();
        write_two_export_config(&cfg);
        drop(open_state(&tmp));
        assert_ok(reset_chunk_checkpoints_stuck(&cfg));
    }

    #[test]
    fn reset_chunk_checkpoints_stuck_clears_matching_exports_only() {
        let (tmp, cfg) = setup_dir();
        write_two_export_config(&cfg);
        {
            let store = open_state(&tmp);
            store
                .create_chunk_run("r_tx", "transactions", "plan", 3)
                .unwrap();
            store.create_chunk_run("r_g", "ghost", "plan", 3).unwrap();
        }

        reset_chunk_checkpoints_stuck(&cfg).unwrap();

        let store = StateStore::open(&cfg).unwrap();
        let configured_run = store.find_in_progress_chunk_run("transactions").unwrap();
        assert!(configured_run.is_none());
        let ghost_run = store.find_in_progress_chunk_run("ghost").unwrap();
        assert!(ghost_run.is_some());
        assert_eq!(
            store.reset_chunk_checkpoint("ghost").unwrap(),
            1,
            "cleanup ghost row"
        );
    }

    #[test]
    fn reset_chunk_checkpoints_stuck_skips_when_only_unknown_exports_stuck() {
        let (tmp, cfg) = setup_dir();
        write_single_export_config(&cfg);
        {
            let store = open_state(&tmp);
            store.create_chunk_run("r_g", "ghost", "plan", 3).unwrap();
        }

        reset_chunk_checkpoints_stuck(&cfg).unwrap();

        let store = StateStore::open(&cfg).unwrap();
        let ghost_run = store.find_in_progress_chunk_run("ghost").unwrap();
        assert!(ghost_run.is_some());
        assert_eq!(store.reset_chunk_checkpoint("ghost").unwrap(), 1);
    }

    // ── show_progression ─────────────────────────────────────────────────────

    #[test]
    fn show_progression_empty_returns_ok() {
        let (_tmp, cfg) = fresh_db();
        assert_ok(show_progression(&cfg, None));
    }

    #[test]
    fn show_progression_with_incremental_boundary_returns_ok() {
        let (tmp, cfg) = setup_dir();
        {
            let store = open_state(&tmp);
            store
                .record_committed_incremental("orders", "2025-06-01", "run_001")
                .unwrap();
        }
        assert_ok(show_progression(&cfg, Some("orders")));
    }

    // ── show_chunk_checkpoint ────────────────────────────────────────────────

    #[test]
    fn show_chunk_checkpoint_no_data_returns_ok() {
        let (_tmp, cfg) = fresh_db();
        assert_ok(show_chunk_checkpoint(&cfg, "orders"));
    }
}