Skip to main content

rivet/pipeline/
plan_cmd.rs

1//! **`rivet plan`** — build and display a `PlanArtifact` without executing.
2//!
3//! This module is the coordinator for the plan command. It:
4//!
5//! 1. Loads config and resolves the `ResolvedRunPlan` (same as `rivet run`).
6//! 2. Runs preflight diagnostics via `preflight::get_export_diagnostic`.
7//! 3. For `Chunked` exports: opens a source connection and pre-computes chunk
8//!    boundaries using `detect_and_generate_chunks` — no data is exported.
9//! 4. For `Incremental` exports: reads the current cursor from `StateStore`.
10//! 5. Packages everything into a `PlanArtifact` and either writes it to a file
11//!    or prints a human-readable summary to stdout.
12
13use std::collections::HashMap;
14use std::path::Path;
15
16use crate::config::Config;
17use crate::error::Result;
18use crate::plan::{
19    ComputedPlanData, ExportRecommendation, ExtractionStrategy, PlanArtifact, PlanDiagnostics,
20    PlanPrioritizationSnapshot, PrioritizationInputs, build_plan,
21    campaign::recommend_campaign,
22    inputs::{PrioritizationHints, build_prioritization_inputs},
23    recommend::recommend_export,
24    validate::{DiagnosticLevel, validate_plan},
25};
26use crate::state::StateStore;
27use crate::{preflight, source};
28
29use super::chunked::{chunk_plan_fingerprint, detect_and_generate_chunks};
30
31/// Output format for `rivet plan`.
32pub enum PlanOutputFormat {
33    /// Human-readable summary printed to stdout; no file written.
34    Pretty,
35    /// Pretty-printed JSON written to the given path (or stdout if `None`).
36    Json(Option<String>),
37}
38
39/// Entry point for `rivet plan`.
40///
41/// Iterates over all matching exports, builds a `PlanArtifact` for each, and
42/// either prints or saves it according to `format`.
43pub fn run_plan_command(
44    config_path: &str,
45    export_name: Option<&str>,
46    params: Option<&HashMap<String, String>>,
47    format: PlanOutputFormat,
48) -> Result<()> {
49    let config = Config::load_with_params(config_path, params)?;
50    let config_dir = Path::new(config_path)
51        .parent()
52        .unwrap_or_else(|| Path::new("."));
53
54    let exports: Vec<_> = if let Some(name) = export_name {
55        let e = config
56            .exports
57            .iter()
58            .find(|e| e.name == name)
59            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
60        vec![e]
61    } else {
62        config.exports.iter().collect()
63    };
64
65    let state_path = config_dir.join(".rivet_state.db");
66    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
67
68    let mut built: Vec<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> = Vec::new();
69
70    for export in exports {
71        built.push(build_plan_artifact(
72            &config,
73            export,
74            config_path,
75            config_dir,
76            params,
77            &state,
78        )?);
79    }
80
81    let campaign_opt = if built.len() > 1 {
82        let pairs: Vec<_> = built
83            .iter()
84            .map(|(_, i, r)| (i.clone(), r.clone()))
85            .collect();
86        Some(recommend_campaign(pairs))
87    } else {
88        None
89    };
90
91    // When `--output FILE` is given but the config yields more than one export,
92    // a single path cannot hold every artifact: `rivet apply` reads exactly one
93    // `PlanArtifact` per file (`PlanArtifact::from_file`), and a JSON array is
94    // not a valid artifact. Writing all of them to the same path silently kept
95    // only the last (audit #4). Instead, derive a distinct per-export path so no
96    // export is dropped and each file remains directly consumable by `apply`.
97    let multi_export = built.len() > 1;
98
99    let mut artifacts: Vec<PlanArtifact> = Vec::with_capacity(built.len());
100    for (mut artifact, _inputs, standalone_rec) in built {
101        let snap = if let Some(ref camp) = campaign_opt {
102            let rec = camp
103                .ordered_exports
104                .iter()
105                .find(|e| e.export_name == artifact.export_name)
106                .cloned()
107                .unwrap_or_else(|| standalone_rec.clone());
108            PlanPrioritizationSnapshot {
109                export_recommendation: rec,
110                campaign: Some(camp.clone()),
111            }
112        } else {
113            PlanPrioritizationSnapshot {
114                export_recommendation: standalone_rec,
115                campaign: None,
116            }
117        };
118        artifact.prioritization = Some(snap);
119        artifacts.push(artifact);
120    }
121
122    emit_artifacts(&artifacts, &format, multi_export)?;
123
124    Ok(())
125}
126
127/// Derive a distinct per-export output path from the `--output` value when more
128/// than one export is being planned.
129///
130/// Inserts the export name into the file stem (`plan.json` → `plan.orders.json`)
131/// so every artifact lands on its own path and `rivet apply <file>` consumes a
132/// single artifact unchanged. The export name is sanitised to a safe filename
133/// fragment (any non-alphanumeric/`-`/`_` byte becomes `_`) so an export named
134/// with a slash or dot cannot escape the intended directory or mangle the
135/// extension. The original extension is preserved so per-export files keep the
136/// same suffix (`.json`) the operator asked for.
137fn per_export_output_path(base: &str, export_name: &str) -> String {
138    let sanitized: String = export_name
139        .chars()
140        .map(|c| {
141            if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
142                c
143            } else {
144                '_'
145            }
146        })
147        .collect();
148
149    let path = Path::new(base);
150    let parent = path.parent();
151    let stem = path.file_stem().and_then(|s| s.to_str());
152    let ext = path.extension().and_then(|s| s.to_str());
153
154    let file_name = match (stem, ext) {
155        (Some(stem), Some(ext)) => format!("{stem}.{sanitized}.{ext}"),
156        (Some(stem), None) => format!("{stem}.{sanitized}"),
157        // No usable stem (e.g. path was empty or just an extension): fall back to
158        // appending the sanitized name so the artifact still lands on a distinct
159        // path rather than colliding.
160        (None, _) => format!("{base}.{sanitized}"),
161    };
162
163    match parent {
164        Some(dir) if !dir.as_os_str().is_empty() => {
165            dir.join(file_name).to_string_lossy().into_owned()
166        }
167        _ => file_name,
168    }
169}
170
171fn build_plan_artifact(
172    config: &Config,
173    export: &crate::config::ExportConfig,
174    config_path: &str,
175    config_dir: &Path,
176    params: Option<&HashMap<String, String>>,
177    state: &StateStore,
178) -> Result<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> {
179    let plan = build_plan(config, export, config_dir, false, false, false, params)?;
180
181    // Collect plan-level compatibility diagnostics and emit Rejected ones as errors.
182    let validate_diags = validate_plan(&plan);
183    let mut validate_warnings: Vec<String> = Vec::new();
184    for d in &validate_diags {
185        match d.level {
186            DiagnosticLevel::Rejected => {
187                anyhow::bail!("[{}] {}", d.rule, d.message);
188            }
189            DiagnosticLevel::Warning | DiagnosticLevel::Degraded => {
190                validate_warnings.push(format!("[{}] {}", d.rule, d.message));
191            }
192        }
193    }
194
195    let (computed, plan_diagnostics, hints) = match preflight::get_export_diagnostic(config, export)
196    {
197        Ok(diag) => {
198            let mut warnings = diag.warnings.clone();
199            warnings.extend(validate_warnings);
200            // F3 (0.7.5 audit): the JSON artifact's `diagnostics`
201            // exposed a non-Efficient verdict with `warnings: []`, so a
202            // machine consumer could not see *why* the plan was flagged.
203            // If preflight emitted no specific warning, fall back to the
204            // build_suggestion text (the same line shown in `rivet check`)
205            // so the JSON always carries at least one human-readable
206            // reason matching the verdict.
207            if warnings.is_empty() && !matches!(diag.verdict, preflight::HealthVerdict::Efficient) {
208                if let Some(s) = diag.suggestion.clone() {
209                    warnings.push(s);
210                } else {
211                    warnings.push(format!(
212                        "verdict {} but preflight collected no specific warnings — review `rivet check` output for context",
213                        diag.verdict
214                    ));
215                }
216            }
217            let plan_diagnostics = PlanDiagnostics {
218                verdict: diag.verdict.to_string(),
219                warnings,
220                recommended_profile: diag.recommended_profile.to_string(),
221            };
222            let computed = compute_plan_data(&plan, diag.row_estimate, state)?;
223            let hints = PrioritizationHints {
224                incremental_uses_index: diag.uses_index,
225                cursor_range_observed: diag.cursor_min.is_some() && diag.cursor_max.is_some(),
226            };
227            (computed, plan_diagnostics, hints)
228        }
229        Err(e) => {
230            log::warn!(
231                "plan '{}': preflight diagnostics failed (continuing without them): {:#}",
232                export.name,
233                e
234            );
235            let computed = compute_plan_data(&plan, None, state)?;
236            let mut warnings = vec!["preflight diagnostics unavailable".into()];
237            warnings.extend(validate_warnings);
238            let plan_diagnostics = PlanDiagnostics {
239                verdict: "unknown (preflight failed)".into(),
240                warnings,
241                recommended_profile: "balanced".into(),
242            };
243            (computed, plan_diagnostics, PrioritizationHints::default())
244        }
245    };
246
247    let fingerprint = match &plan.strategy {
248        ExtractionStrategy::Chunked(cp) => chunk_plan_fingerprint(
249            &plan.base_query,
250            &cp.column,
251            cp.chunk_size,
252            cp.chunk_count,
253            cp.dense,
254            cp.by_days,
255        ),
256        _ => String::new(),
257    };
258
259    // Epic I: fold recent-run history into prioritization (bounded contribution).
260    let history = match state.get_metrics(Some(&export.name), 20) {
261        Ok(metrics) => Some(crate::plan::HistorySnapshot::summarize(&metrics)),
262        Err(e) => {
263            log::warn!(
264                "plan '{}': history lookup failed; proceeding without historical refinement: {:#}",
265                export.name,
266                e
267            );
268            None
269        }
270    };
271
272    let inputs =
273        build_prioritization_inputs(export, &plan, &computed, &plan_diagnostics, hints, history);
274    let recommendation = recommend_export(&inputs, &plan_diagnostics);
275
276    let mut artifact = PlanArtifact::new(
277        plan.export_name.clone(),
278        plan.strategy.mode_label().to_string(),
279        fingerprint,
280        plan,
281        computed,
282        plan_diagnostics,
283    );
284
285    // F13 (0.7.5 audit): record the absolute config path so `rivet
286    // apply` can locate the matching `.rivet_state.db` (cursors,
287    // manifest history) even when the plan JSON is stored in a
288    // different directory.  `canonicalize` resolves symlinks and
289    // produces an absolute path; we fall back to the original string
290    // if the path no longer resolves (rare, e.g. config deleted
291    // between plan and apply).
292    artifact.config_path = Some(
293        Path::new(config_path)
294            .canonicalize()
295            .map(|p| p.to_string_lossy().into_owned())
296            .unwrap_or_else(|_| config_path.to_string()),
297    );
298
299    Ok((artifact, inputs, recommendation))
300}
301
302/// Compute the `ComputedPlanData` portion of the artifact.
303///
304/// For `Chunked` exports this opens a source connection and calls
305/// `detect_and_generate_chunks` to pre-compute chunk boundaries.  No rows are
306/// exported — we only run the `SELECT min(col) / max(col)` boundary queries.
307///
308/// For `Incremental` exports we read the last cursor value from `StateStore`.
309fn compute_plan_data(
310    plan: &crate::plan::ResolvedRunPlan,
311    row_estimate: Option<i64>,
312    state: &StateStore,
313) -> Result<ComputedPlanData> {
314    match &plan.strategy {
315        ExtractionStrategy::Chunked(cp) => {
316            let mut src = source::create_source(&plan.source)?;
317            let chunk_ranges = detect_and_generate_chunks(
318                &mut *src,
319                &plan.base_query,
320                &cp.column,
321                cp.chunk_size,
322                cp.chunk_count,
323                &plan.export_name,
324                cp.dense,
325                cp.by_days,
326                plan.source.source_type,
327            )?;
328            let chunk_count = chunk_ranges.len();
329            // F4 (0.7.5 audit): for chunked exports the artifact already
330            // contains the exact key span (`chunk_ranges[0].0` ..
331            // `chunk_ranges[-1].1`).  Using that as `row_estimate`
332            // beats `pg_class.reltuples` (which is `1130` on a fresh
333            // 30-row PG table because `ANALYZE` hasn't run).  This
334            // makes PG and MySQL agree on the same artifact.
335            let chunked_estimate = chunk_ranges
336                .first()
337                .zip(chunk_ranges.last())
338                .map(|(first, last)| (last.1 - first.0 + 1).max(0));
339            Ok(ComputedPlanData {
340                chunk_ranges,
341                chunk_count,
342                cursor_snapshot: None,
343                row_estimate: chunked_estimate.or(row_estimate),
344            })
345        }
346
347        ExtractionStrategy::Incremental(_) => {
348            let cursor_snapshot = state.get(&plan.export_name)?.last_cursor_value;
349            Ok(ComputedPlanData {
350                chunk_ranges: vec![],
351                chunk_count: 0,
352                cursor_snapshot,
353                row_estimate,
354            })
355        }
356
357        // Keyset pages are computed dynamically at run time (seek pagination),
358        // so there are no precomputed ranges to describe here — like a snapshot.
359        ExtractionStrategy::Snapshot
360        | ExtractionStrategy::TimeWindow { .. }
361        | ExtractionStrategy::Keyset(_) => Ok(ComputedPlanData {
362            chunk_ranges: vec![],
363            chunk_count: 0,
364            cursor_snapshot: None,
365            row_estimate,
366        }),
367    }
368}
369
370fn emit_artifacts(
371    artifacts: &[PlanArtifact],
372    format: &PlanOutputFormat,
373    multi_export: bool,
374) -> Result<()> {
375    match format {
376        PlanOutputFormat::Pretty => {
377            for artifact in artifacts {
378                artifact.print_summary();
379            }
380        }
381        // Multi-export JSON to stdout: a single export prints its object
382        // unchanged, but printing N objects back-to-back is invalid JSON (audit
383        // #L10: `jq` fails with "Extra data"). Wrap them in a JSON array so the
384        // stream parses as one document.
385        PlanOutputFormat::Json(None) => {
386            if artifacts.len() > 1 {
387                // `&[PlanArtifact]` serializes as a JSON array — one parseable
388                // document rather than N concatenated objects.
389                println!("{}", serde_json::to_string_pretty(artifacts)?);
390            } else if let Some(artifact) = artifacts.first() {
391                println!("{}", artifact.to_json_pretty()?);
392            }
393        }
394        PlanOutputFormat::Json(Some(path)) => {
395            for artifact in artifacts {
396                let json = artifact.to_json_pretty()?;
397                // For a single export keep the operator's exact path so `rivet
398                // apply <path>` works verbatim. For multiple exports a single
399                // path would overwrite (audit #4): give each export its own file.
400                let out_path = if multi_export {
401                    per_export_output_path(path, &artifact.export_name)
402                } else {
403                    path.clone()
404                };
405                std::fs::write(&out_path, &json)
406                    .map_err(|e| anyhow::anyhow!("cannot write plan file '{}': {}", out_path, e))?;
407                println!("Plan written to: {}", out_path);
408            }
409        }
410    }
411    Ok(())
412}
413
414#[cfg(test)]
415mod tests {
416    use super::per_export_output_path;
417    use std::path::Path;
418
419    /// Regression (audit #4): two distinct exports under one `--output FILE`
420    /// must derive two distinct paths, so neither is silently overwritten.
421    #[test]
422    fn per_export_paths_are_distinct() {
423        let a = per_export_output_path("/tmp/plan.json", "orders");
424        let b = per_export_output_path("/tmp/plan.json", "users");
425        assert_ne!(a, b, "distinct exports must not collide on one path");
426        assert_eq!(a, "/tmp/plan.orders.json");
427        assert_eq!(b, "/tmp/plan.users.json");
428    }
429
430    /// The original extension is preserved so `rivet apply` (and any `*.json`
431    /// glob) still find the per-export files.
432    #[test]
433    fn per_export_path_keeps_json_extension() {
434        let p = per_export_output_path("/tmp/plan.json", "orders");
435        assert_eq!(
436            Path::new(&p).extension().and_then(|e| e.to_str()),
437            Some("json"),
438            "per-export file must keep the .json extension"
439        );
440    }
441
442    /// An export name with no extension on the base still gets a distinct,
443    /// non-colliding suffix rather than overwriting the base path.
444    #[test]
445    fn per_export_path_without_extension_appends_name() {
446        let p = per_export_output_path("/tmp/plan", "orders");
447        assert_eq!(p, "/tmp/plan.orders");
448    }
449
450    /// A hostile export name (path separators, dots) must not escape the
451    /// directory or mangle the extension — every unsafe byte is replaced.
452    #[test]
453    fn per_export_path_sanitizes_unsafe_export_name() {
454        let p = per_export_output_path("/tmp/plan.json", "../../etc/passwd");
455        // No remaining path separators or `..` traversal in the derived name.
456        assert!(
457            !p.contains(".."),
458            "sanitized path must not contain a `..` traversal: {p}"
459        );
460        let file = Path::new(&p)
461            .file_name()
462            .and_then(|f| f.to_str())
463            .expect("derived path has a file name");
464        assert!(
465            !file.contains('/') && !file.contains('\\'),
466            "sanitized file name must not contain a path separator: {file}"
467        );
468        assert_eq!(
469            Path::new(&p).parent().and_then(|d| d.to_str()),
470            Some("/tmp"),
471            "derived path must stay in the requested directory"
472        );
473        assert_eq!(
474            Path::new(&p).extension().and_then(|e| e.to_str()),
475            Some("json"),
476            "extension must survive sanitization"
477        );
478    }
479}