use std::collections::HashMap;
use std::path::Path;
use crate::config::Config;
use crate::error::Result;
use crate::plan::{
ComputedPlanData, ExportRecommendation, ExtractionStrategy, PlanArtifact, PlanDiagnostics,
PlanPrioritizationSnapshot, PrioritizationInputs, build_plan,
campaign::recommend_campaign,
inputs::{PrioritizationHints, build_prioritization_inputs},
recommend::recommend_export,
validate::{DiagnosticLevel, validate_plan},
};
use crate::state::StateStore;
use crate::{preflight, source};
use super::chunked::{chunk_plan_fingerprint, detect_and_generate_chunks};
/// How `rivet plan` presents its result.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanOutputFormat {
    /// Human-readable output on stdout: a compact wave-ordered table when several
    /// exports are planned, the full per-export summary otherwise.
    Pretty,
    /// JSON output: printed to stdout when `None`; otherwise written to the given
    /// path (one file per export, derived from that path, when several exports
    /// are planned).
    Json(Option<String>),
}
/// Entry point for `rivet plan`: build, prioritize, emit and record plans.
///
/// Loads the config at `config_path` (with `params` substituted) and builds one
/// [`PlanArtifact`] per selected export. When `export_name` is given, only that
/// export is planned. When more than one export is planned, a campaign is
/// derived so every export's recommendation comes from the same joint ranking;
/// a single export keeps its standalone recommendation.
///
/// The artifacts are emitted in `format` first. Only then are each export's
/// `wave` and `parallel_safe` values written back into the config file, so a
/// failed emit leaves the config untouched.
///
/// # Errors
/// Fails when the config cannot be loaded, `export_name` is not in the config,
/// the state database cannot be opened, any export's plan cannot be built, or
/// the plan output / config file cannot be written.
pub fn run_plan_command(
    config_path: &str,
    export_name: Option<&str>,
    params: Option<&HashMap<String, String>>,
    format: PlanOutputFormat,
) -> Result<()> {
    let config = Config::load_with_params(config_path, params)?;
    let config_dir = Path::new(config_path)
        .parent()
        .unwrap_or_else(|| Path::new("."));

    let exports: Vec<_> = if let Some(name) = export_name {
        let e = config
            .exports
            .iter()
            .find(|e| e.name == name)
            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
        vec![e]
    } else {
        config.exports.iter().collect()
    };

    // The state DB sits next to the config. `config_dir` is derived from a
    // `&str`, so this path is valid UTF-8 and the fallback is not expected to
    // be taken.
    let state_path = config_dir.join(".rivet_state.db");
    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;

    let mut built: Vec<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> =
        Vec::with_capacity(exports.len());
    for export in exports {
        built.push(build_plan_artifact(
            &config,
            export,
            config_path,
            config_dir,
            params,
            &state,
        )?);
    }

    // Several exports are ranked together so their waves are relative to each
    // other; a lone export needs no campaign.
    let multi_export = built.len() > 1;
    let campaign_opt = multi_export.then(|| {
        let pairs: Vec<_> = built
            .iter()
            .map(|(_, inputs, rec)| (inputs.clone(), rec.clone()))
            .collect();
        recommend_campaign(pairs)
    });

    let mut artifacts: Vec<PlanArtifact> = Vec::with_capacity(built.len());
    let mut fields: ExportFields = HashMap::new();
    for (mut artifact, _inputs, standalone_rec) in built {
        // Prefer the campaign's view of this export; fall back to the
        // standalone recommendation if the campaign does not list it.
        let export_recommendation = match &campaign_opt {
            Some(campaign) => campaign
                .ordered_exports
                .iter()
                .find(|rec| rec.export_name == artifact.export_name)
                .cloned()
                .unwrap_or(standalone_rec),
            None => standalone_rec,
        };

        let parallel_safe = export_recommendation.cost_class == crate::plan::CostClass::Low
            && !export_recommendation.isolate_on_source;
        fields.insert(
            artifact.export_name.clone(),
            vec![
                ("wave", export_recommendation.recommended_wave.to_string()),
                ("parallel_safe", parallel_safe.to_string()),
            ],
        );

        artifact.prioritization = Some(PlanPrioritizationSnapshot {
            export_recommendation,
            campaign: campaign_opt.clone(),
        });
        artifacts.push(artifact);
    }

    // Emit before touching the config: if output fails, the user's config
    // file is not rewritten.
    emit_artifacts(&artifacts, &format, multi_export, config_path)?;

    if !fields.is_empty() {
        write_plan_fields_to_config(config_path, &fields)?;
        log::info!(
            "plan: recorded wave + parallel-safety for {} export(s) in {}",
            fields.len(),
            config_path
        );
    }
    Ok(())
}
/// Derive the JSON output path for one export of a multi-export plan.
///
/// The export name is reduced to ASCII letters, digits, `-` and `_`; every
/// other character becomes `_`, so it cannot inject separators or `..`. The
/// result is spliced between the stem and the extension of `base`:
///
/// * `/tmp/plan.json` + `orders` → `/tmp/plan.orders.json`
/// * `/tmp/plan` + `orders` → `/tmp/plan.orders`
///
/// The derived file stays in `base`'s directory (or is a bare file name when
/// `base` has none).
fn per_export_output_path(base: &str, export_name: &str) -> String {
    let safe_name: String = export_name
        .chars()
        .map(|ch| match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
            _ => '_',
        })
        .collect();

    let base_path = Path::new(base);
    let file_name = match base_path.file_stem().and_then(|s| s.to_str()) {
        None => format!("{base}.{safe_name}"),
        Some(stem) => match base_path.extension().and_then(|s| s.to_str()) {
            Some(ext) => format!("{stem}.{safe_name}.{ext}"),
            None => format!("{stem}.{safe_name}"),
        },
    };

    // `Path::parent` yields `Some("")` for a bare file name; treat that as
    // "no directory" so the result is not prefixed with a separator.
    match base_path.parent().filter(|dir| !dir.as_os_str().is_empty()) {
        Some(dir) => dir.join(file_name).to_string_lossy().into_owned(),
        None => file_name,
    }
}
/// Build the plan artifact for one export, together with the prioritization
/// inputs and the standalone recommendation derived from it.
///
/// Steps, in order:
/// 1. Resolve the run plan.
/// 2. Validate it. The first `Rejected` diagnostic aborts with
///    `[rule] message`; warnings and degraded diagnostics are carried into the
///    artifact.
/// 3. Collect preflight diagnostics. This is best-effort: on failure a warning
///    is logged and placeholder diagnostics plus default hints are used.
/// 4. Compute the strategy-specific plan data.
/// 5. Fingerprint chunked plans.
/// 6. Summarize up to 20 recent runs from the state store (also best-effort).
/// 7. Score the export.
///
/// # Errors
/// Returns an error when plan resolution fails, validation rejects the plan,
/// or computing the plan data fails.
fn build_plan_artifact(
    config: &Config,
    export: &crate::config::ExportConfig,
    config_path: &str,
    config_dir: &Path,
    params: Option<&HashMap<String, String>>,
    state: &StateStore,
) -> Result<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> {
    let plan = build_plan(config, export, config_dir, false, false, false, params)?;

    // Static validation: a rejection is fatal, anything else becomes a warning.
    let diagnostics = validate_plan(&plan);
    let mut validate_warnings: Vec<String> = Vec::new();
    for diagnostic in &diagnostics {
        let rendered = format!("[{}] {}", diagnostic.rule, diagnostic.message);
        match diagnostic.level {
            DiagnosticLevel::Rejected => anyhow::bail!("{}", rendered),
            DiagnosticLevel::Warning | DiagnosticLevel::Degraded => {
                validate_warnings.push(rendered)
            }
        }
    }

    let (row_estimate, plan_diagnostics, hints) =
        match preflight::get_export_diagnostic(config, export) {
            Ok(diag) => {
                let mut warnings = diag.warnings.clone();
                warnings.extend(validate_warnings);
                // Never report a non-efficient verdict without some explanation.
                let needs_explanation = warnings.is_empty()
                    && !matches!(diag.verdict, preflight::HealthVerdict::Efficient);
                if needs_explanation {
                    let explanation = diag.suggestion.clone().unwrap_or_else(|| {
                        format!(
                            "verdict {} but preflight collected no specific warnings — review `rivet check` output for context",
                            diag.verdict
                        )
                    });
                    warnings.push(explanation);
                }
                let strategy_rationale = crate::plan::explain_strategy(&diag, export);
                let diagnostics_out = PlanDiagnostics {
                    verdict: diag.verdict.to_string(),
                    warnings,
                    recommended_profile: diag.recommended_profile.to_string(),
                    strategy_rationale,
                };
                let observed_hints = PrioritizationHints {
                    incremental_uses_index: diag.uses_index,
                    cursor_range_observed: diag.cursor_min.is_some() && diag.cursor_max.is_some(),
                };
                (diag.row_estimate, diagnostics_out, observed_hints)
            }
            Err(e) => {
                log::warn!(
                    "plan '{}': preflight diagnostics failed (continuing without them): {:#}",
                    export.name,
                    e
                );
                let mut warnings = vec!["preflight diagnostics unavailable".into()];
                warnings.extend(validate_warnings);
                let diagnostics_out = PlanDiagnostics {
                    verdict: "unknown (preflight failed)".into(),
                    warnings,
                    recommended_profile: "balanced".into(),
                    strategy_rationale: "Strategy rationale unavailable — preflight diagnostics could \
                                         not be collected for this export."
                        .into(),
                };
                (None, diagnostics_out, PrioritizationHints::default())
            }
        };
    let computed = compute_plan_data(&plan, row_estimate, state)?;

    // Only chunked plans carry a fingerprint of their chunking parameters.
    let fingerprint = if let ExtractionStrategy::Chunked(cp) = &plan.strategy {
        chunk_plan_fingerprint(
            &plan.base_query,
            &cp.column,
            cp.chunk_size,
            cp.chunk_count,
            cp.dense,
            cp.by_days,
        )
    } else {
        String::new()
    };

    let history = state
        .get_metrics(Some(&export.name), 20)
        .inspect_err(|e| {
            log::warn!(
                "plan '{}': history lookup failed; proceeding without historical refinement: {:#}",
                export.name,
                e
            )
        })
        .ok()
        .map(|metrics| crate::plan::HistorySnapshot::summarize(&metrics));

    let inputs =
        build_prioritization_inputs(export, &plan, &computed, &plan_diagnostics, hints, history);
    let recommendation = recommend_export(&inputs, &plan_diagnostics);

    let artifact_name = plan.export_name.clone();
    let mode_label = plan.strategy.mode_label().to_string();
    let mut artifact = PlanArtifact::new(
        artifact_name,
        mode_label,
        fingerprint,
        plan,
        computed,
        plan_diagnostics,
    );
    // Record an absolute config path when it can be resolved, else the path as given.
    artifact.config_path = Some(match Path::new(config_path).canonicalize() {
        Ok(absolute) => absolute.to_string_lossy().into_owned(),
        Err(_) => config_path.to_string(),
    });
    Ok((artifact, inputs, recommendation))
}
/// Compute the strategy-dependent parts of a plan.
///
/// * `Chunked`: opens the source and detects the chunk ranges. The row
///   estimate becomes the inclusive span from the first chunk's start to the
///   last chunk's end (clamped at 0). When no chunks were produced it falls
///   back to `row_estimate`.
/// * `Incremental`: reads the export's last recorded cursor value from `state`.
/// * Other strategies: pass `row_estimate` through unchanged.
///
/// # Errors
/// Propagates failures from creating the source, detecting chunks, or reading
/// the export's state.
fn compute_plan_data(
    plan: &crate::plan::ResolvedRunPlan,
    row_estimate: Option<i64>,
    state: &StateStore,
) -> Result<ComputedPlanData> {
    match &plan.strategy {
        ExtractionStrategy::Chunked(cp) => {
            let mut src = source::create_source(&plan.source)?;
            let chunk_ranges = detect_and_generate_chunks(
                &mut *src,
                &plan.base_query,
                &cp.column,
                cp.chunk_size,
                cp.chunk_count,
                &plan.export_name,
                cp.dense,
                cp.by_days,
                plan.source.source_type,
            )?;
            let chunk_count = chunk_ranges.len();
            // Saturating arithmetic: bounds near i64::MIN/MAX (e.g. hashed keys)
            // would otherwise overflow — a panic in debug builds, a wrapped and
            // then clamped-to-0 estimate in release builds.
            // NOTE(review): this is the width of the range covered by the chunks.
            // It equals the row count only when keys are contiguous — confirm it
            // is the intended estimate for sparse keys and `by_days` plans.
            let chunked_estimate = chunk_ranges
                .first()
                .zip(chunk_ranges.last())
                .map(|(first, last)| last.1.saturating_sub(first.0).saturating_add(1).max(0));
            Ok(ComputedPlanData {
                chunk_ranges,
                chunk_count,
                cursor_snapshot: None,
                row_estimate: chunked_estimate.or(row_estimate),
            })
        }
        ExtractionStrategy::Incremental(_) => {
            let cursor_snapshot = state.get(&plan.export_name)?.last_cursor_value;
            Ok(ComputedPlanData {
                chunk_ranges: vec![],
                chunk_count: 0,
                cursor_snapshot,
                row_estimate,
            })
        }
        ExtractionStrategy::Snapshot
        | ExtractionStrategy::TimeWindow { .. }
        | ExtractionStrategy::Keyset(_) => Ok(ComputedPlanData {
            chunk_ranges: vec![],
            chunk_count: 0,
            cursor_snapshot: None,
            row_estimate,
        }),
    }
}
fn emit_artifacts(
artifacts: &[PlanArtifact],
format: &PlanOutputFormat,
multi_export: bool,
config_path: &str,
) -> Result<()> {
match format {
PlanOutputFormat::Pretty => {
if multi_export {
print_compact_summary(artifacts, config_path);
} else {
for artifact in artifacts {
artifact.print_summary();
}
}
}
PlanOutputFormat::Json(None) => {
if artifacts.len() > 1 {
println!("{}", serde_json::to_string_pretty(artifacts)?);
} else if let Some(artifact) = artifacts.first() {
println!("{}", artifact.to_json_pretty()?);
}
}
PlanOutputFormat::Json(Some(path)) => {
for artifact in artifacts {
let json = artifact.to_json_pretty()?;
let out_path = if multi_export {
per_export_output_path(path, &artifact.export_name)
} else {
path.clone()
};
std::fs::write(&out_path, &json)
.map_err(|e| anyhow::anyhow!("cannot write plan file '{}': {}", out_path, e))?;
println!("Plan written to: {}", out_path);
}
}
}
Ok(())
}
/// Print the multi-export overview: a title line, a column header, and one
/// summary row per export.
///
/// Rows run from the lowest recommended wave upward. Within a wave, higher
/// priority scores come first, and ties fall back to the export name.
/// Artifacts without a prioritization snapshot rank as wave `u32::MAX` with
/// score 0, i.e. at the end. The name column is as wide as the longest export
/// name (in chars), kept within 6..=32.
fn print_compact_summary(artifacts: &[PlanArtifact], config_path: &str) {
    let rank = |artifact: &PlanArtifact| match artifact.prioritization.as_ref() {
        Some(p) => (
            p.export_recommendation.recommended_wave,
            p.export_recommendation.priority_score,
        ),
        None => (u32::MAX, 0),
    };

    let mut rows: Vec<&PlanArtifact> = artifacts.iter().collect();
    rows.sort_by(|left, right| {
        let (left_wave, left_score) = rank(left);
        let (right_wave, right_score) = rank(right);
        left_wave
            .cmp(&right_wave)
            .then_with(|| right_score.cmp(&left_score))
            .then_with(|| left.export_name.cmp(&right.export_name))
    });

    // Start from the minimum width of 6, widen to the longest name, cap at 32.
    let name_width = rows
        .iter()
        .map(|artifact| artifact.export_name.chars().count())
        .fold(6, usize::max)
        .min(32);

    println!();
    println!(
        " Plan: {} exports — `rivet apply {}` runs them by wave (lowest first)",
        artifacts.len(),
        config_path
    );
    println!();
    println!("{}", PlanArtifact::summary_header(name_width));
    for artifact in rows.iter() {
        println!("{}", artifact.summary_line(name_width));
    }
    println!();
    println!(" Full detail for one export: rivet plan -c <config> --export <name>");
}
/// Config fields to record per export, keyed by export name. Each value is a
/// list of `(key, value)` pairs that `apply_field_annotations` writes as
/// `key: value` lines into that export's item in the YAML config.
type ExportFields = HashMap<String, Vec<(&'static str, String)>>;
/// Record the plan-derived `fields` of each export in the YAML config at
/// `config_path`.
///
/// The file is read, rewritten in memory by `apply_field_annotations`, and
/// written back only when the text actually changed. An empty `fields` map
/// returns immediately without reading the file.
///
/// # Errors
/// Returns an error if the config cannot be read, or if the updated text
/// cannot be written.
fn write_plan_fields_to_config(config_path: &str, fields: &ExportFields) -> Result<()> {
    if fields.is_empty() {
        return Ok(());
    }

    let original = std::fs::read_to_string(config_path).map_err(|err| {
        anyhow::anyhow!(
            "cannot read config '{}' to record plan fields: {}",
            config_path,
            err
        )
    })?;

    let updated = apply_field_annotations(&original, fields);
    if updated == original {
        // Nothing to record: leave the file as it is.
        return Ok(());
    }

    std::fs::write(config_path, updated).map_err(|err| {
        anyhow::anyhow!(
            "cannot write plan fields to config '{}': {}",
            config_path,
            err
        )
    })?;
    Ok(())
}
fn apply_field_annotations(yaml: &str, fields: &ExportFields) -> String {
let mut out = String::with_capacity(yaml.len() + 64);
let mut current: Option<(String, usize, usize)> = None;
for line in yaml.split_inclusive('\n') {
let content = line.strip_suffix('\n').unwrap_or(line);
if let Some((dash_indent, name)) = parse_export_name(content) {
out.push_str(line);
let field_indent = dash_indent + 2;
if let Some(items) = fields.get(&name) {
for (key, value) in items {
out.push_str(&" ".repeat(field_indent));
out.push_str(&format!("{key}: {value}\n"));
}
}
current = Some((name, dash_indent, field_indent));
continue;
}
if let Some((name, dash_indent, field_indent)) = current.as_ref() {
let trimmed = content.trim_start();
let indent = content.len() - trimmed.len();
let blank_or_comment = trimmed.is_empty() || trimmed.starts_with('#');
if !blank_or_comment && indent <= *dash_indent {
current = None; } else if indent == *field_indent
&& let Some(items) = fields.get(name)
&& items
.iter()
.any(|(key, _)| trimmed.starts_with(&format!("{key}:")))
{
continue; }
}
out.push_str(line);
}
out
}
fn parse_export_name(line: &str) -> Option<(usize, String)> {
let trimmed = line.trim_start();
let dash_indent = line.len() - trimmed.len();
let rest = trimmed.strip_prefix("- ")?;
let value = rest.trim_start().strip_prefix("name:")?;
let value = value.split(" #").next().unwrap_or(value);
let name = value
.trim()
.trim_matches(|c: char| c == '"' || c == '\'')
.to_string();
(!name.is_empty()).then_some((dash_indent, name))
}
/// Unit tests for the pure helpers: config field annotation and per-export
/// output paths.
#[cfg(test)]
mod tests {
    use super::per_export_output_path;
    use super::{ExportFields, apply_field_annotations};
    use std::path::Path;

    /// Build an `ExportFields` map that records only a `wave` per export.
    fn wave_fields(pairs: &[(&str, u32)]) -> ExportFields {
        pairs
            .iter()
            .map(|(n, w)| (n.to_string(), vec![("wave", w.to_string())]))
            .collect()
    }

    // Fixtures use consistent two-space YAML indentation: export items at
    // column 2, their fields at column 4.
    #[test]
    fn wave_annotations_insert_replace_and_preserve() {
        let yaml = "exports:\n  - name: orders # the orders table\n    mode: incremental\n    wave: 9\n    cursor_column: updated_at\n  - name: events\n    mode: full\ndestination:\n  type: local\n";
        let out = apply_field_annotations(yaml, &wave_fields(&[("orders", 2), ("events", 1)]));
        assert!(
            out.contains("- name: orders # the orders table\n    wave: 2\n"),
            "{out}"
        );
        assert!(
            !out.contains("wave: 9"),
            "stale wave must be dropped:\n{out}"
        );
        assert!(out.contains("- name: events\n    wave: 1\n"), "{out}");
        assert!(out.contains("cursor_column: updated_at"));
        assert!(out.contains("destination:\n  type: local"));
        // Everything other than the wave lines is copied verbatim.
        assert_eq!(
            out,
            "exports:\n  - name: orders # the orders table\n    wave: 2\n    mode: incremental\n    cursor_column: updated_at\n  - name: events\n    wave: 1\n    mode: full\ndestination:\n  type: local\n"
        );
    }

    #[test]
    fn wave_annotations_leave_unlisted_export_untouched() {
        let yaml = "exports:\n  - name: orders\n    mode: full\n";
        let out = apply_field_annotations(yaml, &wave_fields(&[("other", 1)]));
        assert_eq!(out, yaml);
    }

    #[test]
    fn per_export_paths_are_distinct() {
        let a = per_export_output_path("/tmp/plan.json", "orders");
        let b = per_export_output_path("/tmp/plan.json", "users");
        assert_ne!(a, b, "distinct exports must not collide on one path");
        assert_eq!(a, "/tmp/plan.orders.json");
        assert_eq!(b, "/tmp/plan.users.json");
    }

    #[test]
    fn per_export_path_keeps_json_extension() {
        let p = per_export_output_path("/tmp/plan.json", "orders");
        assert_eq!(
            Path::new(&p).extension().and_then(|e| e.to_str()),
            Some("json"),
            "per-export file must keep the .json extension"
        );
    }

    #[test]
    fn per_export_path_without_extension_appends_name() {
        let p = per_export_output_path("/tmp/plan", "orders");
        assert_eq!(p, "/tmp/plan.orders");
    }

    #[test]
    fn per_export_path_without_directory_stays_relative() {
        assert_eq!(per_export_output_path("plan.json", "orders"), "plan.orders.json");
    }

    #[test]
    fn per_export_path_sanitizes_unsafe_export_name() {
        let p = per_export_output_path("/tmp/plan.json", "../../etc/passwd");
        assert!(
            !p.contains(".."),
            "sanitized path must not contain a `..` traversal: {p}"
        );
        let file = Path::new(&p)
            .file_name()
            .and_then(|f| f.to_str())
            .expect("derived path has a file name");
        assert!(
            !file.contains('/') && !file.contains('\\'),
            "sanitized file name must not contain a path separator: {file}"
        );
        assert_eq!(
            Path::new(&p).parent().and_then(|d| d.to_str()),
            Some("/tmp"),
            "derived path must stay in the requested directory"
        );
        assert_eq!(
            Path::new(&p).extension().and_then(|e| e.to_str()),
            Some("json"),
            "extension must survive sanitization"
        );
    }
}