Skip to main content

rivet/pipeline/
plan_cmd.rs

1//! **`rivet plan`** — build and display a `PlanArtifact` without executing.
2//!
3//! This module is the coordinator for the plan command. It:
4//!
5//! 1. Loads config and resolves the `ResolvedRunPlan` (same as `rivet run`).
6//! 2. Runs preflight diagnostics via `preflight::get_export_diagnostic`.
7//! 3. For `Chunked` exports: opens a source connection and pre-computes chunk
8//!    boundaries using `detect_and_generate_chunks` — no data is exported.
9//! 4. For `Incremental` exports: reads the current cursor from `StateStore`.
10//! 5. Packages everything into a `PlanArtifact` and either writes it to a file
11//!    or prints a human-readable summary to stdout.
12
13use std::collections::HashMap;
14use std::path::Path;
15
16use crate::config::Config;
17use crate::error::Result;
18use crate::plan::{
19    ComputedPlanData, ExportRecommendation, ExtractionStrategy, PlanArtifact, PlanDiagnostics,
20    PlanPrioritizationSnapshot, PrioritizationInputs, build_plan,
21    campaign::recommend_campaign,
22    inputs::{PrioritizationHints, build_prioritization_inputs},
23    recommend::recommend_export,
24    validate::{DiagnosticLevel, validate_plan},
25};
26use crate::state::StateStore;
27use crate::{preflight, source};
28
29use super::chunked::{chunk_plan_fingerprint, detect_and_generate_chunks};
30
/// Output format for `rivet plan`.
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so callers (CLI parsing, tests,
/// logging) can inspect, copy and compare the requested format.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanOutputFormat {
    /// Human-readable summary printed to stdout; no file written.
    Pretty,
    /// Pretty-printed JSON written to the given path (or stdout if `None`).
    Json(Option<String>),
}
38
39/// Entry point for `rivet plan`.
40///
41/// Iterates over all matching exports, builds a `PlanArtifact` for each, and
42/// either prints or saves it according to `format`.
43pub fn run_plan_command(
44    config_path: &str,
45    export_name: Option<&str>,
46    params: Option<&HashMap<String, String>>,
47    format: PlanOutputFormat,
48) -> Result<()> {
49    let config = Config::load_with_params(config_path, params)?;
50    let config_dir = Path::new(config_path)
51        .parent()
52        .unwrap_or_else(|| Path::new("."));
53
54    let exports: Vec<_> = if let Some(name) = export_name {
55        let e = config
56            .exports
57            .iter()
58            .find(|e| e.name == name)
59            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
60        vec![e]
61    } else {
62        config.exports.iter().collect()
63    };
64
65    let state_path = config_dir.join(".rivet_state.db");
66    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
67
68    let mut built: Vec<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> = Vec::new();
69
70    for export in exports {
71        built.push(build_plan_artifact(
72            &config,
73            export,
74            config_path,
75            config_dir,
76            params,
77            &state,
78        )?);
79    }
80
81    let campaign_opt = if built.len() > 1 {
82        let pairs: Vec<_> = built
83            .iter()
84            .map(|(_, i, r)| (i.clone(), r.clone()))
85            .collect();
86        Some(recommend_campaign(pairs))
87    } else {
88        None
89    };
90
91    for (mut artifact, _inputs, standalone_rec) in built {
92        let snap = if let Some(ref camp) = campaign_opt {
93            let rec = camp
94                .ordered_exports
95                .iter()
96                .find(|e| e.export_name == artifact.export_name)
97                .cloned()
98                .unwrap_or_else(|| standalone_rec.clone());
99            PlanPrioritizationSnapshot {
100                export_recommendation: rec,
101                campaign: Some(camp.clone()),
102            }
103        } else {
104            PlanPrioritizationSnapshot {
105                export_recommendation: standalone_rec,
106                campaign: None,
107            }
108        };
109        artifact.prioritization = Some(snap);
110        emit_artifact(&artifact, &format)?;
111    }
112
113    Ok(())
114}
115
116fn build_plan_artifact(
117    config: &Config,
118    export: &crate::config::ExportConfig,
119    config_path: &str,
120    config_dir: &Path,
121    params: Option<&HashMap<String, String>>,
122    state: &StateStore,
123) -> Result<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> {
124    let plan = build_plan(config, export, config_dir, false, false, false, params)?;
125
126    // Collect plan-level compatibility diagnostics and emit Rejected ones as errors.
127    let validate_diags = validate_plan(&plan);
128    let mut validate_warnings: Vec<String> = Vec::new();
129    for d in &validate_diags {
130        match d.level {
131            DiagnosticLevel::Rejected => {
132                anyhow::bail!("[{}] {}", d.rule, d.message);
133            }
134            DiagnosticLevel::Warning | DiagnosticLevel::Degraded => {
135                validate_warnings.push(format!("[{}] {}", d.rule, d.message));
136            }
137        }
138    }
139
140    let (computed, plan_diagnostics, hints) = match preflight::get_export_diagnostic(config, export)
141    {
142        Ok(diag) => {
143            let mut warnings = diag.warnings.clone();
144            warnings.extend(validate_warnings);
145            // F3 (0.7.5 audit): the JSON artifact's `diagnostics`
146            // exposed a non-Efficient verdict with `warnings: []`, so a
147            // machine consumer could not see *why* the plan was flagged.
148            // If preflight emitted no specific warning, fall back to the
149            // build_suggestion text (the same line shown in `rivet check`)
150            // so the JSON always carries at least one human-readable
151            // reason matching the verdict.
152            if warnings.is_empty() && !matches!(diag.verdict, preflight::HealthVerdict::Efficient) {
153                if let Some(s) = diag.suggestion.clone() {
154                    warnings.push(s);
155                } else {
156                    warnings.push(format!(
157                        "verdict {} but preflight collected no specific warnings — review `rivet check` output for context",
158                        diag.verdict
159                    ));
160                }
161            }
162            let plan_diagnostics = PlanDiagnostics {
163                verdict: diag.verdict.to_string(),
164                warnings,
165                recommended_profile: diag.recommended_profile.to_string(),
166            };
167            let computed = compute_plan_data(&plan, diag.row_estimate, state)?;
168            let hints = PrioritizationHints {
169                incremental_uses_index: diag.uses_index,
170                cursor_range_observed: diag.cursor_min.is_some() && diag.cursor_max.is_some(),
171            };
172            (computed, plan_diagnostics, hints)
173        }
174        Err(e) => {
175            log::warn!(
176                "plan '{}': preflight diagnostics failed (continuing without them): {:#}",
177                export.name,
178                e
179            );
180            let computed = compute_plan_data(&plan, None, state)?;
181            let mut warnings = vec!["preflight diagnostics unavailable".into()];
182            warnings.extend(validate_warnings);
183            let plan_diagnostics = PlanDiagnostics {
184                verdict: "unknown (preflight failed)".into(),
185                warnings,
186                recommended_profile: "balanced".into(),
187            };
188            (computed, plan_diagnostics, PrioritizationHints::default())
189        }
190    };
191
192    let fingerprint = match &plan.strategy {
193        ExtractionStrategy::Chunked(cp) => chunk_plan_fingerprint(
194            &plan.base_query,
195            &cp.column,
196            cp.chunk_size,
197            cp.chunk_count,
198            cp.dense,
199            cp.by_days,
200        ),
201        _ => String::new(),
202    };
203
204    // Epic I: fold recent-run history into prioritization (bounded contribution).
205    let history = match state.get_metrics(Some(&export.name), 20) {
206        Ok(metrics) => Some(crate::plan::HistorySnapshot::summarize(&metrics)),
207        Err(e) => {
208            log::warn!(
209                "plan '{}': history lookup failed; proceeding without historical refinement: {:#}",
210                export.name,
211                e
212            );
213            None
214        }
215    };
216
217    let inputs =
218        build_prioritization_inputs(export, &plan, &computed, &plan_diagnostics, hints, history);
219    let recommendation = recommend_export(&inputs, &plan_diagnostics);
220
221    let mut artifact = PlanArtifact::new(
222        plan.export_name.clone(),
223        plan.strategy.mode_label().to_string(),
224        fingerprint,
225        plan,
226        computed,
227        plan_diagnostics,
228    );
229
230    // F13 (0.7.5 audit): record the absolute config path so `rivet
231    // apply` can locate the matching `.rivet_state.db` (cursors,
232    // manifest history) even when the plan JSON is stored in a
233    // different directory.  `canonicalize` resolves symlinks and
234    // produces an absolute path; we fall back to the original string
235    // if the path no longer resolves (rare, e.g. config deleted
236    // between plan and apply).
237    artifact.config_path = Some(
238        Path::new(config_path)
239            .canonicalize()
240            .map(|p| p.to_string_lossy().into_owned())
241            .unwrap_or_else(|_| config_path.to_string()),
242    );
243
244    Ok((artifact, inputs, recommendation))
245}
246
247/// Compute the `ComputedPlanData` portion of the artifact.
248///
249/// For `Chunked` exports this opens a source connection and calls
250/// `detect_and_generate_chunks` to pre-compute chunk boundaries.  No rows are
251/// exported — we only run the `SELECT min(col) / max(col)` boundary queries.
252///
253/// For `Incremental` exports we read the last cursor value from `StateStore`.
254fn compute_plan_data(
255    plan: &crate::plan::ResolvedRunPlan,
256    row_estimate: Option<i64>,
257    state: &StateStore,
258) -> Result<ComputedPlanData> {
259    match &plan.strategy {
260        ExtractionStrategy::Chunked(cp) => {
261            let mut src = source::create_source(&plan.source)?;
262            let chunk_ranges = detect_and_generate_chunks(
263                &mut *src,
264                &plan.base_query,
265                &cp.column,
266                cp.chunk_size,
267                cp.chunk_count,
268                &plan.export_name,
269                cp.dense,
270                cp.by_days,
271                plan.source.source_type,
272            )?;
273            let chunk_count = chunk_ranges.len();
274            // F4 (0.7.5 audit): for chunked exports the artifact already
275            // contains the exact key span (`chunk_ranges[0].0` ..
276            // `chunk_ranges[-1].1`).  Using that as `row_estimate`
277            // beats `pg_class.reltuples` (which is `1130` on a fresh
278            // 30-row PG table because `ANALYZE` hasn't run).  This
279            // makes PG and MySQL agree on the same artifact.
280            let chunked_estimate = chunk_ranges
281                .first()
282                .zip(chunk_ranges.last())
283                .map(|(first, last)| (last.1 - first.0 + 1).max(0));
284            Ok(ComputedPlanData {
285                chunk_ranges,
286                chunk_count,
287                cursor_snapshot: None,
288                row_estimate: chunked_estimate.or(row_estimate),
289            })
290        }
291
292        ExtractionStrategy::Incremental(_) => {
293            let cursor_snapshot = state.get(&plan.export_name)?.last_cursor_value;
294            Ok(ComputedPlanData {
295                chunk_ranges: vec![],
296                chunk_count: 0,
297                cursor_snapshot,
298                row_estimate,
299            })
300        }
301
302        // Keyset pages are computed dynamically at run time (seek pagination),
303        // so there are no precomputed ranges to describe here — like a snapshot.
304        ExtractionStrategy::Snapshot
305        | ExtractionStrategy::TimeWindow { .. }
306        | ExtractionStrategy::Keyset(_) => Ok(ComputedPlanData {
307            chunk_ranges: vec![],
308            chunk_count: 0,
309            cursor_snapshot: None,
310            row_estimate,
311        }),
312    }
313}
314
315fn emit_artifact(artifact: &PlanArtifact, format: &PlanOutputFormat) -> Result<()> {
316    match format {
317        PlanOutputFormat::Pretty => {
318            artifact.print_summary();
319        }
320        PlanOutputFormat::Json(None) => {
321            println!("{}", artifact.to_json_pretty()?);
322        }
323        PlanOutputFormat::Json(Some(path)) => {
324            let json = artifact.to_json_pretty()?;
325            std::fs::write(path, &json)
326                .map_err(|e| anyhow::anyhow!("cannot write plan file '{}': {}", path, e))?;
327            println!("Plan written to: {}", path);
328        }
329    }
330    Ok(())
331}