1use std::collections::HashMap;
14use std::path::Path;
15
16use crate::config::Config;
17use crate::error::Result;
18use crate::plan::{
19 ComputedPlanData, ExportRecommendation, ExtractionStrategy, PlanArtifact, PlanDiagnostics,
20 PlanPrioritizationSnapshot, PrioritizationInputs, build_plan,
21 campaign::recommend_campaign,
22 inputs::{PrioritizationHints, build_prioritization_inputs},
23 recommend::recommend_export,
24 validate::{DiagnosticLevel, validate_plan},
25};
26use crate::state::StateStore;
27use crate::{preflight, source};
28
29use super::chunked::{chunk_plan_fingerprint, detect_and_generate_chunks};
30
31pub enum PlanOutputFormat {
33 Pretty,
35 Json(Option<String>),
37}
38
39pub fn run_plan_command(
44 config_path: &str,
45 export_name: Option<&str>,
46 params: Option<&HashMap<String, String>>,
47 format: PlanOutputFormat,
48) -> Result<()> {
49 let config = Config::load_with_params(config_path, params)?;
50 let config_dir = Path::new(config_path)
51 .parent()
52 .unwrap_or_else(|| Path::new("."));
53
54 let exports: Vec<_> = if let Some(name) = export_name {
55 let e = config
56 .exports
57 .iter()
58 .find(|e| e.name == name)
59 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
60 vec![e]
61 } else {
62 config.exports.iter().collect()
63 };
64
65 let state_path = config_dir.join(".rivet_state.db");
66 let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
67
68 let mut built: Vec<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> = Vec::new();
69
70 for export in exports {
71 built.push(build_plan_artifact(
72 &config,
73 export,
74 config_path,
75 config_dir,
76 params,
77 &state,
78 )?);
79 }
80
81 let campaign_opt = if built.len() > 1 {
82 let pairs: Vec<_> = built
83 .iter()
84 .map(|(_, i, r)| (i.clone(), r.clone()))
85 .collect();
86 Some(recommend_campaign(pairs))
87 } else {
88 None
89 };
90
91 let multi_export = built.len() > 1;
98
99 let mut artifacts: Vec<PlanArtifact> = Vec::with_capacity(built.len());
100 for (mut artifact, _inputs, standalone_rec) in built {
101 let snap = if let Some(ref camp) = campaign_opt {
102 let rec = camp
103 .ordered_exports
104 .iter()
105 .find(|e| e.export_name == artifact.export_name)
106 .cloned()
107 .unwrap_or_else(|| standalone_rec.clone());
108 PlanPrioritizationSnapshot {
109 export_recommendation: rec,
110 campaign: Some(camp.clone()),
111 }
112 } else {
113 PlanPrioritizationSnapshot {
114 export_recommendation: standalone_rec,
115 campaign: None,
116 }
117 };
118 artifact.prioritization = Some(snap);
119 artifacts.push(artifact);
120 }
121
122 let mut fields: ExportFields = HashMap::new();
129 for a in &artifacts {
130 if let Some(p) = a.prioritization.as_ref() {
131 let rec = &p.export_recommendation;
132 let parallel_safe =
136 rec.cost_class == crate::plan::CostClass::Low && !rec.isolate_on_source;
137 fields.insert(
138 a.export_name.clone(),
139 vec![
140 ("wave", rec.recommended_wave.to_string()),
141 ("parallel_safe", parallel_safe.to_string()),
142 ],
143 );
144 }
145 }
146 if !fields.is_empty() {
147 write_plan_fields_to_config(config_path, &fields)?;
148 log::info!(
149 "plan: recorded wave + parallel-safety for {} export(s) in {}",
150 fields.len(),
151 config_path
152 );
153 }
154
155 emit_artifacts(&artifacts, &format, multi_export, config_path)?;
156
157 Ok(())
158}
159
160fn per_export_output_path(base: &str, export_name: &str) -> String {
171 let sanitized: String = export_name
172 .chars()
173 .map(|c| {
174 if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
175 c
176 } else {
177 '_'
178 }
179 })
180 .collect();
181
182 let path = Path::new(base);
183 let parent = path.parent();
184 let stem = path.file_stem().and_then(|s| s.to_str());
185 let ext = path.extension().and_then(|s| s.to_str());
186
187 let file_name = match (stem, ext) {
188 (Some(stem), Some(ext)) => format!("{stem}.{sanitized}.{ext}"),
189 (Some(stem), None) => format!("{stem}.{sanitized}"),
190 (None, _) => format!("{base}.{sanitized}"),
194 };
195
196 match parent {
197 Some(dir) if !dir.as_os_str().is_empty() => {
198 dir.join(file_name).to_string_lossy().into_owned()
199 }
200 _ => file_name,
201 }
202}
203
204fn build_plan_artifact(
205 config: &Config,
206 export: &crate::config::ExportConfig,
207 config_path: &str,
208 config_dir: &Path,
209 params: Option<&HashMap<String, String>>,
210 state: &StateStore,
211) -> Result<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> {
212 let plan = build_plan(config, export, config_dir, false, false, false, params)?;
213
214 let validate_diags = validate_plan(&plan);
216 let mut validate_warnings: Vec<String> = Vec::new();
217 for d in &validate_diags {
218 match d.level {
219 DiagnosticLevel::Rejected => {
220 anyhow::bail!("[{}] {}", d.rule, d.message);
221 }
222 DiagnosticLevel::Warning | DiagnosticLevel::Degraded => {
223 validate_warnings.push(format!("[{}] {}", d.rule, d.message));
224 }
225 }
226 }
227
228 let (computed, plan_diagnostics, hints) = match preflight::get_export_diagnostic(config, export)
229 {
230 Ok(diag) => {
231 let mut warnings: Vec<String> =
234 diag.warnings.iter().map(|w| w.message.clone()).collect();
235 warnings.extend(validate_warnings);
236 if warnings.is_empty() && !matches!(diag.verdict, preflight::HealthVerdict::Efficient) {
244 if let Some(s) = diag.suggestion.clone() {
245 warnings.push(s);
246 } else {
247 warnings.push(format!(
248 "verdict {} but preflight collected no specific warnings — review `rivet check` output for context",
249 diag.verdict
250 ));
251 }
252 }
253 let strategy_rationale = crate::plan::explain_strategy(&diag, export);
258 let plan_diagnostics = PlanDiagnostics {
259 verdict: diag.verdict.to_string(),
260 warnings,
261 recommended_profile: diag.recommended_profile.to_string(),
262 strategy_rationale,
263 };
264 let computed = compute_plan_data(&plan, diag.row_estimate, state)?;
265 let hints = PrioritizationHints {
266 incremental_uses_index: diag.uses_index,
267 cursor_range_observed: diag.cursor_min.is_some() && diag.cursor_max.is_some(),
268 };
269 (computed, plan_diagnostics, hints)
270 }
271 Err(e) => {
272 log::warn!(
273 "plan '{}': preflight diagnostics failed (continuing without them): {:#}",
274 export.name,
275 e
276 );
277 let computed = compute_plan_data(&plan, None, state)?;
278 let mut warnings = vec!["preflight diagnostics unavailable".into()];
279 warnings.extend(validate_warnings);
280 let plan_diagnostics = PlanDiagnostics {
281 verdict: "unknown (preflight failed)".into(),
282 warnings,
283 recommended_profile: "balanced".into(),
284 strategy_rationale: "Strategy rationale unavailable — preflight diagnostics could \
287 not be collected for this export."
288 .into(),
289 };
290 (computed, plan_diagnostics, PrioritizationHints::default())
291 }
292 };
293
294 let fingerprint = match &plan.strategy {
295 ExtractionStrategy::Chunked(cp) => chunk_plan_fingerprint(
296 &plan.base_query,
297 &cp.column,
298 cp.chunk_size,
299 cp.chunk_count,
300 cp.dense,
301 cp.by_days,
302 ),
303 _ => String::new(),
304 };
305
306 let history = match state.get_metrics(Some(&export.name), 20) {
308 Ok(metrics) => Some(crate::plan::HistorySnapshot::summarize(&metrics)),
309 Err(e) => {
310 log::warn!(
311 "plan '{}': history lookup failed; proceeding without historical refinement: {:#}",
312 export.name,
313 e
314 );
315 None
316 }
317 };
318
319 let inputs =
320 build_prioritization_inputs(export, &plan, &computed, &plan_diagnostics, hints, history);
321 let recommendation = recommend_export(&inputs, &plan_diagnostics);
322
323 let mut artifact = PlanArtifact::new(
324 plan.export_name.clone(),
325 plan.strategy.mode_label().to_string(),
326 fingerprint,
327 plan,
328 computed,
329 plan_diagnostics,
330 );
331
332 artifact.config_path = Some(
340 Path::new(config_path)
341 .canonicalize()
342 .map(|p| p.to_string_lossy().into_owned())
343 .unwrap_or_else(|_| config_path.to_string()),
344 );
345
346 Ok((artifact, inputs, recommendation))
347}
348
349fn compute_plan_data(
357 plan: &crate::plan::ResolvedRunPlan,
358 row_estimate: Option<i64>,
359 state: &StateStore,
360) -> Result<ComputedPlanData> {
361 match &plan.strategy {
362 ExtractionStrategy::Chunked(cp) => {
363 let mut src = source::create_source(&plan.source)?;
364 let chunk_ranges = detect_and_generate_chunks(
365 &mut *src,
366 &plan.base_query,
367 &cp.column,
368 cp.chunk_size,
369 cp.chunk_count,
370 &plan.export_name,
371 cp.dense,
372 cp.by_days,
373 plan.source.source_type,
374 )?;
375 let chunk_count = chunk_ranges.len();
376 let chunked_estimate = chunk_ranges
383 .first()
384 .zip(chunk_ranges.last())
385 .map(|(first, last)| (last.1 - first.0 + 1).max(0));
386 Ok(ComputedPlanData {
387 chunk_ranges,
388 chunk_count,
389 cursor_snapshot: None,
390 row_estimate: chunked_estimate.or(row_estimate),
391 })
392 }
393
394 ExtractionStrategy::Incremental(_) => {
395 let cursor_snapshot = state.get(&plan.export_name)?.last_cursor_value;
396 Ok(ComputedPlanData {
397 chunk_ranges: vec![],
398 chunk_count: 0,
399 cursor_snapshot,
400 row_estimate,
401 })
402 }
403
404 ExtractionStrategy::Snapshot
407 | ExtractionStrategy::TimeWindow { .. }
408 | ExtractionStrategy::Keyset(_) => Ok(ComputedPlanData {
409 chunk_ranges: vec![],
410 chunk_count: 0,
411 cursor_snapshot: None,
412 row_estimate,
413 }),
414 }
415}
416
417fn emit_artifacts(
418 artifacts: &[PlanArtifact],
419 format: &PlanOutputFormat,
420 multi_export: bool,
421 config_path: &str,
422) -> Result<()> {
423 match format {
424 PlanOutputFormat::Pretty => {
425 if multi_export {
426 print_compact_summary(artifacts, config_path);
431 } else {
432 for artifact in artifacts {
433 artifact.print_summary();
434 }
435 }
436 }
437 PlanOutputFormat::Json(None) => {
442 if artifacts.len() > 1 {
443 println!("{}", serde_json::to_string_pretty(artifacts)?);
446 } else if let Some(artifact) = artifacts.first() {
447 println!("{}", artifact.to_json_pretty()?);
448 }
449 }
450 PlanOutputFormat::Json(Some(path)) => {
451 for artifact in artifacts {
452 let json = artifact.to_json_pretty()?;
453 let out_path = if multi_export {
457 per_export_output_path(path, &artifact.export_name)
458 } else {
459 path.clone()
460 };
461 std::fs::write(&out_path, &json)
462 .map_err(|e| anyhow::anyhow!("cannot write plan file '{}': {}", out_path, e))?;
463 println!("Plan written to: {}", out_path);
464 }
465 }
466 }
467 Ok(())
468}
469
470fn print_compact_summary(artifacts: &[PlanArtifact], config_path: &str) {
475 let key = |a: &PlanArtifact| {
476 a.prioritization
477 .as_ref()
478 .map(|p| {
479 (
480 p.export_recommendation.recommended_wave,
481 p.export_recommendation.priority_score,
482 )
483 })
484 .unwrap_or((u32::MAX, 0))
485 };
486 let mut order: Vec<&PlanArtifact> = artifacts.iter().collect();
487 order.sort_by(|a, b| {
488 let (wa, sa) = key(a);
489 let (wb, sb) = key(b);
490 wa.cmp(&wb)
491 .then(sb.cmp(&sa))
492 .then(a.export_name.cmp(&b.export_name))
493 });
494 let name_w = order
495 .iter()
496 .map(|a| a.export_name.chars().count())
497 .max()
498 .unwrap_or(6)
499 .clamp(6, 32);
500
501 println!();
502 println!(
503 " Plan: {} exports — `rivet apply {}` runs them by wave (lowest first)",
504 artifacts.len(),
505 config_path
506 );
507 println!();
508 println!("{}", PlanArtifact::summary_header(name_w));
509 for a in &order {
510 println!("{}", a.summary_line(name_w));
511 }
512 println!();
513 println!(" Full detail for one export: rivet plan -c <config> --export <name>");
514}
515
516type ExportFields = HashMap<String, Vec<(&'static str, String)>>;
519
520fn write_plan_fields_to_config(config_path: &str, fields: &ExportFields) -> Result<()> {
521 if fields.is_empty() {
522 return Ok(());
523 }
524 let original = std::fs::read_to_string(config_path).map_err(|e| {
525 anyhow::anyhow!(
526 "cannot read config '{}' to record plan fields: {}",
527 config_path,
528 e
529 )
530 })?;
531 let updated = apply_field_annotations(&original, fields);
532 if updated != original {
533 std::fs::write(config_path, updated).map_err(|e| {
534 anyhow::anyhow!(
535 "cannot write plan fields to config '{}': {}",
536 config_path,
537 e
538 )
539 })?;
540 }
541 Ok(())
542}
543
544fn apply_field_annotations(yaml: &str, fields: &ExportFields) -> String {
548 let mut out = String::with_capacity(yaml.len() + 64);
549 let mut current: Option<(String, usize, usize)> = None;
552
553 for line in yaml.split_inclusive('\n') {
554 let content = line.strip_suffix('\n').unwrap_or(line);
555
556 if let Some((dash_indent, name)) = parse_export_name(content) {
557 out.push_str(line);
558 let field_indent = dash_indent + 2;
559 if let Some(items) = fields.get(&name) {
560 for (key, value) in items {
561 out.push_str(&" ".repeat(field_indent));
562 out.push_str(&format!("{key}: {value}\n"));
563 }
564 }
565 current = Some((name, dash_indent, field_indent));
566 continue;
567 }
568
569 if let Some((name, dash_indent, field_indent)) = current.as_ref() {
570 let trimmed = content.trim_start();
571 let indent = content.len() - trimmed.len();
572 let blank_or_comment = trimmed.is_empty() || trimmed.starts_with('#');
573 if !blank_or_comment && indent <= *dash_indent {
574 current = None; } else if indent == *field_indent
576 && let Some(items) = fields.get(name)
577 && items
578 .iter()
579 .any(|(key, _)| trimmed.starts_with(&format!("{key}:")))
580 {
581 continue; }
583 }
584 out.push_str(line);
585 }
586 out
587}
588
589fn parse_export_name(line: &str) -> Option<(usize, String)> {
592 let trimmed = line.trim_start();
593 let dash_indent = line.len() - trimmed.len();
594 let rest = trimmed.strip_prefix("- ")?;
595 let value = rest.trim_start().strip_prefix("name:")?;
596 let value = value.split(" #").next().unwrap_or(value);
598 let name = value
599 .trim()
600 .trim_matches(|c: char| c == '"' || c == '\'')
601 .to_string();
602 (!name.is_empty()).then_some((dash_indent, name))
603}
604
605#[cfg(test)]
606mod tests {
607 use super::per_export_output_path;
608 use std::path::Path;
609
610 use super::{ExportFields, apply_field_annotations};
611
612 fn wave_fields(pairs: &[(&str, u32)]) -> ExportFields {
613 pairs
614 .iter()
615 .map(|(n, w)| (n.to_string(), vec![("wave", w.to_string())]))
616 .collect()
617 }
618
619 #[test]
622 fn wave_annotations_insert_replace_and_preserve() {
623 let yaml = "exports:\n - name: orders # the orders table\n mode: incremental\n wave: 9\n cursor_column: updated_at\n - name: events\n mode: full\ndestination:\n type: local\n";
624 let out = apply_field_annotations(yaml, &wave_fields(&[("orders", 2), ("events", 1)]));
625 assert!(
627 out.contains("- name: orders # the orders table\n wave: 2\n"),
628 "{out}"
629 );
630 assert!(
631 !out.contains("wave: 9"),
632 "stale wave must be dropped:\n{out}"
633 );
634 assert!(out.contains("- name: events\n wave: 1\n"), "{out}");
636 assert!(out.contains("cursor_column: updated_at"));
638 assert!(out.contains("destination:\n type: local"));
639 }
640
641 #[test]
643 fn wave_annotations_leave_unlisted_export_untouched() {
644 let yaml = "exports:\n - name: orders\n mode: full\n";
645 let out = apply_field_annotations(yaml, &wave_fields(&[("other", 1)]));
646 assert_eq!(out, yaml);
647 }
648
649 #[test]
652 fn per_export_paths_are_distinct() {
653 let a = per_export_output_path("/tmp/plan.json", "orders");
654 let b = per_export_output_path("/tmp/plan.json", "users");
655 assert_ne!(a, b, "distinct exports must not collide on one path");
656 assert_eq!(a, "/tmp/plan.orders.json");
657 assert_eq!(b, "/tmp/plan.users.json");
658 }
659
660 #[test]
663 fn per_export_path_keeps_json_extension() {
664 let p = per_export_output_path("/tmp/plan.json", "orders");
665 assert_eq!(
666 Path::new(&p).extension().and_then(|e| e.to_str()),
667 Some("json"),
668 "per-export file must keep the .json extension"
669 );
670 }
671
672 #[test]
675 fn per_export_path_without_extension_appends_name() {
676 let p = per_export_output_path("/tmp/plan", "orders");
677 assert_eq!(p, "/tmp/plan.orders");
678 }
679
680 #[test]
683 fn per_export_path_sanitizes_unsafe_export_name() {
684 let p = per_export_output_path("/tmp/plan.json", "../../etc/passwd");
685 assert!(
687 !p.contains(".."),
688 "sanitized path must not contain a `..` traversal: {p}"
689 );
690 let file = Path::new(&p)
691 .file_name()
692 .and_then(|f| f.to_str())
693 .expect("derived path has a file name");
694 assert!(
695 !file.contains('/') && !file.contains('\\'),
696 "sanitized file name must not contain a path separator: {file}"
697 );
698 assert_eq!(
699 Path::new(&p).parent().and_then(|d| d.to_str()),
700 Some("/tmp"),
701 "derived path must stay in the requested directory"
702 );
703 assert_eq!(
704 Path::new(&p).extension().and_then(|e| e.to_str()),
705 Some("json"),
706 "extension must survive sanitization"
707 );
708 }
709}