use clap::CommandFactory;
use super::args::{
Cli, Commands, PlanFormat, ReconcileFormat, SchemaKind, StateAction, ValidateFormat,
};
use super::params::{parse_params, resolve_init_source};
use super::validate::validate_cli;
use crate::config::Config;
use crate::error::Result;
use crate::{init, pipeline, preflight};
/// Verifies that an explicitly requested export exists in `config`.
///
/// Returns `Ok(())` when no export was requested (`export` is `None`) or when
/// an export with exactly that name is defined.
///
/// # Errors
///
/// Fails with a message naming the requested export and listing every export
/// defined in the config (sorted), or `(none defined)` when there are none.
fn check_export_selection(config: &Config, export: Option<&str>) -> Result<()> {
    let wanted = match export {
        Some(wanted) => wanted,
        None => return Ok(()),
    };

    // Single pass: stop as soon as the name matches, otherwise keep every
    // defined name so it can be shown in the error message.
    let mut defined: Vec<&str> = Vec::new();
    for entry in config.exports.iter() {
        if entry.name == wanted {
            return Ok(());
        }
        defined.push(entry.name.as_str());
    }
    defined.sort_unstable();

    let listing = if defined.is_empty() {
        "(none defined)".to_string()
    } else {
        defined.join(", ")
    };
    anyhow::bail!(
        "export '{}' not found in config.\n Known exports: {}\n Hint: check the spelling against the names above.",
        wanted,
        listing,
    );
}
/// Routes a parsed command line to the handler for its subcommand.
///
/// `validate_cli` is run on the command first; if it fails, that error is
/// returned and no handler is invoked. Each match arm then destructures the
/// subcommand's fields and hands them to a `dispatch_*` helper in this file,
/// or directly to a `pipeline` / `preflight` function.
///
/// # Errors
///
/// Returns the error from `validate_cli` or from whichever handler ran.
pub fn dispatch(cli: Cli) -> Result<()> {
validate_cli(&cli.command)?;
match cli.command {
// The fields are forwarded positionally: the call below must list them in
// the same order as `dispatch_run` declares its parameters.
Commands::Run {
config,
export,
validate,
reconcile,
resume,
force,
parallel_exports,
parallel_export_processes,
summary_output,
json,
params,
} => dispatch_run(
config,
export,
validate,
reconcile,
resume,
force,
parallel_exports,
parallel_export_processes,
summary_output,
json,
params,
),
Commands::Check {
config,
export,
params,
type_report,
strict,
json,
target,
} => dispatch_check(config, export, params, type_report, strict, json, target),
Commands::Doctor { config } => preflight::doctor(&config),
// Forwarded positionally, in the order of `dispatch_init`'s parameters.
Commands::Init {
source,
source_env,
source_file,
table,
schema,
include,
exclude,
output,
discover,
gcs_bucket,
gcs_credentials_file,
s3_bucket,
s3_region,
} => dispatch_init(
source,
source_env,
source_file,
table,
schema,
include,
exclude,
output,
discover,
gcs_bucket,
gcs_credentials_file,
s3_bucket,
s3_region,
),
Commands::Plan {
config,
export,
params,
output,
format,
} => dispatch_plan(config, export, params, output, format),
Commands::Apply { plan_file, force } => pipeline::run_apply_command(&plan_file, force),
Commands::Validate {
config,
export,
format,
output,
date,
run_id,
prefix,
} => dispatch_validate(config, export, format, output, date, run_id, prefix),
Commands::Reconcile {
config,
export,
format,
output,
params,
} => dispatch_reconcile(config, export, format, output, params),
Commands::Repair {
config,
export,
report,
execute,
format,
output,
params,
} => dispatch_repair(config, export, report, execute, format, output, params),
// Generates the completion script for the requested shell, using "rivet"
// as the binary name, and writes it to stdout.
Commands::Completions { shell } => {
clap_complete::generate(shell, &mut Cli::command(), "rivet", &mut std::io::stdout());
Ok(())
}
Commands::Metrics {
config,
export,
last,
} => pipeline::show_metrics(&config, export.as_deref(), last),
// Unlike `Metrics`, `export` is passed here as `&export` rather than via
// `as_deref()`.
Commands::Journal {
config,
export,
last,
run_id,
} => pipeline::show_journal(&config, &export, last, run_id.as_deref()),
Commands::Schema { what } => dispatch_schema(what),
Commands::State { action } => dispatch_state(action),
}
}
/// Prints the schema selected by `what` to stdout.
///
/// The text is written with `print!`, so no newline is appended beyond what
/// the generated schema already contains.
///
/// # Errors
///
/// Propagates any failure from the schema generator.
fn dispatch_schema(what: SchemaKind) -> Result<()> {
    // The match only selects which generator to run; printing is shared.
    let rendered = match what {
        SchemaKind::Config => crate::config::generate_config_schema_pretty()?,
    };
    print!("{rendered}");
    Ok(())
}
/// Handles `Commands::Run`: checks the export selection, then starts the
/// pipeline.
///
/// The raw `params` strings are parsed with `parse_params`; an empty result is
/// passed on as `None`. When `export` is given, the config is loaded with those
/// parameters and the name is checked against the defined exports before
/// `pipeline::run` is called, so a misspelled name fails with the list of known
/// exports. `summary_output` is converted to a path before being forwarded.
///
/// # Errors
///
/// Returns an error if the config cannot be loaded, the requested export is
/// not defined, or `pipeline::run` fails.
// The parameter list mirrors the fields of `Commands::Run` one-to-one, hence
// the lint exemption.
#[allow(clippy::too_many_arguments)]
fn dispatch_run(
    config: String,
    export: Option<String>,
    validate: bool,
    reconcile: bool,
    resume: bool,
    force: bool,
    parallel_exports: bool,
    parallel_export_processes: bool,
    summary_output: Option<String>,
    json: bool,
    params: Vec<String>,
) -> Result<()> {
    let parsed = parse_params(&params);
    let param_map = if parsed.is_empty() {
        None
    } else {
        Some(parsed)
    };

    // The config is only loaded here when there is a name to check.
    if let Some(name) = export.as_deref() {
        let loaded = Config::load_with_params(&config, param_map.as_ref())?;
        check_export_selection(&loaded, Some(name))?;
    }

    let summary_output_path = summary_output.as_ref().map(std::path::PathBuf::from);
    pipeline::run(
        &config,
        export.as_deref(),
        validate,
        reconcile,
        resume,
        force,
        param_map.as_ref(),
        parallel_exports,
        parallel_export_processes,
        summary_output_path.as_deref(),
        json,
    )
}
/// Handles `Commands::Check`: runs the preflight check, then the
/// plan-compatibility check.
///
/// The raw `params` strings are parsed with `parse_params`; an empty result is
/// passed on as `None`. `target`, when given, must be accepted by
/// `ExportTarget::parse`. When `export` is given, the config is loaded and the
/// name is checked against the defined exports before any check runs.
///
/// # Errors
///
/// Returns an error if the target is not recognised, the config cannot be
/// loaded, the requested export is not defined, `preflight::check` fails, or
/// the plan-compatibility check reports a rejected diagnostic.
fn dispatch_check(
    config: String,
    export: Option<String>,
    params: Vec<String>,
    type_report: bool,
    strict: bool,
    json: bool,
    target: Option<String>,
) -> Result<()> {
    let parsed = parse_params(&params);
    let param_map = if parsed.is_empty() {
        None
    } else {
        Some(parsed)
    };

    let tgt = match target.as_deref() {
        Some(s) => Some(crate::types::target::ExportTarget::parse(s).ok_or_else(|| {
            anyhow::anyhow!("unknown target '{s}' (expected: bigquery, duckdb, snowflake)")
        })?),
        None => None,
    };

    // The config is only loaded here when there is a name to check.
    if let Some(name) = export.as_deref() {
        let loaded = Config::load_with_params(&config, param_map.as_ref())?;
        check_export_selection(&loaded, Some(name))?;
    }

    // The fourth argument is set when the type report is requested directly or
    // when `json`, `strict` or a target is given (presumably because those
    // modes depend on the type report; confirm against `preflight::check`).
    let want_type_report = type_report || json || strict || tgt.is_some();
    preflight::check(
        &config,
        export.as_deref(),
        param_map.as_ref(),
        want_type_report,
        strict,
        json,
        tgt,
    )?;
    check_plan_compatibility(&config, export.as_deref(), param_map.as_ref(), json)
}
/// Builds a plan for each selected export and prints its validation
/// diagnostics to stdout.
///
/// Does nothing when `json_output` is set. Otherwise the config is loaded, a
/// plan is built for the export named by `export_name` (or for every export
/// when it is `None`), and each diagnostic from `validate_plan` is printed with
/// a `Rejected:`, `Warning:` or `Degraded:` prefix. An export whose plan fails
/// to build is logged at warn level and skipped.
///
/// # Errors
///
/// Returns an error if the config cannot be loaded, or, after all selected
/// exports have been processed, if any diagnostic was rejected. The error text
/// is the first rejected diagnostic encountered.
fn check_plan_compatibility(
    config_path: &str,
    export_name: Option<&str>,
    params: Option<&std::collections::HashMap<String, String>>,
    json_output: bool,
) -> Result<()> {
    if json_output {
        return Ok(());
    }

    let loaded = Config::load_with_params(config_path, params)?;

    // NOTE(review): `Path::parent` yields `Some("")` for a bare file name, so
    // the "." fallback only applies when the path has no parent at all.
    // Confirm `build_plan` accepts an empty directory.
    let base_dir = match std::path::Path::new(config_path).parent() {
        Some(dir) => dir,
        None => std::path::Path::new("."),
    };

    let candidates = loaded.exports.iter().filter(|e| match export_name {
        Some(wanted) => e.name == wanted,
        None => true,
    });

    let mut first_rejection: Option<String> = None;
    for candidate in candidates {
        let built =
            crate::plan::build_plan(&loaded, candidate, base_dir, false, false, false, params);
        let plan = match built {
            Ok(plan) => plan,
            Err(e) => {
                log::warn!(
                    "check '{}': plan-compatibility check skipped (plan did not build): {:#}",
                    candidate.name,
                    e
                );
                continue;
            }
        };

        for diag in crate::plan::validate_plan(&plan) {
            let line = format!("[{}] {}", diag.rule, diag.message);
            match diag.level {
                crate::plan::DiagnosticLevel::Rejected => {
                    println!("Rejected: {line}");
                    // Only the first rejection becomes the returned error.
                    if first_rejection.is_none() {
                        first_rejection = Some(line);
                    }
                }
                crate::plan::DiagnosticLevel::Warning => println!("Warning: {line}"),
                crate::plan::DiagnosticLevel::Degraded => println!("Degraded: {line}"),
            }
        }
    }

    match first_rejection {
        Some(line) => anyhow::bail!("{line}"),
        None => Ok(()),
    }
}
/// Handles `Commands::Init`: resolves the source and calls `init::init`.
///
/// The source is resolved from `source`, `source_env` and `source_file` by
/// `resolve_init_source`. `discover` selects `InitFormat::DiscoveryJson`;
/// otherwise `InitFormat::Yaml` is used. The bucket and credential options are
/// bundled into an `InitYamlDestination`, and `include` / `exclude` into a
/// `TableFilter`.
///
/// # Errors
///
/// Returns an error if the source cannot be resolved or `init::init` fails.
// The parameter list mirrors the fields of `Commands::Init` one-to-one, hence
// the lint exemption.
#[allow(clippy::too_many_arguments)]
fn dispatch_init(
    source: Option<String>,
    source_env: Option<String>,
    source_file: Option<String>,
    table: Option<String>,
    schema: Option<String>,
    include: Vec<String>,
    exclude: Vec<String>,
    output: Option<String>,
    discover: bool,
    gcs_bucket: Option<String>,
    gcs_credentials_file: Option<String>,
    s3_bucket: Option<String>,
    s3_region: Option<String>,
) -> Result<()> {
    // Resolve the source first: it is the only fallible step before the call.
    let (source_url, provenance) = resolve_init_source(source, source_env, source_file)?;

    let table_filter = init::TableFilter { include, exclude };
    let destination = init::InitYamlDestination {
        gcs_bucket,
        gcs_credentials_file,
        s3_bucket,
        s3_region,
    };
    let output_format = if discover {
        init::InitFormat::DiscoveryJson
    } else {
        init::InitFormat::Yaml
    };

    init::init(
        &source_url,
        &provenance,
        table.as_deref(),
        schema.as_deref(),
        output.as_deref(),
        output_format,
        destination,
        &table_filter,
    )
}
/// Handles `Commands::Plan`: checks the export selection, then runs the plan
/// command.
///
/// The raw `params` strings are parsed with `parse_params`; an empty result is
/// passed on as `None`. When `export` is given, the config is loaded and the
/// name is checked against the defined exports first. `output` is only used
/// with `PlanFormat::Json`, where it is carried inside the output format.
///
/// # Errors
///
/// Returns an error if the config cannot be loaded, the requested export is
/// not defined, or `pipeline::run_plan_command` fails.
fn dispatch_plan(
    config: String,
    export: Option<String>,
    params: Vec<String>,
    output: Option<String>,
    format: PlanFormat,
) -> Result<()> {
    let parsed = parse_params(&params);
    let param_map = if parsed.is_empty() {
        None
    } else {
        Some(parsed)
    };

    // The config is only loaded here when there is a name to check.
    if let Some(name) = export.as_deref() {
        let loaded = Config::load_with_params(&config, param_map.as_ref())?;
        check_export_selection(&loaded, Some(name))?;
    }

    let fmt = match format {
        PlanFormat::Pretty => pipeline::PlanOutputFormat::Pretty,
        PlanFormat::Json => pipeline::PlanOutputFormat::Json(output),
    };
    pipeline::run_plan_command(&config, export.as_deref(), param_map.as_ref(), fmt)
}
/// Handles `Commands::Validate`: checks the export selection, parses the
/// optional date, and runs the validate command.
///
/// When `export` is given, the config is loaded with `Config::load` and the
/// name is checked against the defined exports. `date`, when present, must be
/// in `YYYY-MM-DD` form. `output` is only used with `ValidateFormat::Json`.
/// `date`, `run_id` and `prefix` are bundled into a `ValidateTarget`.
///
/// # Errors
///
/// Returns an error if the config cannot be loaded, the requested export is
/// not defined, `date` does not parse, or `pipeline::run_validate_command`
/// fails.
fn dispatch_validate(
    config: String,
    export: Option<String>,
    format: ValidateFormat,
    output: Option<String>,
    date: Option<String>,
    run_id: Option<String>,
    prefix: Option<String>,
) -> Result<()> {
    if let Some(name) = export.as_deref() {
        let loaded = Config::load(&config)?;
        check_export_selection(&loaded, Some(name))?;
    }

    let output_format = match format {
        ValidateFormat::Pretty => pipeline::ValidateOutputFormat::Pretty,
        ValidateFormat::Json => pipeline::ValidateOutputFormat::Json(output),
    };

    // Option<Result<..>> -> Result<Option<..>>, so a bad date aborts here
    // while an absent date stays `None`.
    let parsed_date = date
        .map(|raw| {
            chrono::NaiveDate::parse_from_str(&raw, "%Y-%m-%d").map_err(|e| {
                anyhow::anyhow!("invalid --date '{}': expected YYYY-MM-DD ({})", raw, e)
            })
        })
        .transpose()?;

    let target = pipeline::ValidateTarget {
        date: parsed_date,
        run_id,
        prefix_override: prefix,
    };
    pipeline::run_validate_command(&config, export.as_deref(), output_format, target)
}
/// Handles `Commands::Reconcile`: checks that the export exists, then runs the
/// reconcile command.
///
/// The raw `params` strings are parsed with `parse_params`; an empty result is
/// passed on as `None`. `export` is mandatory here, so the config is always
/// loaded and the name checked against the defined exports. `output` is only
/// used with `ReconcileFormat::Json`.
///
/// # Errors
///
/// Returns an error if the config cannot be loaded, the export is not defined,
/// or `pipeline::run_reconcile_command` fails.
fn dispatch_reconcile(
    config: String,
    export: String,
    format: ReconcileFormat,
    output: Option<String>,
    params: Vec<String>,
) -> Result<()> {
    let parsed = parse_params(&params);
    let param_map = if parsed.is_empty() {
        None
    } else {
        Some(parsed)
    };

    let loaded = Config::load_with_params(&config, param_map.as_ref())?;
    check_export_selection(&loaded, Some(&export))?;

    let fmt = match format {
        ReconcileFormat::Pretty => pipeline::ReconcileOutputFormat::Pretty,
        ReconcileFormat::Json => pipeline::ReconcileOutputFormat::Json(output),
    };
    pipeline::run_reconcile_command(&config, &export, param_map.as_ref(), fmt)
}
/// Handles `Commands::Repair`: checks that the export exists, then runs the
/// repair command.
///
/// The raw `params` strings are parsed with `parse_params`; an empty result is
/// passed on as `None`. `export` is mandatory here, so the config is always
/// loaded and the name checked against the defined exports. A `report` path
/// selects `RepairReportSource::File`; without one, `RepairReportSource::Auto`
/// is used. The output format is chosen from a `ReconcileFormat` value, and
/// `output` is only used with its `Json` variant.
///
/// # Errors
///
/// Returns an error if the config cannot be loaded, the export is not defined,
/// or `pipeline::run_repair_command` fails.
fn dispatch_repair(
    config: String,
    export: String,
    report: Option<String>,
    execute: bool,
    format: ReconcileFormat,
    output: Option<String>,
    params: Vec<String>,
) -> Result<()> {
    let parsed = parse_params(&params);
    let param_map = if parsed.is_empty() {
        None
    } else {
        Some(parsed)
    };

    let loaded = Config::load_with_params(&config, param_map.as_ref())?;
    check_export_selection(&loaded, Some(&export))?;

    let source = match report {
        Some(path) => pipeline::RepairReportSource::File(path),
        None => pipeline::RepairReportSource::Auto,
    };
    let fmt = match format {
        ReconcileFormat::Pretty => pipeline::RepairOutputFormat::Pretty,
        ReconcileFormat::Json => pipeline::RepairOutputFormat::Json(output),
    };
    pipeline::run_repair_command(&config, &export, param_map.as_ref(), source, execute, fmt)
}
/// Handles `Commands::State`: routes each state sub-action to the matching
/// `pipeline` function.
///
/// # Errors
///
/// Propagates the error of whichever `pipeline` function was called.
fn dispatch_state(action: StateAction) -> Result<()> {
    match action {
        StateAction::Show { config } => pipeline::show_state(&config),
        StateAction::Reset { config, export } => pipeline::reset_state(&config, &export),
        StateAction::Files {
            config,
            export,
            last,
        } => pipeline::show_files(&config, export.as_deref(), last),
        StateAction::ResetChunks {
            config,
            export,
            stuck_checkpoints,
        } => match (stuck_checkpoints, export) {
            // `stuck_checkpoints` takes precedence; any export name given
            // alongside it is ignored.
            (true, _) => pipeline::reset_chunk_checkpoints_stuck(&config),
            (false, Some(name)) => pipeline::reset_chunk_checkpoint(&config, &name),
            // NOTE(review): with neither option set this succeeds without
            // doing anything. Presumably `validate_cli` rejects that
            // combination before we get here; confirm.
            (false, None) => Ok(()),
        },
        StateAction::Chunks { config, export } => {
            pipeline::show_chunk_checkpoint(&config, &export)
        }
        StateAction::Progression { config, export } => {
            pipeline::show_progression(&config, export.as_deref())
        }
    }
}