#[cfg(feature = "jemalloc")]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
mod config;
mod destination;
mod enrich;
mod error;
mod format;
mod notify;
mod pipeline;
mod preflight;
mod quality;
mod resource;
mod source;
mod state;
mod tuning;
mod types;
use clap::{CommandFactory, Parser, Subcommand};
use clap_complete::Shell;
use error::Result;
#[derive(Parser)]
#[command(name = "rivet", version, about = "Export data from databases to files")]
struct Cli {
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
Run {
#[arg(short, long)]
config: String,
#[arg(short, long)]
export: Option<String>,
#[arg(long)]
validate: bool,
#[arg(long)]
reconcile: bool,
#[arg(long)]
resume: bool,
#[arg(long)]
parallel_exports: bool,
#[arg(long)]
parallel_export_processes: bool,
#[arg(short, long = "param", value_name = "KEY=VALUE")]
params: Vec<String>,
},
Check {
#[arg(short, long)]
config: String,
#[arg(short, long)]
export: Option<String>,
#[arg(short, long = "param", value_name = "KEY=VALUE")]
params: Vec<String>,
},
Doctor {
#[arg(short, long)]
config: String,
},
State {
#[command(subcommand)]
action: StateAction,
},
Completions {
#[arg(value_enum)]
shell: Shell,
},
Metrics {
#[arg(short, long)]
config: String,
#[arg(short, long)]
export: Option<String>,
#[arg(short, long, default_value = "20")]
last: usize,
},
}
#[derive(Subcommand)]
enum StateAction {
Show {
#[arg(short, long)]
config: String,
},
Reset {
#[arg(short, long)]
config: String,
#[arg(short, long)]
export: String,
},
Files {
#[arg(short, long)]
config: String,
#[arg(short, long)]
export: Option<String>,
#[arg(short, long, default_value = "50")]
last: usize,
},
ResetChunks {
#[arg(short, long)]
config: String,
#[arg(short, long)]
export: String,
},
Chunks {
#[arg(short, long)]
config: String,
#[arg(short, long)]
export: String,
},
}
fn parse_params(raw: &[String]) -> std::collections::HashMap<String, String> {
raw.iter()
.filter_map(|s| s.split_once('='))
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect()
}
fn main() -> Result<()> {
env_logger::init();
let cli = Cli::parse();
match cli.command {
Commands::Run {
config,
export,
validate,
reconcile,
resume,
parallel_exports,
parallel_export_processes,
params,
} => {
let p = parse_params(¶ms);
let p = if p.is_empty() { None } else { Some(p) };
pipeline::run(
&config,
export.as_deref(),
validate,
reconcile,
resume,
p.as_ref(),
parallel_exports,
parallel_export_processes,
)?;
}
Commands::Check {
config,
export,
params,
} => {
let p = parse_params(¶ms);
let p = if p.is_empty() { None } else { Some(p) };
preflight::check(&config, export.as_deref(), p.as_ref())?;
}
Commands::Doctor { config } => {
preflight::doctor(&config)?;
}
Commands::Completions { shell } => {
clap_complete::generate(shell, &mut Cli::command(), "rivet", &mut std::io::stdout());
}
Commands::Metrics {
config,
export,
last,
} => {
pipeline::show_metrics(&config, export.as_deref(), last)?;
}
Commands::State { action } => match action {
StateAction::Show { config } => {
pipeline::show_state(&config)?;
}
StateAction::Reset { config, export } => {
pipeline::reset_state(&config, &export)?;
}
StateAction::Files {
config,
export,
last,
} => {
pipeline::show_files(&config, export.as_deref(), last)?;
}
StateAction::ResetChunks { config, export } => {
pipeline::reset_chunk_checkpoint(&config, &export)?;
}
StateAction::Chunks { config, export } => {
pipeline::show_chunk_checkpoint(&config, &export)?;
}
},
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_params_basic() {
let input = vec!["KEY=value".to_string()];
let result = parse_params(&input);
assert_eq!(result.get("KEY").unwrap(), "value");
}
#[test]
fn parse_params_equals_in_value() {
let input = vec!["FILTER=x=1 AND y=2".to_string()];
let result = parse_params(&input);
assert_eq!(result.get("FILTER").unwrap(), "x=1 AND y=2");
}
#[test]
fn parse_params_multiple() {
let input = vec!["A=1".to_string(), "B=2".to_string()];
let result = parse_params(&input);
assert_eq!(result.len(), 2);
assert_eq!(result.get("A").unwrap(), "1");
assert_eq!(result.get("B").unwrap(), "2");
}
#[test]
fn parse_params_empty_input() {
let result = parse_params(&[]);
assert!(result.is_empty());
}
#[test]
fn parse_params_no_equals_skipped() {
let input = vec!["NO_EQUALS_HERE".to_string()];
let result = parse_params(&input);
assert!(result.is_empty());
}
#[test]
fn parse_params_empty_value() {
let input = vec!["KEY=".to_string()];
let result = parse_params(&input);
assert_eq!(result.get("KEY").unwrap(), "");
}
#[test]
fn parse_params_duplicate_last_wins() {
let input = vec!["K=first".to_string(), "K=second".to_string()];
let result = parse_params(&input);
assert_eq!(result.get("K").unwrap(), "second");
}
}