1use std::collections::HashMap;
14use std::path::Path;
15
16use crate::config::Config;
17use crate::error::Result;
18use crate::plan::{
19 ComputedPlanData, ExportRecommendation, ExtractionStrategy, PlanArtifact, PlanDiagnostics,
20 PlanPrioritizationSnapshot, PrioritizationInputs, build_plan,
21 campaign::recommend_campaign,
22 inputs::{PrioritizationHints, build_prioritization_inputs},
23 recommend::recommend_export,
24 validate::{DiagnosticLevel, validate_plan},
25};
26use crate::state::StateStore;
27use crate::{preflight, source};
28
29use super::chunked::{chunk_plan_fingerprint, detect_and_generate_chunks};
30
/// Output mode for the `plan` command.
///
/// Chooses how the computed plan artifacts are emitted once planning is done.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanOutputFormat {
    /// Print each artifact through its `print_summary` method.
    Pretty,
    /// Emit JSON. `None` prints to stdout; `Some(path)` writes to `path`,
    /// using a per-export file name derived from it when more than one
    /// export is planned.
    Json(Option<String>),
}
38
39pub fn run_plan_command(
44 config_path: &str,
45 export_name: Option<&str>,
46 params: Option<&HashMap<String, String>>,
47 format: PlanOutputFormat,
48) -> Result<()> {
49 let config = Config::load_with_params(config_path, params)?;
50 let config_dir = Path::new(config_path)
51 .parent()
52 .unwrap_or_else(|| Path::new("."));
53
54 let exports: Vec<_> = if let Some(name) = export_name {
55 let e = config
56 .exports
57 .iter()
58 .find(|e| e.name == name)
59 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
60 vec![e]
61 } else {
62 config.exports.iter().collect()
63 };
64
65 let state_path = config_dir.join(".rivet_state.db");
66 let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
67
68 let mut built: Vec<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> = Vec::new();
69
70 for export in exports {
71 built.push(build_plan_artifact(
72 &config,
73 export,
74 config_path,
75 config_dir,
76 params,
77 &state,
78 )?);
79 }
80
81 let campaign_opt = if built.len() > 1 {
82 let pairs: Vec<_> = built
83 .iter()
84 .map(|(_, i, r)| (i.clone(), r.clone()))
85 .collect();
86 Some(recommend_campaign(pairs))
87 } else {
88 None
89 };
90
91 let multi_export = built.len() > 1;
98
99 let mut artifacts: Vec<PlanArtifact> = Vec::with_capacity(built.len());
100 for (mut artifact, _inputs, standalone_rec) in built {
101 let snap = if let Some(ref camp) = campaign_opt {
102 let rec = camp
103 .ordered_exports
104 .iter()
105 .find(|e| e.export_name == artifact.export_name)
106 .cloned()
107 .unwrap_or_else(|| standalone_rec.clone());
108 PlanPrioritizationSnapshot {
109 export_recommendation: rec,
110 campaign: Some(camp.clone()),
111 }
112 } else {
113 PlanPrioritizationSnapshot {
114 export_recommendation: standalone_rec,
115 campaign: None,
116 }
117 };
118 artifact.prioritization = Some(snap);
119 artifacts.push(artifact);
120 }
121
122 emit_artifacts(&artifacts, &format, multi_export)?;
123
124 Ok(())
125}
126
/// Derives a per-export output path from `base` by inserting the sanitized
/// export name before the extension: `/tmp/plan.json` + `orders` becomes
/// `/tmp/plan.orders.json`. Without an extension the name is appended
/// (`/tmp/plan` becomes `/tmp/plan.orders`).
///
/// Every character of `export_name` outside `[A-Za-z0-9_-]` is replaced with
/// `_`, so the export name can never contribute a path separator or a `..`
/// component. Note that distinct export names may sanitize to the same
/// string (for example `a.b` and `a_b`).
///
/// When `base` has no final file component (it is empty, a root, or ends in
/// `..`), the suffix is appended to `base` verbatim.
fn per_export_output_path(base: &str, export_name: &str) -> String {
    let sanitized: String = export_name
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || matches!(c, '-' | '_') {
                c
            } else {
                '_'
            }
        })
        .collect();

    let path = Path::new(base);

    let stem = match path.file_stem().and_then(|s| s.to_str()) {
        Some(stem) => stem,
        // `base` already carries its own directory part here, so it must not
        // be joined onto `parent()` again: doing so turned `dir/..` into
        // `dir/dir/...<name>`.
        None => return format!("{base}.{sanitized}"),
    };

    let file_name = match path.extension().and_then(|s| s.to_str()) {
        Some(ext) => format!("{stem}.{sanitized}.{ext}"),
        None => format!("{stem}.{sanitized}"),
    };

    match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => {
            dir.join(file_name).to_string_lossy().into_owned()
        }
        _ => file_name,
    }
}
170
171fn build_plan_artifact(
172 config: &Config,
173 export: &crate::config::ExportConfig,
174 config_path: &str,
175 config_dir: &Path,
176 params: Option<&HashMap<String, String>>,
177 state: &StateStore,
178) -> Result<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> {
179 let plan = build_plan(config, export, config_dir, false, false, false, params)?;
180
181 let validate_diags = validate_plan(&plan);
183 let mut validate_warnings: Vec<String> = Vec::new();
184 for d in &validate_diags {
185 match d.level {
186 DiagnosticLevel::Rejected => {
187 anyhow::bail!("[{}] {}", d.rule, d.message);
188 }
189 DiagnosticLevel::Warning | DiagnosticLevel::Degraded => {
190 validate_warnings.push(format!("[{}] {}", d.rule, d.message));
191 }
192 }
193 }
194
195 let (computed, plan_diagnostics, hints) = match preflight::get_export_diagnostic(config, export)
196 {
197 Ok(diag) => {
198 let mut warnings = diag.warnings.clone();
199 warnings.extend(validate_warnings);
200 if warnings.is_empty() && !matches!(diag.verdict, preflight::HealthVerdict::Efficient) {
208 if let Some(s) = diag.suggestion.clone() {
209 warnings.push(s);
210 } else {
211 warnings.push(format!(
212 "verdict {} but preflight collected no specific warnings — review `rivet check` output for context",
213 diag.verdict
214 ));
215 }
216 }
217 let plan_diagnostics = PlanDiagnostics {
218 verdict: diag.verdict.to_string(),
219 warnings,
220 recommended_profile: diag.recommended_profile.to_string(),
221 };
222 let computed = compute_plan_data(&plan, diag.row_estimate, state)?;
223 let hints = PrioritizationHints {
224 incremental_uses_index: diag.uses_index,
225 cursor_range_observed: diag.cursor_min.is_some() && diag.cursor_max.is_some(),
226 };
227 (computed, plan_diagnostics, hints)
228 }
229 Err(e) => {
230 log::warn!(
231 "plan '{}': preflight diagnostics failed (continuing without them): {:#}",
232 export.name,
233 e
234 );
235 let computed = compute_plan_data(&plan, None, state)?;
236 let mut warnings = vec!["preflight diagnostics unavailable".into()];
237 warnings.extend(validate_warnings);
238 let plan_diagnostics = PlanDiagnostics {
239 verdict: "unknown (preflight failed)".into(),
240 warnings,
241 recommended_profile: "balanced".into(),
242 };
243 (computed, plan_diagnostics, PrioritizationHints::default())
244 }
245 };
246
247 let fingerprint = match &plan.strategy {
248 ExtractionStrategy::Chunked(cp) => chunk_plan_fingerprint(
249 &plan.base_query,
250 &cp.column,
251 cp.chunk_size,
252 cp.chunk_count,
253 cp.dense,
254 cp.by_days,
255 ),
256 _ => String::new(),
257 };
258
259 let history = match state.get_metrics(Some(&export.name), 20) {
261 Ok(metrics) => Some(crate::plan::HistorySnapshot::summarize(&metrics)),
262 Err(e) => {
263 log::warn!(
264 "plan '{}': history lookup failed; proceeding without historical refinement: {:#}",
265 export.name,
266 e
267 );
268 None
269 }
270 };
271
272 let inputs =
273 build_prioritization_inputs(export, &plan, &computed, &plan_diagnostics, hints, history);
274 let recommendation = recommend_export(&inputs, &plan_diagnostics);
275
276 let mut artifact = PlanArtifact::new(
277 plan.export_name.clone(),
278 plan.strategy.mode_label().to_string(),
279 fingerprint,
280 plan,
281 computed,
282 plan_diagnostics,
283 );
284
285 artifact.config_path = Some(
293 Path::new(config_path)
294 .canonicalize()
295 .map(|p| p.to_string_lossy().into_owned())
296 .unwrap_or_else(|_| config_path.to_string()),
297 );
298
299 Ok((artifact, inputs, recommendation))
300}
301
302fn compute_plan_data(
310 plan: &crate::plan::ResolvedRunPlan,
311 row_estimate: Option<i64>,
312 state: &StateStore,
313) -> Result<ComputedPlanData> {
314 match &plan.strategy {
315 ExtractionStrategy::Chunked(cp) => {
316 let mut src = source::create_source(&plan.source)?;
317 let chunk_ranges = detect_and_generate_chunks(
318 &mut *src,
319 &plan.base_query,
320 &cp.column,
321 cp.chunk_size,
322 cp.chunk_count,
323 &plan.export_name,
324 cp.dense,
325 cp.by_days,
326 plan.source.source_type,
327 )?;
328 let chunk_count = chunk_ranges.len();
329 let chunked_estimate = chunk_ranges
336 .first()
337 .zip(chunk_ranges.last())
338 .map(|(first, last)| (last.1 - first.0 + 1).max(0));
339 Ok(ComputedPlanData {
340 chunk_ranges,
341 chunk_count,
342 cursor_snapshot: None,
343 row_estimate: chunked_estimate.or(row_estimate),
344 })
345 }
346
347 ExtractionStrategy::Incremental(_) => {
348 let cursor_snapshot = state.get(&plan.export_name)?.last_cursor_value;
349 Ok(ComputedPlanData {
350 chunk_ranges: vec![],
351 chunk_count: 0,
352 cursor_snapshot,
353 row_estimate,
354 })
355 }
356
357 ExtractionStrategy::Snapshot
360 | ExtractionStrategy::TimeWindow { .. }
361 | ExtractionStrategy::Keyset(_) => Ok(ComputedPlanData {
362 chunk_ranges: vec![],
363 chunk_count: 0,
364 cursor_snapshot: None,
365 row_estimate,
366 }),
367 }
368}
369
370fn emit_artifacts(
371 artifacts: &[PlanArtifact],
372 format: &PlanOutputFormat,
373 multi_export: bool,
374) -> Result<()> {
375 match format {
376 PlanOutputFormat::Pretty => {
377 for artifact in artifacts {
378 artifact.print_summary();
379 }
380 }
381 PlanOutputFormat::Json(None) => {
386 if artifacts.len() > 1 {
387 println!("{}", serde_json::to_string_pretty(artifacts)?);
390 } else if let Some(artifact) = artifacts.first() {
391 println!("{}", artifact.to_json_pretty()?);
392 }
393 }
394 PlanOutputFormat::Json(Some(path)) => {
395 for artifact in artifacts {
396 let json = artifact.to_json_pretty()?;
397 let out_path = if multi_export {
401 per_export_output_path(path, &artifact.export_name)
402 } else {
403 path.clone()
404 };
405 std::fs::write(&out_path, &json)
406 .map_err(|e| anyhow::anyhow!("cannot write plan file '{}': {}", out_path, e))?;
407 println!("Plan written to: {}", out_path);
408 }
409 }
410 }
411 Ok(())
412}
413
#[cfg(test)]
mod tests {
    use super::per_export_output_path;
    use std::path::Path;

