rivet-cli 0.2.0-beta.2

CLI tool to export PostgreSQL and MySQL to Parquet/CSV (local, S3, GCS) with tuning, preflight checks, and SQLite-backed state.
Documentation
use super::format_bytes;
use crate::error::Result;
use crate::state::StateStore;

pub fn show_state(config_path: &str) -> Result<()> {
    let state = StateStore::open(config_path)?;
    let states = state.list_all()?;
    if states.is_empty() {
        println!("No export state recorded yet.");
        return Ok(());
    }
    println!("{:<30} {:<40} LAST RUN", "EXPORT", "LAST CURSOR");
    println!("{}", "-".repeat(90));
    for s in &states {
        println!(
            "{:<30} {:<40} {}",
            s.export_name,
            s.last_cursor_value.as_deref().unwrap_or("-"),
            s.last_run_at.as_deref().unwrap_or("-"),
        );
    }
    Ok(())
}

pub fn reset_state(config_path: &str, export_name: &str) -> Result<()> {
    let state = StateStore::open(config_path)?;
    state.reset(export_name)?;
    println!("State reset for export '{}'", export_name);
    Ok(())
}

pub fn show_files(config_path: &str, export_name: Option<&str>, limit: usize) -> Result<()> {
    let state = StateStore::open(config_path)?;
    let files = state.get_files(export_name, limit)?;
    if files.is_empty() {
        println!("No files recorded yet.");
        return Ok(());
    }
    println!(
        "{:<35} {:<40} {:>8} {:>10} CREATED",
        "RUN ID", "FILE", "ROWS", "BYTES"
    );
    println!("{}", "-".repeat(110));
    for f in &files {
        println!(
            "{:<35} {:<40} {:>8} {:>10} {}",
            f.run_id,
            f.file_name,
            f.row_count,
            format_bytes(f.bytes as u64),
            f.created_at,
        );
    }
    Ok(())
}

pub fn show_metrics(config_path: &str, export_name: Option<&str>, limit: usize) -> Result<()> {
    let state = StateStore::open(config_path)?;
    let metrics = state.get_metrics(export_name, limit)?;
    if metrics.is_empty() {
        println!("No metrics recorded yet.");
        return Ok(());
    }
    println!(
        "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} RUN ID",
        "EXPORT", "STATUS", "ROWS", "DURATION", "RSS", "FILES", "BYTES"
    );
    println!("{}", "-".repeat(110));
    for m in &metrics {
        let duration = if m.duration_ms >= 1000 {
            format!("{:.1}s", m.duration_ms as f64 / 1000.0)
        } else {
            format!("{}ms", m.duration_ms)
        };
        let rss = m
            .peak_rss_mb
            .map(|r| format!("{}MB", r))
            .unwrap_or_else(|| "-".into());
        let bytes = if m.bytes_written > 0 {
            format_bytes(m.bytes_written as u64)
        } else {
            "-".into()
        };
        let run_id = m.run_id.as_deref().unwrap_or(&m.run_at);
        println!(
            "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} {}",
            m.export_name, m.status, m.total_rows, duration, rss, m.files_produced, bytes, run_id
        );
        if let Some(err) = &m.error_message {
            println!("  Error: {}", err);
        }
        let mut flags = Vec::new();
        if m.retries > 0 {
            flags.push(format!("retries={}", m.retries));
        }
        if let Some(v) = m.validated {
            flags.push(format!("validated={}", if v { "pass" } else { "FAIL" }));
        }
        if let Some(sc) = m.schema_changed {
            flags.push(format!("schema={}", if sc { "CHANGED" } else { "ok" }));
        }
        if !flags.is_empty() {
            println!("  {}", flags.join("  "));
        }
    }
    Ok(())
}

pub fn reset_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
    let state = StateStore::open(config_path)?;
    let n = state.reset_chunk_checkpoint(export_name)?;
    println!(
        "Removed {} chunk run record(s) for export '{}'.",
        n, export_name
    );
    Ok(())
}

pub fn show_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
    let state = StateStore::open(config_path)?;
    println!(
        "database:   {}",
        StateStore::state_db_path(config_path).display()
    );
    let Some((run_id, plan_hash, status, updated_at)) = state.get_latest_chunk_run(export_name)?
    else {
        println!("No chunk checkpoint data for export '{}'.", export_name);
        return Ok(());
    };
    println!("export:     {}", export_name);
    println!("run_id:     {}", run_id);
    println!("plan_hash:  {}", plan_hash);
    println!("status:     {}", status);
    println!("updated_at: {}", updated_at);
    println!();
    println!(
        "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} FILE",
        "IDX", "STATUS", "START", "END", "ATT", "ROWS"
    );
    println!("{}", "-".repeat(90));
    for t in state.list_chunk_tasks_for_run(&run_id)? {
        let file = t.file_name.as_deref().unwrap_or("-");
        let rows = t
            .rows_written
            .map(|r| r.to_string())
            .unwrap_or_else(|| "-".into());
        println!(
            "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} {}",
            t.chunk_index, t.status, t.start_key, t.end_key, t.attempts, rows, file
        );
        if let Some(e) = &t.last_error {
            println!("       error: {}", e);
        }
    }
    Ok(())
}