// rivet/pipeline/plan_cmd.rs

//! **`rivet plan`** — build and display a `PlanArtifact` without executing.
//!
//! This module is the coordinator for the plan command. It:
//!
//! 1. Loads config and resolves the `ResolvedRunPlan` (same as `rivet run`).
//! 2. Runs preflight diagnostics via `preflight::get_export_diagnostic`.
//! 3. For `Chunked` exports: opens a source connection and pre-computes chunk
//!    boundaries using `detect_and_generate_chunks` — no data is exported.
//! 4. For `Incremental` exports: reads the current cursor from `StateStore`.
//! 5. Packages everything into a `PlanArtifact` and either prints a
//!    human-readable summary to stdout or emits it as JSON (to stdout or to a
//!    file).
//!
//! Side effect: before emitting, the advisory `wave` and `parallel_safe`
//! fields are recorded on each planned export in the config file, in place.
12
13use std::collections::HashMap;
14use std::path::Path;
15
16use crate::config::Config;
17use crate::error::Result;
18use crate::plan::{
19    ComputedPlanData, ExportRecommendation, ExtractionStrategy, PlanArtifact, PlanDiagnostics,
20    PlanPrioritizationSnapshot, PrioritizationInputs, build_plan,
21    campaign::recommend_campaign,
22    inputs::{PrioritizationHints, build_prioritization_inputs},
23    recommend::recommend_export,
24    validate::{DiagnosticLevel, validate_plan},
25};
26use crate::state::StateStore;
27use crate::{preflight, source};
28
29use super::chunked::{chunk_plan_fingerprint, detect_and_generate_chunks};
30
/// Output format for `rivet plan`.
///
/// Consumed by `emit_artifacts`, which picks between a stdout summary, a JSON
/// document on stdout, and JSON file(s) on disk.
pub enum PlanOutputFormat {
    /// Human-readable summary printed to stdout; no plan file written.
    Pretty,
    /// Pretty-printed JSON written to the given path (or stdout if `None`).
    ///
    /// When more than one export is planned, `Some(path)` is expanded into one
    /// file per export (see `per_export_output_path`) and `None` prints a
    /// single JSON array instead of a single object.
    Json(Option<String>),
}
38
39/// Entry point for `rivet plan`.
40///
41/// Iterates over all matching exports, builds a `PlanArtifact` for each, and
42/// either prints or saves it according to `format`.
43pub fn run_plan_command(
44    config_path: &str,
45    export_name: Option<&str>,
46    params: Option<&HashMap<String, String>>,
47    format: PlanOutputFormat,
48) -> Result<()> {
49    let config = Config::load_with_params(config_path, params)?;
50    let config_dir = Path::new(config_path)
51        .parent()
52        .unwrap_or_else(|| Path::new("."));
53
54    let exports: Vec<_> = if let Some(name) = export_name {
55        let e = config
56            .exports
57            .iter()
58            .find(|e| e.name == name)
59            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
60        vec![e]
61    } else {
62        config.exports.iter().collect()
63    };
64
65    let state_path = config_dir.join(".rivet_state.db");
66    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
67
68    let mut built: Vec<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> = Vec::new();
69
70    for export in exports {
71        built.push(build_plan_artifact(
72            &config,
73            export,
74            config_path,
75            config_dir,
76            params,
77            &state,
78        )?);
79    }
80
81    let campaign_opt = if built.len() > 1 {
82        let pairs: Vec<_> = built
83            .iter()
84            .map(|(_, i, r)| (i.clone(), r.clone()))
85            .collect();
86        Some(recommend_campaign(pairs))
87    } else {
88        None
89    };
90
91    // When `--output FILE` is given but the config yields more than one export,
92    // a single path cannot hold every artifact: `rivet apply` reads exactly one
93    // `PlanArtifact` per file (`PlanArtifact::from_file`), and a JSON array is
94    // not a valid artifact. Writing all of them to the same path silently kept
95    // only the last (audit #4). Instead, derive a distinct per-export path so no
96    // export is dropped and each file remains directly consumable by `apply`.
97    let multi_export = built.len() > 1;
98
99    let mut artifacts: Vec<PlanArtifact> = Vec::with_capacity(built.len());
100    for (mut artifact, _inputs, standalone_rec) in built {
101        let snap = if let Some(ref camp) = campaign_opt {
102            let rec = camp
103                .ordered_exports
104                .iter()
105                .find(|e| e.export_name == artifact.export_name)
106                .cloned()
107                .unwrap_or_else(|| standalone_rec.clone());
108            PlanPrioritizationSnapshot {
109                export_recommendation: rec,
110                campaign: Some(camp.clone()),
111            }
112        } else {
113            PlanPrioritizationSnapshot {
114                export_recommendation: standalone_rec,
115                campaign: None,
116            }
117        };
118        artifact.prioritization = Some(snap);
119        artifacts.push(artifact);
120    }
121
122    // Record the advisory wave + parallel-safety on each export in the config
123    // (in place), so the operator sees / edits them and `rivet apply` can run
124    // exports wave-by-wave, parallelizing only the cheap (low-cost) ones.
125    // `parallel_safe = cost_class Low` (< 100K rows): a heavy table already
126    // chunk-parallelizes internally, so two of them at once would overload the
127    // source — only the small / cheap exports share a concurrent wave batch.
128    let mut fields: ExportFields = HashMap::new();
129    for a in &artifacts {
130        if let Some(p) = a.prioritization.as_ref() {
131            let rec = &p.export_recommendation;
132            // ...and not flagged `isolate_on_source` by the campaign (a shared,
133            // contended source group) — the campaign owns the "run alone" call,
134            // so a cheap export there still runs on its own.
135            let parallel_safe =
136                rec.cost_class == crate::plan::CostClass::Low && !rec.isolate_on_source;
137            fields.insert(
138                a.export_name.clone(),
139                vec![
140                    ("wave", rec.recommended_wave.to_string()),
141                    ("parallel_safe", parallel_safe.to_string()),
142                ],
143            );
144        }
145    }
146    if !fields.is_empty() {
147        write_plan_fields_to_config(config_path, &fields)?;
148        log::info!(
149            "plan: recorded wave + parallel-safety for {} export(s) in {}",
150            fields.len(),
151            config_path
152        );
153    }
154
155    emit_artifacts(&artifacts, &format, multi_export, config_path)?;
156
157    Ok(())
158}
159
/// Build the output path for one export when a single `--output` value has to
/// serve several exports.
///
/// The export name is spliced in front of the extension
/// (`plan.json` → `plan.orders.json`), or appended when the base has no
/// extension (`plan` → `plan.orders`), so each artifact gets its own file that
/// `rivet apply <file>` can read as a single artifact.
///
/// Every character of `export_name` other than an ASCII letter, digit, `-` or
/// `_` is replaced by `_`, so a name containing a slash or a dot can neither
/// leave the directory of `base` nor alter the extension. The directory part
/// of `base`, if any, is kept.
fn per_export_output_path(base: &str, export_name: &str) -> String {
    // Reduce the export name to a filename-safe fragment.
    let mut safe_name = String::with_capacity(export_name.len());
    for ch in export_name.chars() {
        let keep = ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_');
        safe_name.push(if keep { ch } else { '_' });
    }

    let base_path = Path::new(base);
    let file_name = match base_path.file_stem().and_then(|s| s.to_str()) {
        // No usable stem (e.g. an empty base): append the fragment to the raw
        // base so the artifact still ends up on a distinct path.
        None => format!("{base}.{safe_name}"),
        Some(stem) => match base_path.extension().and_then(|e| e.to_str()) {
            Some(ext) => format!("{stem}.{safe_name}.{ext}"),
            None => format!("{stem}.{safe_name}"),
        },
    };

    // Re-attach the directory only when the base actually named one.
    match base_path.parent().filter(|dir| !dir.as_os_str().is_empty()) {
        Some(dir) => dir.join(file_name).to_string_lossy().into_owned(),
        None => file_name,
    }
}
203
204fn build_plan_artifact(
205    config: &Config,
206    export: &crate::config::ExportConfig,
207    config_path: &str,
208    config_dir: &Path,
209    params: Option<&HashMap<String, String>>,
210    state: &StateStore,
211) -> Result<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> {
212    let plan = build_plan(config, export, config_dir, false, false, false, params)?;
213
214    // Collect plan-level compatibility diagnostics and emit Rejected ones as errors.
215    let validate_diags = validate_plan(&plan);
216    let mut validate_warnings: Vec<String> = Vec::new();
217    for d in &validate_diags {
218        match d.level {
219            DiagnosticLevel::Rejected => {
220                anyhow::bail!("[{}] {}", d.rule, d.message);
221            }
222            DiagnosticLevel::Warning | DiagnosticLevel::Degraded => {
223                validate_warnings.push(format!("[{}] {}", d.rule, d.message));
224            }
225        }
226    }
227
228    let (computed, plan_diagnostics, hints) = match preflight::get_export_diagnostic(config, export)
229    {
230        Ok(diag) => {
231            // The plan artifact's warnings stay flat strings (severity tags are a
232            // `rivet check` surface); take each warning's message text.
233            let mut warnings: Vec<String> =
234                diag.warnings.iter().map(|w| w.message.clone()).collect();
235            warnings.extend(validate_warnings);
236            // F3 (0.7.5 audit): the JSON artifact's `diagnostics`
237            // exposed a non-Efficient verdict with `warnings: []`, so a
238            // machine consumer could not see *why* the plan was flagged.
239            // If preflight emitted no specific warning, fall back to the
240            // build_suggestion text (the same line shown in `rivet check`)
241            // so the JSON always carries at least one human-readable
242            // reason matching the verdict.
243            if warnings.is_empty() && !matches!(diag.verdict, preflight::HealthVerdict::Efficient) {
244                if let Some(s) = diag.suggestion.clone() {
245                    warnings.push(s);
246                } else {
247                    warnings.push(format!(
248                        "verdict {} but preflight collected no specific warnings — review `rivet check` output for context",
249                        diag.verdict
250                    ));
251                }
252            }
253            // Explain *why* this strategy was chosen (mode + chunk geometry +
254            // parallelism) and its risk profile. Built from the same
255            // `ExportDiagnostic` + config the numbers above came from, so the
256            // narrative can never contradict them.
257            let strategy_rationale = crate::plan::explain_strategy(&diag, export);
258            let plan_diagnostics = PlanDiagnostics {
259                verdict: diag.verdict.to_string(),
260                warnings,
261                recommended_profile: diag.recommended_profile.to_string(),
262                strategy_rationale,
263            };
264            let computed = compute_plan_data(&plan, diag.row_estimate, state)?;
265            let hints = PrioritizationHints {
266                incremental_uses_index: diag.uses_index,
267                cursor_range_observed: diag.cursor_min.is_some() && diag.cursor_max.is_some(),
268            };
269            (computed, plan_diagnostics, hints)
270        }
271        Err(e) => {
272            log::warn!(
273                "plan '{}': preflight diagnostics failed (continuing without them): {:#}",
274                export.name,
275                e
276            );
277            let computed = compute_plan_data(&plan, None, state)?;
278            let mut warnings = vec!["preflight diagnostics unavailable".into()];
279            warnings.extend(validate_warnings);
280            let plan_diagnostics = PlanDiagnostics {
281                verdict: "unknown (preflight failed)".into(),
282                warnings,
283                recommended_profile: "balanced".into(),
284                // No diagnostic to explain from — be honest rather than fabricate
285                // a rationale from config alone (no row estimate / index facts).
286                strategy_rationale: "Strategy rationale unavailable — preflight diagnostics could \
287                     not be collected for this export."
288                    .into(),
289            };
290            (computed, plan_diagnostics, PrioritizationHints::default())
291        }
292    };
293
294    let fingerprint = match &plan.strategy {
295        ExtractionStrategy::Chunked(cp) => chunk_plan_fingerprint(
296            &plan.base_query,
297            &cp.column,
298            cp.chunk_size,
299            cp.chunk_count,
300            cp.dense,
301            cp.by_days,
302        ),
303        _ => String::new(),
304    };
305
306    // Epic I: fold recent-run history into prioritization (bounded contribution).
307    let history = match state.get_metrics(Some(&export.name), 20) {
308        Ok(metrics) => Some(crate::plan::HistorySnapshot::summarize(&metrics)),
309        Err(e) => {
310            log::warn!(
311                "plan '{}': history lookup failed; proceeding without historical refinement: {:#}",
312                export.name,
313                e
314            );
315            None
316        }
317    };
318
319    let inputs =
320        build_prioritization_inputs(export, &plan, &computed, &plan_diagnostics, hints, history);
321    let recommendation = recommend_export(&inputs, &plan_diagnostics);
322
323    let mut artifact = PlanArtifact::new(
324        plan.export_name.clone(),
325        plan.strategy.mode_label().to_string(),
326        fingerprint,
327        plan,
328        computed,
329        plan_diagnostics,
330    );
331
332    // F13 (0.7.5 audit): record the absolute config path so `rivet
333    // apply` can locate the matching `.rivet_state.db` (cursors,
334    // manifest history) even when the plan JSON is stored in a
335    // different directory.  `canonicalize` resolves symlinks and
336    // produces an absolute path; we fall back to the original string
337    // if the path no longer resolves (rare, e.g. config deleted
338    // between plan and apply).
339    artifact.config_path = Some(
340        Path::new(config_path)
341            .canonicalize()
342            .map(|p| p.to_string_lossy().into_owned())
343            .unwrap_or_else(|_| config_path.to_string()),
344    );
345
346    Ok((artifact, inputs, recommendation))
347}
348
349/// Compute the `ComputedPlanData` portion of the artifact.
350///
351/// For `Chunked` exports this opens a source connection and calls
352/// `detect_and_generate_chunks` to pre-compute chunk boundaries.  No rows are
353/// exported — we only run the `SELECT min(col) / max(col)` boundary queries.
354///
355/// For `Incremental` exports we read the last cursor value from `StateStore`.
356fn compute_plan_data(
357    plan: &crate::plan::ResolvedRunPlan,
358    row_estimate: Option<i64>,
359    state: &StateStore,
360) -> Result<ComputedPlanData> {
361    match &plan.strategy {
362        ExtractionStrategy::Chunked(cp) => {
363            let mut src = source::create_source(&plan.source)?;
364            let chunk_ranges = detect_and_generate_chunks(
365                &mut *src,
366                &plan.base_query,
367                &cp.column,
368                cp.chunk_size,
369                cp.chunk_count,
370                &plan.export_name,
371                cp.dense,
372                cp.by_days,
373                plan.source.source_type,
374            )?;
375            let chunk_count = chunk_ranges.len();
376            // F4 (0.7.5 audit): for chunked exports the artifact already
377            // contains the exact key span (`chunk_ranges[0].0` ..
378            // `chunk_ranges[-1].1`).  Using that as `row_estimate`
379            // beats `pg_class.reltuples` (which is `1130` on a fresh
380            // 30-row PG table because `ANALYZE` hasn't run).  This
381            // makes PG and MySQL agree on the same artifact.
382            let chunked_estimate = chunk_ranges
383                .first()
384                .zip(chunk_ranges.last())
385                .map(|(first, last)| (last.1 - first.0 + 1).max(0));
386            Ok(ComputedPlanData {
387                chunk_ranges,
388                chunk_count,
389                cursor_snapshot: None,
390                row_estimate: chunked_estimate.or(row_estimate),
391            })
392        }
393
394        ExtractionStrategy::Incremental(_) => {
395            let cursor_snapshot = state.get(&plan.export_name)?.last_cursor_value;
396            Ok(ComputedPlanData {
397                chunk_ranges: vec![],
398                chunk_count: 0,
399                cursor_snapshot,
400                row_estimate,
401            })
402        }
403
404        // Keyset pages are computed dynamically at run time (seek pagination),
405        // so there are no precomputed ranges to describe here — like a snapshot.
406        ExtractionStrategy::Snapshot
407        | ExtractionStrategy::TimeWindow { .. }
408        | ExtractionStrategy::Keyset(_) => Ok(ComputedPlanData {
409            chunk_ranges: vec![],
410            chunk_count: 0,
411            cursor_snapshot: None,
412            row_estimate,
413        }),
414    }
415}
416
417fn emit_artifacts(
418    artifacts: &[PlanArtifact],
419    format: &PlanOutputFormat,
420    multi_export: bool,
421    config_path: &str,
422) -> Result<()> {
423    match format {
424        PlanOutputFormat::Pretty => {
425            if multi_export {
426                // A schema scan can yield dozens of exports; the full per-export
427                // block would be hundreds of lines. Show a compact, one-line-per-
428                // export table sorted by wave, then the wave-execution hint. Use
429                // `--export <name>` for a single export's full detail.
430                print_compact_summary(artifacts, config_path);
431            } else {
432                for artifact in artifacts {
433                    artifact.print_summary();
434                }
435            }
436        }
437        // Multi-export JSON to stdout: a single export prints its object
438        // unchanged, but printing N objects back-to-back is invalid JSON (audit
439        // #L10: `jq` fails with "Extra data"). Wrap them in a JSON array so the
440        // stream parses as one document.
441        PlanOutputFormat::Json(None) => {
442            if artifacts.len() > 1 {
443                // `&[PlanArtifact]` serializes as a JSON array — one parseable
444                // document rather than N concatenated objects.
445                println!("{}", serde_json::to_string_pretty(artifacts)?);
446            } else if let Some(artifact) = artifacts.first() {
447                println!("{}", artifact.to_json_pretty()?);
448            }
449        }
450        PlanOutputFormat::Json(Some(path)) => {
451            for artifact in artifacts {
452                let json = artifact.to_json_pretty()?;
453                // For a single export keep the operator's exact path so `rivet
454                // apply <path>` works verbatim. For multiple exports a single
455                // path would overwrite (audit #4): give each export its own file.
456                let out_path = if multi_export {
457                    per_export_output_path(path, &artifact.export_name)
458                } else {
459                    path.clone()
460                };
461                std::fs::write(&out_path, &json)
462                    .map_err(|e| anyhow::anyhow!("cannot write plan file '{}': {}", out_path, e))?;
463                println!("Plan written to: {}", out_path);
464            }
465        }
466    }
467    Ok(())
468}
469
470/// Compact one-line-per-export table for a multi-export plan, sorted by wave
471/// (then by descending score, then name). The full per-export block
472/// (`print_summary`) would be hundreds of lines for a schema scan, so it is
473/// reserved for a single-export plan (`--export <name>`).
474fn print_compact_summary(artifacts: &[PlanArtifact], config_path: &str) {
475    let key = |a: &PlanArtifact| {
476        a.prioritization
477            .as_ref()
478            .map(|p| {
479                (
480                    p.export_recommendation.recommended_wave,
481                    p.export_recommendation.priority_score,
482                )
483            })
484            .unwrap_or((u32::MAX, 0))
485    };
486    let mut order: Vec<&PlanArtifact> = artifacts.iter().collect();
487    order.sort_by(|a, b| {
488        let (wa, sa) = key(a);
489        let (wb, sb) = key(b);
490        wa.cmp(&wb)
491            .then(sb.cmp(&sa))
492            .then(a.export_name.cmp(&b.export_name))
493    });
494    let name_w = order
495        .iter()
496        .map(|a| a.export_name.chars().count())
497        .max()
498        .unwrap_or(6)
499        .clamp(6, 32);
500
501    println!();
502    println!(
503        "  Plan: {} exports — `rivet apply {}` runs them by wave (lowest first)",
504        artifacts.len(),
505        config_path
506    );
507    println!();
508    println!("{}", PlanArtifact::summary_header(name_w));
509    for a in &order {
510        println!("{}", a.summary_line(name_w));
511    }
512    println!();
513    println!("  Full detail for one export:  rivet plan -c <config> --export <name>");
514}
515
/// Per-export YAML fields that `plan` records in place (`wave`, `parallel_safe`),
/// keyed by export name → ordered `(field, value)` pairs.
///
/// Values are pre-rendered strings: `apply_field_annotations` writes each pair
/// as a `<field>: <value>` line, in `Vec` order.
type ExportFields = HashMap<String, Vec<(&'static str, String)>>;
519
520fn write_plan_fields_to_config(config_path: &str, fields: &ExportFields) -> Result<()> {
521    if fields.is_empty() {
522        return Ok(());
523    }
524    let original = std::fs::read_to_string(config_path).map_err(|e| {
525        anyhow::anyhow!(
526            "cannot read config '{}' to record plan fields: {}",
527            config_path,
528            e
529        )
530    })?;
531    let updated = apply_field_annotations(&original, fields);
532    if updated != original {
533        std::fs::write(config_path, updated).map_err(|e| {
534            anyhow::anyhow!(
535                "cannot write plan fields to config '{}': {}",
536                config_path,
537                e
538            )
539        })?;
540    }
541    Ok(())
542}
543
544/// Pure core of [`write_plan_fields_to_config`] (text in → text out, unit-tested).
545/// For each named export, inserts its `<field>: <value>` lines right after
546/// `- name:` and drops any stale line for those same fields.
547fn apply_field_annotations(yaml: &str, fields: &ExportFields) -> String {
548    let mut out = String::with_capacity(yaml.len() + 64);
549    // (export name, indent of the `-`, indent of the export's fields) while
550    // inside an export block; `None` between / outside exports.
551    let mut current: Option<(String, usize, usize)> = None;
552
553    for line in yaml.split_inclusive('\n') {
554        let content = line.strip_suffix('\n').unwrap_or(line);
555
556        if let Some((dash_indent, name)) = parse_export_name(content) {
557            out.push_str(line);
558            let field_indent = dash_indent + 2;
559            if let Some(items) = fields.get(&name) {
560                for (key, value) in items {
561                    out.push_str(&" ".repeat(field_indent));
562                    out.push_str(&format!("{key}: {value}\n"));
563                }
564            }
565            current = Some((name, dash_indent, field_indent));
566            continue;
567        }
568
569        if let Some((name, dash_indent, field_indent)) = current.as_ref() {
570            let trimmed = content.trim_start();
571            let indent = content.len() - trimmed.len();
572            let blank_or_comment = trimmed.is_empty() || trimmed.starts_with('#');
573            if !blank_or_comment && indent <= *dash_indent {
574                current = None; // dedented out of the export block
575            } else if indent == *field_indent
576                && let Some(items) = fields.get(name)
577                && items
578                    .iter()
579                    .any(|(key, _)| trimmed.starts_with(&format!("{key}:")))
580            {
581                continue; // drop the stale field line — the fresh one is already in
582            }
583        }
584        out.push_str(line);
585    }
586    out
587}
588
589/// Parse a `<indent>- name: <name>` export list item → `(indent_of_dash, name)`.
590/// Handles quoted names; `None` for any other line.
591fn parse_export_name(line: &str) -> Option<(usize, String)> {
592    let trimmed = line.trim_start();
593    let dash_indent = line.len() - trimmed.len();
594    let rest = trimmed.strip_prefix("- ")?;
595    let value = rest.trim_start().strip_prefix("name:")?;
596    // Drop an inline ` # …` comment (a YAML comment is a space then a hash).
597    let value = value.split(" #").next().unwrap_or(value);
598    let name = value
599        .trim()
600        .trim_matches(|c: char| c == '"' || c == '\'')
601        .to_string();
602    (!name.is_empty()).then_some((dash_indent, name))
603}
604
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::{ExportFields, apply_field_annotations, per_export_output_path};

    /// `--output` value shared by the path-derivation tests.
    const BASE: &str = "/tmp/plan.json";

    /// Build an `ExportFields` map carrying only a `wave` entry per export.
    fn wave_fields(pairs: &[(&str, u32)]) -> ExportFields {
        let mut map = ExportFields::new();
        for (name, wave) in pairs {
            map.insert(name.to_string(), vec![("wave", wave.to_string())]);
        }
        map
    }

    /// Extension of a derived path, if it has one.
    fn extension_of(path: &str) -> Option<&str> {
        Path::new(path).extension().and_then(|e| e.to_str())
    }

    /// A fresh `wave:` goes right after each `- name:`, a stale one is
    /// replaced, and comments / other fields / top-level keys are kept.
    #[test]
    fn wave_annotations_insert_replace_and_preserve() {
        let yaml = "exports:\n  - name: orders   # the orders table\n    mode: incremental\n    wave: 9\n    cursor_column: updated_at\n  - name: events\n    mode: full\ndestination:\n  type: local\n";
        let fields = wave_fields(&[("orders", 2), ("events", 1)]);
        let out = apply_field_annotations(yaml, &fields);

        // orders: the new wave sits after the name line, inline comment intact…
        assert!(
            out.contains("- name: orders   # the orders table\n    wave: 2\n"),
            "{out}"
        );
        // …and the stale value is gone.
        assert!(
            !out.contains("wave: 9"),
            "stale wave must be dropped:\n{out}"
        );
        // events: had no wave before, gets one right after its name.
        assert!(out.contains("- name: events\n    wave: 1\n"), "{out}");
        // Unrelated content is carried over.
        assert!(out.contains("cursor_column: updated_at"));
        assert!(out.contains("destination:\n  type: local"));
    }

    /// With no matching export in the map the text comes back byte-identical.
    #[test]
    fn wave_annotations_leave_unlisted_export_untouched() {
        let yaml = "exports:\n  - name: orders\n    mode: full\n";
        let fields = wave_fields(&[("other", 1)]);
        assert_eq!(apply_field_annotations(yaml, &fields), yaml);
    }

    /// Regression (audit #4): two exports sharing one `--output FILE` must get
    /// two different paths, so neither file silently overwrites the other.
    #[test]
    fn per_export_paths_are_distinct() {
        let orders = per_export_output_path(BASE, "orders");
        let users = per_export_output_path(BASE, "users");
        assert_ne!(orders, users, "distinct exports must not collide on one path");
        assert_eq!(orders, "/tmp/plan.orders.json");
        assert_eq!(users, "/tmp/plan.users.json");
    }

    /// The requested extension survives, so `rivet apply` (and any `*.json`
    /// glob) still picks up the per-export files.
    #[test]
    fn per_export_path_keeps_json_extension() {
        let derived = per_export_output_path(BASE, "orders");
        assert_eq!(
            extension_of(&derived),
            Some("json"),
            "per-export file must keep the .json extension"
        );
    }

    /// A base without an extension still yields a distinct path: the export
    /// name is appended instead of overwriting the base.
    #[test]
    fn per_export_path_without_extension_appends_name() {
        assert_eq!(per_export_output_path("/tmp/plan", "orders"), "/tmp/plan.orders");
    }

    /// A hostile export name (path separators, dots) can neither leave the
    /// requested directory nor disturb the extension.
    #[test]
    fn per_export_path_sanitizes_unsafe_export_name() {
        let derived = per_export_output_path(BASE, "../../etc/passwd");
        let derived_path = Path::new(&derived);

        // No `..` traversal left anywhere in the derived path.
        assert!(
            !derived.contains(".."),
            "sanitized path must not contain a `..` traversal: {derived}"
        );

        let file = derived_path
            .file_name()
            .and_then(|f| f.to_str())
            .expect("derived path has a file name");
        assert!(
            !file.contains('/') && !file.contains('\\'),
            "sanitized file name must not contain a path separator: {file}"
        );
        assert_eq!(
            derived_path.parent().and_then(|d| d.to_str()),
            Some("/tmp"),
            "derived path must stay in the requested directory"
        );
        assert_eq!(
            extension_of(&derived),
            Some("json"),
            "extension must survive sanitization"
        );
    }
}