    /// Base path shared by the tests that exercise the `.json` case.
    const JSON_BASE: &str = "/tmp/plan.json";

    /// Extension of `p` as UTF-8, if it has one.
    fn extension_of(p: &str) -> Option<&str> {
        Path::new(p).extension().and_then(|e| e.to_str())
    }

    /// Two different export names must map to two different files.
    #[test]
    fn per_export_paths_are_distinct() {
        let orders = per_export_output_path(JSON_BASE, "orders");
        let users = per_export_output_path(JSON_BASE, "users");
        assert_ne!(
            orders, users,
            "distinct exports must not collide on one path"
        );
        assert_eq!(orders, "/tmp/plan.orders.json");
        assert_eq!(users, "/tmp/plan.users.json");
    }

    /// The export name goes before the extension, not after it.
    #[test]
    fn per_export_path_keeps_json_extension() {
        let derived = per_export_output_path(JSON_BASE, "orders");
        assert_eq!(
            extension_of(&derived),
            Some("json"),
            "per-export file must keep the .json extension"
        );
    }

    /// Without an extension the export name is simply appended.
    #[test]
    fn per_export_path_without_extension_appends_name() {
        assert_eq!(
            per_export_output_path("/tmp/plan", "orders"),
            "/tmp/plan.orders"
        );
    }

    /// A hostile export name must not escape the requested directory.
    #[test]
    fn per_export_path_sanitizes_unsafe_export_name() {
        let derived = per_export_output_path(JSON_BASE, "../../etc/passwd");
        let derived_path = Path::new(&derived);

        assert!(
            !derived.contains(".."),
            "sanitized path must not contain a `..` traversal: {derived}"
        );

        let file = derived_path
            .file_name()
            .and_then(|f| f.to_str())
            .expect("derived path has a file name");
        assert!(
            !file.contains('/') && !file.contains('\\'),
            "sanitized file name must not contain a path separator: {file}"
        );

        assert_eq!(
            derived_path.parent().and_then(|d| d.to_str()),
            Some("/tmp"),
            "derived path must stay in the requested directory"
        );
        assert_eq!(
            extension_of(&derived),
            Some("json"),
            "extension must survive sanitization"
        );
    }
}