rivet_cli/pipeline/
cli.rs1use super::format_bytes;
2use crate::error::Result;
3use crate::state::StateStore;
4
5pub fn show_state(config_path: &str) -> Result<()> {
6 let state = StateStore::open(config_path)?;
7 let states = state.list_all()?;
8 if states.is_empty() {
9 println!("No export state recorded yet.");
10 return Ok(());
11 }
12 println!("{:<30} {:<40} LAST RUN", "EXPORT", "LAST CURSOR");
13 println!("{}", "-".repeat(90));
14 for s in &states {
15 println!(
16 "{:<30} {:<40} {}",
17 s.export_name,
18 s.last_cursor_value.as_deref().unwrap_or("-"),
19 s.last_run_at.as_deref().unwrap_or("-"),
20 );
21 }
22 Ok(())
23}
24
25pub fn reset_state(config_path: &str, export_name: &str) -> Result<()> {
26 let state = StateStore::open(config_path)?;
27 state.reset(export_name)?;
28 println!("State reset for export '{}'", export_name);
29 Ok(())
30}
31
32pub fn show_files(config_path: &str, export_name: Option<&str>, limit: usize) -> Result<()> {
33 let state = StateStore::open(config_path)?;
34 let files = state.get_files(export_name, limit)?;
35 if files.is_empty() {
36 println!("No files recorded yet.");
37 return Ok(());
38 }
39 println!(
40 "{:<35} {:<40} {:>8} {:>10} CREATED",
41 "RUN ID", "FILE", "ROWS", "BYTES"
42 );
43 println!("{}", "-".repeat(110));
44 for f in &files {
45 println!(
46 "{:<35} {:<40} {:>8} {:>10} {}",
47 f.run_id,
48 f.file_name,
49 f.row_count,
50 format_bytes(f.bytes as u64),
51 f.created_at,
52 );
53 }
54 Ok(())
55}
56
57pub fn show_metrics(config_path: &str, export_name: Option<&str>, limit: usize) -> Result<()> {
58 let state = StateStore::open(config_path)?;
59 let metrics = state.get_metrics(export_name, limit)?;
60 if metrics.is_empty() {
61 println!("No metrics recorded yet.");
62 return Ok(());
63 }
64 println!(
65 "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} RUN ID",
66 "EXPORT", "STATUS", "ROWS", "DURATION", "RSS", "FILES", "BYTES"
67 );
68 println!("{}", "-".repeat(110));
69 for m in &metrics {
70 let duration = if m.duration_ms >= 1000 {
71 format!("{:.1}s", m.duration_ms as f64 / 1000.0)
72 } else {
73 format!("{}ms", m.duration_ms)
74 };
75 let rss = m
76 .peak_rss_mb
77 .map(|r| format!("{}MB", r))
78 .unwrap_or_else(|| "-".into());
79 let bytes = if m.bytes_written > 0 {
80 format_bytes(m.bytes_written as u64)
81 } else {
82 "-".into()
83 };
84 let run_id = m.run_id.as_deref().unwrap_or(&m.run_at);
85 println!(
86 "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} {}",
87 m.export_name, m.status, m.total_rows, duration, rss, m.files_produced, bytes, run_id
88 );
89 if let Some(err) = &m.error_message {
90 println!(" Error: {}", err);
91 }
92 let mut flags = Vec::new();
93 if m.retries > 0 {
94 flags.push(format!("retries={}", m.retries));
95 }
96 if let Some(v) = m.validated {
97 flags.push(format!("validated={}", if v { "pass" } else { "FAIL" }));
98 }
99 if let Some(sc) = m.schema_changed {
100 flags.push(format!("schema={}", if sc { "CHANGED" } else { "ok" }));
101 }
102 if !flags.is_empty() {
103 println!(" {}", flags.join(" "));
104 }
105 }
106 Ok(())
107}
108
109pub fn reset_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
110 let state = StateStore::open(config_path)?;
111 let n = state.reset_chunk_checkpoint(export_name)?;
112 println!(
113 "Removed {} chunk run record(s) for export '{}'.",
114 n, export_name
115 );
116 Ok(())
117}
118
119pub fn show_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
120 let state = StateStore::open(config_path)?;
121 println!(
122 "database: {}",
123 StateStore::state_db_path(config_path).display()
124 );
125 let Some((run_id, plan_hash, status, updated_at)) = state.get_latest_chunk_run(export_name)?
126 else {
127 println!("No chunk checkpoint data for export '{}'.", export_name);
128 return Ok(());
129 };
130 println!("export: {}", export_name);
131 println!("run_id: {}", run_id);
132 println!("plan_hash: {}", plan_hash);
133 println!("status: {}", status);
134 println!("updated_at: {}", updated_at);
135 println!();
136 println!(
137 "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} FILE",
138 "IDX", "STATUS", "START", "END", "ATT", "ROWS"
139 );
140 println!("{}", "-".repeat(90));
141 for t in state.list_chunk_tasks_for_run(&run_id)? {
142 let file = t.file_name.as_deref().unwrap_or("-");
143 let rows = t
144 .rows_written
145 .map(|r| r.to_string())
146 .unwrap_or_else(|| "-".into());
147 println!(
148 "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} {}",
149 t.chunk_index, t.status, t.start_key, t.end_key, t.attempts, rows, file
150 );
151 if let Some(e) = &t.last_error {
152 println!(" error: {}", e);
153 }
154 }
155 Ok(())
156}