Skip to main content

rivet/pipeline/
plan_cmd.rs

1//! **`rivet plan`** — build and display a `PlanArtifact` without executing.
2//!
3//! This module is the coordinator for the plan command. It:
4//!
5//! 1. Loads config and resolves the `ResolvedRunPlan` (same as `rivet run`).
6//! 2. Runs preflight diagnostics via `preflight::get_export_diagnostic`.
7//! 3. For `Chunked` exports: opens a source connection and pre-computes chunk
8//!    boundaries using `detect_and_generate_chunks` — no data is exported.
9//! 4. For `Incremental` exports: reads the current cursor from `StateStore`.
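//! 5. Packages everything into a `PlanArtifact`, records each export's advisory
//!    `wave` / `parallel_safe` fields back into the config file in place, and
//!    either writes the artifact(s) to file (one file per export when several are
//!    planned) or prints a human-readable summary or JSON to stdout.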
12
13use std::collections::HashMap;
14use std::path::Path;
15
16use crate::config::Config;
17use crate::error::Result;
18use crate::plan::{
19    ComputedPlanData, ExportRecommendation, ExtractionStrategy, PlanArtifact, PlanDiagnostics,
20    PlanPrioritizationSnapshot, PrioritizationInputs, build_plan,
21    campaign::recommend_campaign,
22    inputs::{PrioritizationHints, build_prioritization_inputs},
23    recommend::recommend_export,
24    validate::{DiagnosticLevel, validate_plan},
25};
26use crate::state::StateStore;
27use crate::{preflight, source};
28
29use super::chunked::{chunk_plan_fingerprint, detect_and_generate_chunks};
30
/// Output format for `rivet plan`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanOutputFormat {
    /// Human-readable summary printed to stdout; no plan file is written
    /// (the config file is still annotated with `wave` / `parallel_safe`).
    Pretty,
    /// Pretty-printed JSON. `Some(path)` writes it to `path` — or, when several
    /// exports are planned, to one derived `<stem>.<export>.<ext>` file per
    /// export. `None` prints it to stdout (a JSON array when several exports
    /// are planned).
    Json(Option<String>),
}
38
39/// Entry point for `rivet plan`.
40///
41/// Iterates over all matching exports, builds a `PlanArtifact` for each, and
42/// either prints or saves it according to `format`.
43pub fn run_plan_command(
44    config_path: &str,
45    export_name: Option<&str>,
46    params: Option<&HashMap<String, String>>,
47    format: PlanOutputFormat,
48) -> Result<()> {
49    let config = Config::load_with_params(config_path, params)?;
50    let config_dir = Path::new(config_path)
51        .parent()
52        .unwrap_or_else(|| Path::new("."));
53
54    let exports: Vec<_> = if let Some(name) = export_name {
55        let e = config
56            .exports
57            .iter()
58            .find(|e| e.name == name)
59            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
60        vec![e]
61    } else {
62        config.exports.iter().collect()
63    };
64
65    let state_path = config_dir.join(".rivet_state.db");
66    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
67
68    let mut built: Vec<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> = Vec::new();
69
70    for export in exports {
71        built.push(build_plan_artifact(
72            &config,
73            export,
74            config_path,
75            config_dir,
76            params,
77            &state,
78        )?);
79    }
80
81    let campaign_opt = if built.len() > 1 {
82        let pairs: Vec<_> = built
83            .iter()
84            .map(|(_, i, r)| (i.clone(), r.clone()))
85            .collect();
86        Some(recommend_campaign(pairs))
87    } else {
88        None
89    };
90
91    // When `--output FILE` is given but the config yields more than one export,
92    // a single path cannot hold every artifact: `rivet apply` reads exactly one
93    // `PlanArtifact` per file (`PlanArtifact::from_file`), and a JSON array is
94    // not a valid artifact. Writing all of them to the same path silently kept
95    // only the last (audit #4). Instead, derive a distinct per-export path so no
96    // export is dropped and each file remains directly consumable by `apply`.
97    let multi_export = built.len() > 1;
98
99    let mut artifacts: Vec<PlanArtifact> = Vec::with_capacity(built.len());
100    for (mut artifact, _inputs, standalone_rec) in built {
101        let snap = if let Some(ref camp) = campaign_opt {
102            let rec = camp
103                .ordered_exports
104                .iter()
105                .find(|e| e.export_name == artifact.export_name)
106                .cloned()
107                .unwrap_or_else(|| standalone_rec.clone());
108            PlanPrioritizationSnapshot {
109                export_recommendation: rec,
110                campaign: Some(camp.clone()),
111            }
112        } else {
113            PlanPrioritizationSnapshot {
114                export_recommendation: standalone_rec,
115                campaign: None,
116            }
117        };
118        artifact.prioritization = Some(snap);
119        artifacts.push(artifact);
120    }
121
122    // Record the advisory wave + parallel-safety on each export in the config
123    // (in place), so the operator sees / edits them and `rivet apply` can run
124    // exports wave-by-wave, parallelizing only the cheap (low-cost) ones.
125    // `parallel_safe = cost_class Low` (< 100K rows): a heavy table already
126    // chunk-parallelizes internally, so two of them at once would overload the
127    // source — only the small / cheap exports share a concurrent wave batch.
128    let mut fields: ExportFields = HashMap::new();
129    for a in &artifacts {
130        if let Some(p) = a.prioritization.as_ref() {
131            let rec = &p.export_recommendation;
132            // ...and not flagged `isolate_on_source` by the campaign (a shared,
133            // contended source group) — the campaign owns the "run alone" call,
134            // so a cheap export there still runs on its own.
135            let parallel_safe =
136                rec.cost_class == crate::plan::CostClass::Low && !rec.isolate_on_source;
137            fields.insert(
138                a.export_name.clone(),
139                vec![
140                    ("wave", rec.recommended_wave.to_string()),
141                    ("parallel_safe", parallel_safe.to_string()),
142                ],
143            );
144        }
145    }
146    if !fields.is_empty() {
147        write_plan_fields_to_config(config_path, &fields)?;
148        log::info!(
149            "plan: recorded wave + parallel-safety for {} export(s) in {}",
150            fields.len(),
151            config_path
152        );
153    }
154
155    emit_artifacts(&artifacts, &format, multi_export, config_path)?;
156
157    Ok(())
158}
159
/// Derive a distinct per-export output path from the `--output` value when more
/// than one export is being planned.
///
/// Inserts the export name into the file stem (`plan.json` → `plan.orders.json`)
/// so every artifact lands on its own path and `rivet apply <file>` consumes a
/// single artifact unchanged.
///
/// The export name is sanitised to a safe filename fragment: any character
/// other than an ASCII alphanumeric, `-` or `_` becomes `_`. This way an export
/// named with a slash or dot cannot escape the intended directory or mangle the
/// extension. The original extension is preserved, so per-export files keep
/// the suffix (`.json`) the operator asked for.
///
/// When `base` has no usable file stem (empty, a bare root, or ending in `..`),
/// the sanitized name is appended to `base` verbatim (`base.<name>`). `base`
/// already names the full location there, so it is not re-joined onto its own
/// parent (which would duplicate the directory: `dir/..` → `dir/dir/...`).
fn per_export_output_path(base: &str, export_name: &str) -> String {
    let sanitized: String = export_name
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
                c
            } else {
                '_'
            }
        })
        .collect();

    let path = Path::new(base);
    let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
        // Distinct, non-colliding fallback built from the whole base path.
        return format!("{base}.{sanitized}");
    };

    let file_name = match path.extension().and_then(|s| s.to_str()) {
        Some(ext) => format!("{stem}.{sanitized}.{ext}"),
        None => format!("{stem}.{sanitized}"),
    };

    // A bare file name has an empty parent; only join when there is a real dir.
    match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => {
            dir.join(file_name).to_string_lossy().into_owned()
        }
        _ => file_name,
    }
}
203
204fn build_plan_artifact(
205    config: &Config,
206    export: &crate::config::ExportConfig,
207    config_path: &str,
208    config_dir: &Path,
209    params: Option<&HashMap<String, String>>,
210    state: &StateStore,
211) -> Result<(PlanArtifact, PrioritizationInputs, ExportRecommendation)> {
212    let plan = build_plan(config, export, config_dir, false, false, false, params)?;
213
214    // Collect plan-level compatibility diagnostics and emit Rejected ones as errors.
215    let validate_diags = validate_plan(&plan);
216    let mut validate_warnings: Vec<String> = Vec::new();
217    for d in &validate_diags {
218        match d.level {
219            DiagnosticLevel::Rejected => {
220                anyhow::bail!("[{}] {}", d.rule, d.message);
221            }
222            DiagnosticLevel::Warning | DiagnosticLevel::Degraded => {
223                validate_warnings.push(format!("[{}] {}", d.rule, d.message));
224            }
225        }
226    }
227
228    let (computed, plan_diagnostics, hints) = match preflight::get_export_diagnostic(config, export)
229    {
230        Ok(diag) => {
231            let mut warnings = diag.warnings.clone();
232            warnings.extend(validate_warnings);
233            // F3 (0.7.5 audit): the JSON artifact's `diagnostics`
234            // exposed a non-Efficient verdict with `warnings: []`, so a
235            // machine consumer could not see *why* the plan was flagged.
236            // If preflight emitted no specific warning, fall back to the
237            // build_suggestion text (the same line shown in `rivet check`)
238            // so the JSON always carries at least one human-readable
239            // reason matching the verdict.
240            if warnings.is_empty() && !matches!(diag.verdict, preflight::HealthVerdict::Efficient) {
241                if let Some(s) = diag.suggestion.clone() {
242                    warnings.push(s);
243                } else {
244                    warnings.push(format!(
245                        "verdict {} but preflight collected no specific warnings — review `rivet check` output for context",
246                        diag.verdict
247                    ));
248                }
249            }
250            // Explain *why* this strategy was chosen (mode + chunk geometry +
251            // parallelism) and its risk profile. Built from the same
252            // `ExportDiagnostic` + config the numbers above came from, so the
253            // narrative can never contradict them.
254            let strategy_rationale = crate::plan::explain_strategy(&diag, export);
255            let plan_diagnostics = PlanDiagnostics {
256                verdict: diag.verdict.to_string(),
257                warnings,
258                recommended_profile: diag.recommended_profile.to_string(),
259                strategy_rationale,
260            };
261            let computed = compute_plan_data(&plan, diag.row_estimate, state)?;
262            let hints = PrioritizationHints {
263                incremental_uses_index: diag.uses_index,
264                cursor_range_observed: diag.cursor_min.is_some() && diag.cursor_max.is_some(),
265            };
266            (computed, plan_diagnostics, hints)
267        }
268        Err(e) => {
269            log::warn!(
270                "plan '{}': preflight diagnostics failed (continuing without them): {:#}",
271                export.name,
272                e
273            );
274            let computed = compute_plan_data(&plan, None, state)?;
275            let mut warnings = vec!["preflight diagnostics unavailable".into()];
276            warnings.extend(validate_warnings);
277            let plan_diagnostics = PlanDiagnostics {
278                verdict: "unknown (preflight failed)".into(),
279                warnings,
280                recommended_profile: "balanced".into(),
281                // No diagnostic to explain from — be honest rather than fabricate
282                // a rationale from config alone (no row estimate / index facts).
283                strategy_rationale: "Strategy rationale unavailable — preflight diagnostics could \
284                     not be collected for this export."
285                    .into(),
286            };
287            (computed, plan_diagnostics, PrioritizationHints::default())
288        }
289    };
290
291    let fingerprint = match &plan.strategy {
292        ExtractionStrategy::Chunked(cp) => chunk_plan_fingerprint(
293            &plan.base_query,
294            &cp.column,
295            cp.chunk_size,
296            cp.chunk_count,
297            cp.dense,
298            cp.by_days,
299        ),
300        _ => String::new(),
301    };
302
303    // Epic I: fold recent-run history into prioritization (bounded contribution).
304    let history = match state.get_metrics(Some(&export.name), 20) {
305        Ok(metrics) => Some(crate::plan::HistorySnapshot::summarize(&metrics)),
306        Err(e) => {
307            log::warn!(
308                "plan '{}': history lookup failed; proceeding without historical refinement: {:#}",
309                export.name,
310                e
311            );
312            None
313        }
314    };
315
316    let inputs =
317        build_prioritization_inputs(export, &plan, &computed, &plan_diagnostics, hints, history);
318    let recommendation = recommend_export(&inputs, &plan_diagnostics);
319
320    let mut artifact = PlanArtifact::new(
321        plan.export_name.clone(),
322        plan.strategy.mode_label().to_string(),
323        fingerprint,
324        plan,
325        computed,
326        plan_diagnostics,
327    );
328
329    // F13 (0.7.5 audit): record the absolute config path so `rivet
330    // apply` can locate the matching `.rivet_state.db` (cursors,
331    // manifest history) even when the plan JSON is stored in a
332    // different directory.  `canonicalize` resolves symlinks and
333    // produces an absolute path; we fall back to the original string
334    // if the path no longer resolves (rare, e.g. config deleted
335    // between plan and apply).
336    artifact.config_path = Some(
337        Path::new(config_path)
338            .canonicalize()
339            .map(|p| p.to_string_lossy().into_owned())
340            .unwrap_or_else(|_| config_path.to_string()),
341    );
342
343    Ok((artifact, inputs, recommendation))
344}
345
346/// Compute the `ComputedPlanData` portion of the artifact.
347///
348/// For `Chunked` exports this opens a source connection and calls
349/// `detect_and_generate_chunks` to pre-compute chunk boundaries.  No rows are
350/// exported — we only run the `SELECT min(col) / max(col)` boundary queries.
351///
352/// For `Incremental` exports we read the last cursor value from `StateStore`.
353fn compute_plan_data(
354    plan: &crate::plan::ResolvedRunPlan,
355    row_estimate: Option<i64>,
356    state: &StateStore,
357) -> Result<ComputedPlanData> {
358    match &plan.strategy {
359        ExtractionStrategy::Chunked(cp) => {
360            let mut src = source::create_source(&plan.source)?;
361            let chunk_ranges = detect_and_generate_chunks(
362                &mut *src,
363                &plan.base_query,
364                &cp.column,
365                cp.chunk_size,
366                cp.chunk_count,
367                &plan.export_name,
368                cp.dense,
369                cp.by_days,
370                plan.source.source_type,
371            )?;
372            let chunk_count = chunk_ranges.len();
373            // F4 (0.7.5 audit): for chunked exports the artifact already
374            // contains the exact key span (`chunk_ranges[0].0` ..
375            // `chunk_ranges[-1].1`).  Using that as `row_estimate`
376            // beats `pg_class.reltuples` (which is `1130` on a fresh
377            // 30-row PG table because `ANALYZE` hasn't run).  This
378            // makes PG and MySQL agree on the same artifact.
379            let chunked_estimate = chunk_ranges
380                .first()
381                .zip(chunk_ranges.last())
382                .map(|(first, last)| (last.1 - first.0 + 1).max(0));
383            Ok(ComputedPlanData {
384                chunk_ranges,
385                chunk_count,
386                cursor_snapshot: None,
387                row_estimate: chunked_estimate.or(row_estimate),
388            })
389        }
390
391        ExtractionStrategy::Incremental(_) => {
392            let cursor_snapshot = state.get(&plan.export_name)?.last_cursor_value;
393            Ok(ComputedPlanData {
394                chunk_ranges: vec![],
395                chunk_count: 0,
396                cursor_snapshot,
397                row_estimate,
398            })
399        }
400
401        // Keyset pages are computed dynamically at run time (seek pagination),
402        // so there are no precomputed ranges to describe here — like a snapshot.
403        ExtractionStrategy::Snapshot
404        | ExtractionStrategy::TimeWindow { .. }
405        | ExtractionStrategy::Keyset(_) => Ok(ComputedPlanData {
406            chunk_ranges: vec![],
407            chunk_count: 0,
408            cursor_snapshot: None,
409            row_estimate,
410        }),
411    }
412}
413
414fn emit_artifacts(
415    artifacts: &[PlanArtifact],
416    format: &PlanOutputFormat,
417    multi_export: bool,
418    config_path: &str,
419) -> Result<()> {
420    match format {
421        PlanOutputFormat::Pretty => {
422            if multi_export {
423                // A schema scan can yield dozens of exports; the full per-export
424                // block would be hundreds of lines. Show a compact, one-line-per-
425                // export table sorted by wave, then the wave-execution hint. Use
426                // `--export <name>` for a single export's full detail.
427                print_compact_summary(artifacts, config_path);
428            } else {
429                for artifact in artifacts {
430                    artifact.print_summary();
431                }
432            }
433        }
434        // Multi-export JSON to stdout: a single export prints its object
435        // unchanged, but printing N objects back-to-back is invalid JSON (audit
436        // #L10: `jq` fails with "Extra data"). Wrap them in a JSON array so the
437        // stream parses as one document.
438        PlanOutputFormat::Json(None) => {
439            if artifacts.len() > 1 {
440                // `&[PlanArtifact]` serializes as a JSON array — one parseable
441                // document rather than N concatenated objects.
442                println!("{}", serde_json::to_string_pretty(artifacts)?);
443            } else if let Some(artifact) = artifacts.first() {
444                println!("{}", artifact.to_json_pretty()?);
445            }
446        }
447        PlanOutputFormat::Json(Some(path)) => {
448            for artifact in artifacts {
449                let json = artifact.to_json_pretty()?;
450                // For a single export keep the operator's exact path so `rivet
451                // apply <path>` works verbatim. For multiple exports a single
452                // path would overwrite (audit #4): give each export its own file.
453                let out_path = if multi_export {
454                    per_export_output_path(path, &artifact.export_name)
455                } else {
456                    path.clone()
457                };
458                std::fs::write(&out_path, &json)
459                    .map_err(|e| anyhow::anyhow!("cannot write plan file '{}': {}", out_path, e))?;
460                println!("Plan written to: {}", out_path);
461            }
462        }
463    }
464    Ok(())
465}
466
467/// Compact one-line-per-export table for a multi-export plan, sorted by wave
468/// (then by descending score, then name). The full per-export block
469/// (`print_summary`) would be hundreds of lines for a schema scan, so it is
470/// reserved for a single-export plan (`--export <name>`).
471fn print_compact_summary(artifacts: &[PlanArtifact], config_path: &str) {
472    let key = |a: &PlanArtifact| {
473        a.prioritization
474            .as_ref()
475            .map(|p| {
476                (
477                    p.export_recommendation.recommended_wave,
478                    p.export_recommendation.priority_score,
479                )
480            })
481            .unwrap_or((u32::MAX, 0))
482    };
483    let mut order: Vec<&PlanArtifact> = artifacts.iter().collect();
484    order.sort_by(|a, b| {
485        let (wa, sa) = key(a);
486        let (wb, sb) = key(b);
487        wa.cmp(&wb)
488            .then(sb.cmp(&sa))
489            .then(a.export_name.cmp(&b.export_name))
490    });
491    let name_w = order
492        .iter()
493        .map(|a| a.export_name.chars().count())
494        .max()
495        .unwrap_or(6)
496        .clamp(6, 32);
497
498    println!();
499    println!(
500        "  Plan: {} exports — `rivet apply {}` runs them by wave (lowest first)",
501        artifacts.len(),
502        config_path
503    );
504    println!();
505    println!("{}", PlanArtifact::summary_header(name_w));
506    for a in &order {
507        println!("{}", a.summary_line(name_w));
508    }
509    println!();
510    println!("  Full detail for one export:  rivet plan -c <config> --export <name>");
511}
512
/// Per-export YAML fields that `plan` records in place, keyed by export name.
/// Each value is the ordered list of `(field, value)` pairs (`wave`,
/// `parallel_safe`) to insert under that export.
type ExportFields = std::collections::HashMap<String, Vec<(&'static str, String)>>;
516
517fn write_plan_fields_to_config(config_path: &str, fields: &ExportFields) -> Result<()> {
518    if fields.is_empty() {
519        return Ok(());
520    }
521    let original = std::fs::read_to_string(config_path).map_err(|e| {
522        anyhow::anyhow!(
523            "cannot read config '{}' to record plan fields: {}",
524            config_path,
525            e
526        )
527    })?;
528    let updated = apply_field_annotations(&original, fields);
529    if updated != original {
530        std::fs::write(config_path, updated).map_err(|e| {
531            anyhow::anyhow!(
532                "cannot write plan fields to config '{}': {}",
533                config_path,
534                e
535            )
536        })?;
537    }
538    Ok(())
539}
540
541/// Pure core of [`write_plan_fields_to_config`] (text in → text out, unit-tested).
542/// For each named export, inserts its `<field>: <value>` lines right after
543/// `- name:` and drops any stale line for those same fields.
544fn apply_field_annotations(yaml: &str, fields: &ExportFields) -> String {
545    let mut out = String::with_capacity(yaml.len() + 64);
546    // (export name, indent of the `-`, indent of the export's fields) while
547    // inside an export block; `None` between / outside exports.
548    let mut current: Option<(String, usize, usize)> = None;
549
550    for line in yaml.split_inclusive('\n') {
551        let content = line.strip_suffix('\n').unwrap_or(line);
552
553        if let Some((dash_indent, name)) = parse_export_name(content) {
554            out.push_str(line);
555            let field_indent = dash_indent + 2;
556            if let Some(items) = fields.get(&name) {
557                for (key, value) in items {
558                    out.push_str(&" ".repeat(field_indent));
559                    out.push_str(&format!("{key}: {value}\n"));
560                }
561            }
562            current = Some((name, dash_indent, field_indent));
563            continue;
564        }
565
566        if let Some((name, dash_indent, field_indent)) = current.as_ref() {
567            let trimmed = content.trim_start();
568            let indent = content.len() - trimmed.len();
569            let blank_or_comment = trimmed.is_empty() || trimmed.starts_with('#');
570            if !blank_or_comment && indent <= *dash_indent {
571                current = None; // dedented out of the export block
572            } else if indent == *field_indent
573                && let Some(items) = fields.get(name)
574                && items
575                    .iter()
576                    .any(|(key, _)| trimmed.starts_with(&format!("{key}:")))
577            {
578                continue; // drop the stale field line — the fresh one is already in
579            }
580        }
581        out.push_str(line);
582    }
583    out
584}
585
/// Recognize an export list item of the form `<indent>- name: <name>`.
///
/// Returns `(column of the '-', name)`. Surrounding `"` / `'` quotes and an
/// inline ` # …` comment (a space followed by `#`) are stripped from the name.
/// Any other line, or an item whose name is empty, yields `None`.
fn parse_export_name(line: &str) -> Option<(usize, String)> {
    let body = line.trim_start();
    let indent = line.len() - body.len();
    let after_key = body.strip_prefix("- ")?.trim_start().strip_prefix("name:")?;
    let without_comment = match after_key.find(" #") {
        Some(pos) => &after_key[..pos],
        None => after_key,
    };
    let name = without_comment
        .trim()
        .trim_matches(|c: char| c == '"' || c == '\'');
    if name.is_empty() {
        None
    } else {
        Some((indent, name.to_string()))
    }
}
601
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::{ExportFields, apply_field_annotations, per_export_output_path};

    /// `ExportFields` carrying only a `wave` entry for each `(export, wave)` pair.
    fn wave_fields(pairs: &[(&str, u32)]) -> ExportFields {
        let mut fields = ExportFields::new();
        for &(name, wave) in pairs {
            fields.insert(name.to_owned(), vec![("wave", wave.to_string())]);
        }
        fields
    }

    /// UTF-8 file extension of `path`, if it has one.
    fn extension_of(path: &str) -> Option<&str> {
        Path::new(path).extension().and_then(|e| e.to_str())
    }

    /// Inserts `wave:` right after each `- name:`, replaces a stale one, and
    /// leaves comments / other fields / unlisted exports untouched.
    #[test]
    fn wave_annotations_insert_replace_and_preserve() {
        let yaml = "exports:\n  - name: orders   # the orders table\n    mode: incremental\n    wave: 9\n    cursor_column: updated_at\n  - name: events\n    mode: full\ndestination:\n  type: local\n";
        let fields = wave_fields(&[("orders", 2), ("events", 1)]);
        let out = apply_field_annotations(yaml, &fields);

        // orders: stale wave 9 replaced with 2, placed after name (comment kept)
        assert!(
            out.contains("- name: orders   # the orders table\n    wave: 2\n"),
            "{out}"
        );
        assert!(
            !out.contains("wave: 9"),
            "stale wave must be dropped:\n{out}"
        );

        // events: fresh wave 1 inserted right after name
        assert!(out.contains("- name: events\n    wave: 1\n"), "{out}");

        // untouched: the cursor field and the trailing top-level key
        assert!(out.contains("cursor_column: updated_at"));
        assert!(out.contains("destination:\n  type: local"));
    }

    /// An export whose name isn't in the map is left byte-identical.
    #[test]
    fn wave_annotations_leave_unlisted_export_untouched() {
        let yaml = "exports:\n  - name: orders\n    mode: full\n";
        let fields = wave_fields(&[("other", 1)]);
        assert_eq!(apply_field_annotations(yaml, &fields), yaml);
    }

    /// Regression (audit #4): two distinct exports under one `--output FILE`
    /// must derive two distinct paths, so neither is silently overwritten.
    #[test]
    fn per_export_paths_are_distinct() {
        let (orders, users) = (
            per_export_output_path("/tmp/plan.json", "orders"),
            per_export_output_path("/tmp/plan.json", "users"),
        );
        assert_ne!(orders, users, "distinct exports must not collide on one path");
        assert_eq!(orders, "/tmp/plan.orders.json");
        assert_eq!(users, "/tmp/plan.users.json");
    }

    /// The original extension is preserved so `rivet apply` (and any `*.json`
    /// glob) still find the per-export files.
    #[test]
    fn per_export_path_keeps_json_extension() {
        let derived = per_export_output_path("/tmp/plan.json", "orders");
        assert_eq!(
            extension_of(&derived),
            Some("json"),
            "per-export file must keep the .json extension"
        );
    }

    /// A base path with no extension still gets a distinct, non-colliding
    /// suffix rather than the export overwriting the base path.
    #[test]
    fn per_export_path_without_extension_appends_name() {
        assert_eq!(
            per_export_output_path("/tmp/plan", "orders"),
            "/tmp/plan.orders"
        );
    }

    /// A hostile export name (path separators, dots) must not escape the
    /// directory or mangle the extension — every unsafe byte is replaced.
    #[test]
    fn per_export_path_sanitizes_unsafe_export_name() {
        let p = per_export_output_path("/tmp/plan.json", "../../etc/passwd");
        let derived = Path::new(&p);

        // No `..` traversal survives in the derived path.
        assert!(
            !p.contains(".."),
            "sanitized path must not contain a `..` traversal: {p}"
        );

        // The final component carries no path separator.
        let file = derived
            .file_name()
            .and_then(|f| f.to_str())
            .expect("derived path has a file name");
        assert!(
            !file.contains('/') && !file.contains('\\'),
            "sanitized file name must not contain a path separator: {file}"
        );

        // Still inside the requested directory, with the extension intact.
        assert_eq!(
            derived.parent().and_then(|d| d.to_str()),
            Some("/tmp"),
            "derived path must stay in the requested directory"
        );
        assert_eq!(
            extension_of(&p),
            Some("json"),
            "extension must survive sanitization"
        );
    }
}