1use std::collections::HashMap;
14use std::path::Path;
15
16use crate::config::Config;
17use crate::error::Result;
18use crate::plan::{
19 ComputedPlanData, ExportRecommendation, ExtractionStrategy, PlanArtifact, PlanDiagnostics,
20 PlanPrioritizationSnapshot, PrioritizationInputs, build_plan,
21 campaign::recommend_campaign,
22 inputs::{PrioritizationHints, build_prioritization_inputs},
23 recommend::recommend_export,
24 validate::{DiagnosticLevel, validate_plan},
25};
26use crate::state::StateStore;
27use crate::{preflight, source};
28
29use super::chunked::{chunk_plan_fingerprint, detect_and_generate_chunks};
30
31pub enum PlanOutputFormat {
33 Pretty,
35 Json(Option<String>),
37}
38
39pub fn run_plan_command(
44 config_path: &str,
45 export_name: Option<&str>,
46 params: Option<&HashMap<String, String>>,
47 format: PlanOutputFormat,
48) -> Result<()> {
49 let config = Config::load_with_params(config_path, params)?;
50 let config_dir = Path::new(config_path)
51 .parent()
52 .unwrap_or_else(|| Path::new("."));
53
54 let exports: Vec<_> = if let Some(name) = export_name {
55 let e = config
56 .exports
57 .iter()
58 .find(|e| e.name == name)
59 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
60 vec![e]
61 } else {
62 config.exports.iter().collect()
63 };
64
65 let state_path = config_dir.join(".rivet_state.db");
66 let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
67
68 let mut built: Vec<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> = Vec::new();
69
70 for export in exports {
71 built.push(build_plan_artifact(
72 &config,
73 export,
74 config_path,
75 config_dir,
76 params,
77 &state,
78 )?);
79 }
80
81 let campaign_opt = if built.len() > 1 {
82 let pairs: Vec<_> = built
83 .iter()
84 .map(|(_, i, r)| (i.clone(), r.clone()))
85 .collect();
86 Some(recommend_campaign(pairs))
87 } else {
88 None
89 };
90
91 for (mut artifact, _inputs, standalone_rec) in built {
92 let snap = if let Some(ref camp) = campaign_opt {
93 let rec = camp
94 .ordered_exports
95 .iter()
96 .find(|e| e.export_name == artifact.export_name)
97 .cloned()
98 .unwrap_or_else(|| standalone_rec.clone());
99 PlanPrioritizationSnapshot {
100 export_recommendation: rec,
101 campaign: Some(camp.clone()),
102 }
103 } else {
104 PlanPrioritizationSnapshot {
105 export_recommendation: standalone_rec,
106 campaign: None,
107 }
108 };
109 artifact.prioritization = Some(snap);
110 emit_artifact(&artifact, &format)?;
111 }
112
113 Ok(())
114}
115
116fn build_plan_artifact(
117 config: &Config,
118 export: &crate::config::ExportConfig,
119 config_path: &str,
120 config_dir: &Path,
121 params: Option<&HashMap<String, String>>,
122 state: &StateStore,
123) -> Result<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> {
124 let plan = build_plan(config, export, config_dir, false, false, false, params)?;
125
126 let validate_diags = validate_plan(&plan);
128 let mut validate_warnings: Vec<String> = Vec::new();
129 for d in &validate_diags {
130 match d.level {
131 DiagnosticLevel::Rejected => {
132 anyhow::bail!("[{}] {}", d.rule, d.message);
133 }
134 DiagnosticLevel::Warning | DiagnosticLevel::Degraded => {
135 validate_warnings.push(format!("[{}] {}", d.rule, d.message));
136 }
137 }
138 }
139
140 let (computed, plan_diagnostics, hints) = match preflight::get_export_diagnostic(config, export)
141 {
142 Ok(diag) => {
143 let mut warnings = diag.warnings.clone();
144 warnings.extend(validate_warnings);
145 if warnings.is_empty() && !matches!(diag.verdict, preflight::HealthVerdict::Efficient) {
153 if let Some(s) = diag.suggestion.clone() {
154 warnings.push(s);
155 } else {
156 warnings.push(format!(
157 "verdict {} but preflight collected no specific warnings — review `rivet check` output for context",
158 diag.verdict
159 ));
160 }
161 }
162 let plan_diagnostics = PlanDiagnostics {
163 verdict: diag.verdict.to_string(),
164 warnings,
165 recommended_profile: diag.recommended_profile.to_string(),
166 };
167 let computed = compute_plan_data(&plan, diag.row_estimate, state)?;
168 let hints = PrioritizationHints {
169 incremental_uses_index: diag.uses_index,
170 cursor_range_observed: diag.cursor_min.is_some() && diag.cursor_max.is_some(),
171 };
172 (computed, plan_diagnostics, hints)
173 }
174 Err(e) => {
175 log::warn!(
176 "plan '{}': preflight diagnostics failed (continuing without them): {:#}",
177 export.name,
178 e
179 );
180 let computed = compute_plan_data(&plan, None, state)?;
181 let mut warnings = vec!["preflight diagnostics unavailable".into()];
182 warnings.extend(validate_warnings);
183 let plan_diagnostics = PlanDiagnostics {
184 verdict: "unknown (preflight failed)".into(),
185 warnings,
186 recommended_profile: "balanced".into(),
187 };
188 (computed, plan_diagnostics, PrioritizationHints::default())
189 }
190 };
191
192 let fingerprint = match &plan.strategy {
193 ExtractionStrategy::Chunked(cp) => chunk_plan_fingerprint(
194 &plan.base_query,
195 &cp.column,
196 cp.chunk_size,
197 cp.chunk_count,
198 cp.dense,
199 cp.by_days,
200 ),
201 _ => String::new(),
202 };
203
204 let history = match state.get_metrics(Some(&export.name), 20) {
206 Ok(metrics) => Some(crate::plan::HistorySnapshot::summarize(&metrics)),
207 Err(e) => {
208 log::warn!(
209 "plan '{}': history lookup failed; proceeding without historical refinement: {:#}",
210 export.name,
211 e
212 );
213 None
214 }
215 };
216
217 let inputs =
218 build_prioritization_inputs(export, &plan, &computed, &plan_diagnostics, hints, history);
219 let recommendation = recommend_export(&inputs, &plan_diagnostics);
220
221 let mut artifact = PlanArtifact::new(
222 plan.export_name.clone(),
223 plan.strategy.mode_label().to_string(),
224 fingerprint,
225 plan,
226 computed,
227 plan_diagnostics,
228 );
229
230 artifact.config_path = Some(
238 Path::new(config_path)
239 .canonicalize()
240 .map(|p| p.to_string_lossy().into_owned())
241 .unwrap_or_else(|_| config_path.to_string()),
242 );
243
244 Ok((artifact, inputs, recommendation))
245}
246
247fn compute_plan_data(
255 plan: &crate::plan::ResolvedRunPlan,
256 row_estimate: Option<i64>,
257 state: &StateStore,
258) -> Result<ComputedPlanData> {
259 match &plan.strategy {
260 ExtractionStrategy::Chunked(cp) => {
261 let mut src = source::create_source(&plan.source)?;
262 let chunk_ranges = detect_and_generate_chunks(
263 &mut *src,
264 &plan.base_query,
265 &cp.column,
266 cp.chunk_size,
267 cp.chunk_count,
268 &plan.export_name,
269 cp.dense,
270 cp.by_days,
271 plan.source.source_type,
272 )?;
273 let chunk_count = chunk_ranges.len();
274 let chunked_estimate = chunk_ranges
281 .first()
282 .zip(chunk_ranges.last())
283 .map(|(first, last)| (last.1 - first.0 + 1).max(0));
284 Ok(ComputedPlanData {
285 chunk_ranges,
286 chunk_count,
287 cursor_snapshot: None,
288 row_estimate: chunked_estimate.or(row_estimate),
289 })
290 }
291
292 ExtractionStrategy::Incremental(_) => {
293 let cursor_snapshot = state.get(&plan.export_name)?.last_cursor_value;
294 Ok(ComputedPlanData {
295 chunk_ranges: vec![],
296 chunk_count: 0,
297 cursor_snapshot,
298 row_estimate,
299 })
300 }
301
302 ExtractionStrategy::Snapshot | ExtractionStrategy::TimeWindow { .. } => {
303 Ok(ComputedPlanData {
304 chunk_ranges: vec![],
305 chunk_count: 0,
306 cursor_snapshot: None,
307 row_estimate,
308 })
309 }
310 }
311}
312
313fn emit_artifact(artifact: &PlanArtifact, format: &PlanOutputFormat) -> Result<()> {
314 match format {
315 PlanOutputFormat::Pretty => {
316 artifact.print_summary();
317 }
318 PlanOutputFormat::Json(None) => {
319 println!("{}", artifact.to_json_pretty()?);
320 }
321 PlanOutputFormat::Json(Some(path)) => {
322 let json = artifact.to_json_pretty()?;
323 std::fs::write(path, &json)
324 .map_err(|e| anyhow::anyhow!("cannot write plan file '{}': {}", path, e))?;
325 println!("Plan written to: {}", path);
326 }
327 }
328 Ok(())
329}