Skip to main content

rivet_cli/pipeline/
cli.rs

1use super::format_bytes;
2use crate::error::Result;
3use crate::state::StateStore;
4
5pub fn show_state(config_path: &str) -> Result<()> {
6    let state = StateStore::open(config_path)?;
7    let states = state.list_all()?;
8    if states.is_empty() {
9        println!("No export state recorded yet.");
10        return Ok(());
11    }
12    println!("{:<30} {:<40} LAST RUN", "EXPORT", "LAST CURSOR");
13    println!("{}", "-".repeat(90));
14    for s in &states {
15        println!(
16            "{:<30} {:<40} {}",
17            s.export_name,
18            s.last_cursor_value.as_deref().unwrap_or("-"),
19            s.last_run_at.as_deref().unwrap_or("-"),
20        );
21    }
22    Ok(())
23}
24
25pub fn reset_state(config_path: &str, export_name: &str) -> Result<()> {
26    let state = StateStore::open(config_path)?;
27    state.reset(export_name)?;
28    println!("State reset for export '{}'", export_name);
29    Ok(())
30}
31
32pub fn show_files(config_path: &str, export_name: Option<&str>, limit: usize) -> Result<()> {
33    let state = StateStore::open(config_path)?;
34    let files = state.get_files(export_name, limit)?;
35    if files.is_empty() {
36        println!("No files recorded yet.");
37        return Ok(());
38    }
39    println!(
40        "{:<35} {:<40} {:>8} {:>10} CREATED",
41        "RUN ID", "FILE", "ROWS", "BYTES"
42    );
43    println!("{}", "-".repeat(110));
44    for f in &files {
45        println!(
46            "{:<35} {:<40} {:>8} {:>10} {}",
47            f.run_id,
48            f.file_name,
49            f.row_count,
50            format_bytes(f.bytes as u64),
51            f.created_at,
52        );
53    }
54    Ok(())
55}
56
57pub fn show_metrics(config_path: &str, export_name: Option<&str>, limit: usize) -> Result<()> {
58    let state = StateStore::open(config_path)?;
59    let metrics = state.get_metrics(export_name, limit)?;
60    if metrics.is_empty() {
61        println!("No metrics recorded yet.");
62        return Ok(());
63    }
64    println!(
65        "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} RUN ID",
66        "EXPORT", "STATUS", "ROWS", "DURATION", "RSS", "FILES", "BYTES"
67    );
68    println!("{}", "-".repeat(110));
69    for m in &metrics {
70        let duration = if m.duration_ms >= 1000 {
71            format!("{:.1}s", m.duration_ms as f64 / 1000.0)
72        } else {
73            format!("{}ms", m.duration_ms)
74        };
75        let rss = m
76            .peak_rss_mb
77            .map(|r| format!("{}MB", r))
78            .unwrap_or_else(|| "-".into());
79        let bytes = if m.bytes_written > 0 {
80            format_bytes(m.bytes_written as u64)
81        } else {
82            "-".into()
83        };
84        let run_id = m.run_id.as_deref().unwrap_or(&m.run_at);
85        println!(
86            "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} {}",
87            m.export_name, m.status, m.total_rows, duration, rss, m.files_produced, bytes, run_id
88        );
89        if let Some(err) = &m.error_message {
90            println!("  Error: {}", err);
91        }
92        let mut flags = Vec::new();
93        if m.retries > 0 {
94            flags.push(format!("retries={}", m.retries));
95        }
96        if let Some(v) = m.validated {
97            flags.push(format!("validated={}", if v { "pass" } else { "FAIL" }));
98        }
99        if let Some(sc) = m.schema_changed {
100            flags.push(format!("schema={}", if sc { "CHANGED" } else { "ok" }));
101        }
102        if !flags.is_empty() {
103            println!("  {}", flags.join("  "));
104        }
105    }
106    Ok(())
107}
108
109pub fn reset_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
110    let state = StateStore::open(config_path)?;
111    let n = state.reset_chunk_checkpoint(export_name)?;
112    println!(
113        "Removed {} chunk run record(s) for export '{}'.",
114        n, export_name
115    );
116    Ok(())
117}
118
119pub fn show_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
120    let state = StateStore::open(config_path)?;
121    println!(
122        "database:   {}",
123        StateStore::state_db_path(config_path).display()
124    );
125    let Some((run_id, plan_hash, status, updated_at)) = state.get_latest_chunk_run(export_name)?
126    else {
127        println!("No chunk checkpoint data for export '{}'.", export_name);
128        return Ok(());
129    };
130    println!("export:     {}", export_name);
131    println!("run_id:     {}", run_id);
132    println!("plan_hash:  {}", plan_hash);
133    println!("status:     {}", status);
134    println!("updated_at: {}", updated_at);
135    println!();
136    println!(
137        "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} FILE",
138        "IDX", "STATUS", "START", "END", "ATT", "ROWS"
139    );
140    println!("{}", "-".repeat(90));
141    for t in state.list_chunk_tasks_for_run(&run_id)? {
142        let file = t.file_name.as_deref().unwrap_or("-");
143        let rows = t
144            .rows_written
145            .map(|r| r.to_string())
146            .unwrap_or_else(|| "-".into());
147        println!(
148            "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} {}",
149            t.chunk_index, t.status, t.start_key, t.end_key, t.attempts, rows, file
150        );
151        if let Some(e) = &t.last_error {
152            println!("       error: {}", e);
153        }
154    }
155    Ok(())
156}