Skip to main content

rivet/pipeline/
apply_cmd.rs

1//! **`rivet apply`** — execute a previously-generated `PlanArtifact`.
2//!
3//! Responsibilities:
4//!
5//! 1. Deserialize `PlanArtifact` from the plan JSON file.
6//! 2. Check staleness (warn > 1 h, error > 24 h unless `--force`).
7//! 3. For `Incremental`: validate cursor has not drifted since plan time.
8//! 4. Execute using `ChunkSource::Precomputed` so chunk boundaries from the
9//!    artifact are replayed without re-running `SELECT min/max` queries.
10//! 5. Persist outcomes (manifest, cursor, metrics) via `StateStore` as usual.
11
12use chrono::Duration;
13use std::path::Path;
14
15use crate::error::Result;
16use crate::plan::{PlanArtifact, StalenessCheck};
17use crate::state::StateStore;
18
19use super::chunked::ChunkSource;
20use super::summary::ApplyContext;
21
22/// Entry point for `rivet apply <plan-file> [--parallel-export-processes] [--resume] [--force]`.
23pub fn run_apply_command(plan_file: &str, force: bool, parallel: bool, resume: bool) -> Result<()> {
24    // A YAML config selects the wave-ordered multi-export path (plan→apply
25    // cycle): run every export wave-by-wave in ascending `wave:` order. A JSON
26    // plan artifact falls through to the sealed single-export replay below.
27    if plan_file.ends_with(".yaml") || plan_file.ends_with(".yml") {
28        return super::run::run_waves(plan_file, force, parallel, resume);
29    }
30    if parallel || resume {
31        log::warn!(
32            "--parallel-export-processes / --resume apply only to wave-ordered config execution; ignored for a sealed plan artifact"
33        );
34    }
35
36    // 1. Load artifact
37    let artifact = PlanArtifact::from_file(plan_file)?;
38
39    // 1a. Tamper-evidence (ADR-0005 PA10, finding #16): reject a plan whose
40    //     `resolved_plan` was edited after planning before we run any query.
41    //     Unlike staleness/cursor-drift this is NOT bypassable by --force —
42    //     a hand-edited execution contract is never something the operator
43    //     can opt into; the only correct recovery is to re-run `rivet plan`.
44    artifact.verify_integrity()?;
45
46    // 1b. Un-appliable inline-url plan (finding #17): a plan generated from an
47    //     inline `url:` config has its credentials redacted to `REDACTED@…`
48    //     with no env/file reference to re-resolve them, so apply would die
49    //     deep in the driver with an opaque auth/"password missing" error.
50    //     Catch it here with the exact remedy.
51    reject_unrecoverable_inline_url(&artifact)?;
52
53    // Track which preflight checks --force actually overrode for this run.
54    // Threaded into RunSummary.apply_context so the run report carries an
55    // auditable record of bypassed gates (finding F5 of the 0.7.5 audit).
56    let mut force_bypassed: Vec<String> = Vec::new();
57
58    // 2. Staleness check
59    let warn_threshold = Duration::hours(1);
60    let error_threshold = Duration::hours(24);
61    match artifact.staleness(warn_threshold, error_threshold) {
62        StalenessCheck::Fresh => {}
63        StalenessCheck::StaleWarn(age) => {
64            log::warn!(
65                "plan '{}' is {} minutes old — consider regenerating with `rivet plan`",
66                artifact.export_name,
67                age.num_minutes()
68            );
69        }
70        StalenessCheck::StaleError(age) => {
71            // F6 (0.7.5 audit): "56035 hours old" is mathematically
72            // correct but unreadable.  For ages above 48 h switch to
73            // days + the actual `created_at` date; below that keep
74            // hours so the boundary right above 24 h reads cleanly.
75            let age_phrase = if age.num_hours() >= 48 {
76                format!(
77                    "{} days old (created {})",
78                    age.num_days(),
79                    artifact.created_at.format("%Y-%m-%d")
80                )
81            } else {
82                format!("{} hours old", age.num_hours())
83            };
84            if !force {
85                anyhow::bail!(
86                    "plan '{}' is {} (limit: 24 h). Regenerate with `rivet plan` or pass --force to override.",
87                    artifact.export_name,
88                    age_phrase,
89                );
90            }
91            force_bypassed.push("staleness".into());
92            log::warn!(
93                "plan '{}': ignoring staleness ({}) because --force was passed",
94                artifact.export_name,
95                age_phrase,
96            );
97        }
98    }
99
100    // 3. Open StateStore — F13 (0.7.5 audit) resolution policy:
101    //    a) If the artifact recorded the original config path AND that
102    //       directory exists, open state next to it.  This is the only
103    //       path that keeps `apply` consistent with `rivet run`'s state
104    //       location, so cursors, manifests, and schema history line up.
105    //    b) Otherwise fall back to the plan file's own directory — the
106    //       pre-0.7.5 behaviour, which is the right answer when the
107    //       config has been deleted or moved and the operator just
108    //       wants to replay the artifact in isolation.
109    //
110    //    The fallback path also emits a WARN so the operator notices
111    //    the divergence (e.g. plan files stored separately from
112    //    configs without explicit intent).
113    let plan_dir = Path::new(plan_file)
114        .parent()
115        .unwrap_or_else(|| Path::new("."));
116    let state_dir = match artifact
117        .config_path
118        .as_deref()
119        .map(Path::new)
120        .and_then(Path::parent)
121    {
122        Some(dir) if dir.exists() => dir.to_path_buf(),
123        Some(dir) => {
124            log::warn!(
125                "plan '{}': original config dir '{}' no longer exists; opening state next to plan file instead. \
126                 Cursors and manifest history from the original run will not be visible.",
127                artifact.export_name,
128                dir.display(),
129            );
130            plan_dir.to_path_buf()
131        }
132        None => {
133            log::warn!(
134                "plan '{}': artifact has no recorded config path (pre-0.7.5 plan?). \
135                 Opening state next to the plan file; this may diverge from the state \
136                 used by `rivet run` for the same config.",
137                artifact.export_name,
138            );
139            plan_dir.to_path_buf()
140        }
141    };
142    let state_path = state_dir.join(".rivet_state.db");
143    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
144
145    // 4. Cursor drift check for Incremental exports
146    //
147    // Finding F1 of the 0.7.5 audit: the error message has always told the
148    // user "pass --force to skip this check", but until 0.7.5 the code did
149    // not actually honour `--force` here.  The flag now bypasses the bail
150    // with a WARN log, matching the documented contract and the analogous
151    // staleness path above.
152    if artifact.computed.cursor_snapshot.is_some() {
153        let current = state.get(&artifact.export_name)?.last_cursor_value;
154        if !artifact.cursor_matches(current.as_deref()) {
155            if !force {
156                anyhow::bail!(
157                    "plan '{}': cursor has drifted since plan was generated \
158                     (plan snapshot: {:?}, current: {:?}). \
159                     Regenerate with `rivet plan` or pass --force to skip this check.",
160                    artifact.export_name,
161                    artifact.computed.cursor_snapshot,
162                    current,
163                );
164            }
165            force_bypassed.push("cursor_drift".into());
166            log::warn!(
167                "plan '{}': cursor has drifted (plan snapshot: {:?}, current: {:?}) — \
168                 proceeding because --force was passed",
169                artifact.export_name,
170                artifact.computed.cursor_snapshot,
171                current,
172            );
173        }
174    }
175
176    // 5. Build ChunkSource from the pre-computed ranges in the artifact
177    let chunk_source = if artifact.computed.chunk_ranges.is_empty() {
178        ChunkSource::Detect
179    } else {
180        ChunkSource::Precomputed(artifact.computed.chunk_ranges.clone())
181    };
182
183    // 6. Execute using the plan from the artifact, carrying the apply audit
184    // context into the run report (F5).
185    let apply_context = ApplyContext {
186        plan_id: artifact.plan_id.clone(),
187        forced: force,
188        force_bypassed,
189    };
190    let plan = artifact.resolved_plan.clone();
191    super::run_export_job_with_chunk_source(
192        &plan,
193        &state,
194        chunk_source,
195        plan_file,
196        Some(apply_context),
197    )
198}
199
200/// Finding #17: reject — with an actionable remedy — a plan whose source
201/// credentials were stripped at plan time (inline `url:`) and cannot be
202/// re-resolved at apply time.
203///
204/// `PlanArtifact::new` redacts plaintext credentials (PA9): an inline
205/// `url: "postgresql://user:pass@host/db"` becomes `postgresql://REDACTED@host/db`.
206/// That is recoverable only if the source *also* carries a reference apply can
207/// resolve (`url_env` / `url_file` / `password_env`, or structured `host`+`user`).
208/// When none of those exist, connecting later fails deep in the driver with an
209/// opaque auth error; surface the fix instead.
210fn reject_unrecoverable_inline_url(artifact: &PlanArtifact) -> Result<()> {
211    let source = &artifact.resolved_plan.source;
212
213    let url_redacted = source
214        .url
215        .as_deref()
216        .is_some_and(|u| u.contains("REDACTED@"));
217    if !url_redacted {
218        return Ok(());
219    }
220
221    // A redacted URL is still appliable if apply has another way to obtain
222    // credentials: an env/file URL reference, an env password, or enough
223    // structured fields to rebuild the connection.
224    let has_recovery = source.url_env.is_some()
225        || source.url_file.is_some()
226        || source.password_env.is_some()
227        || (source.host.is_some() && source.user.is_some());
228    if has_recovery {
229        return Ok(());
230    }
231
232    anyhow::bail!(
233        "plan '{}': source credentials were stripped from this artifact and cannot be \
234         recovered at apply time — the plan was created from an inline `url:` config, whose \
235         password is never persisted.\n  \
236         Fix: re-plan from a config that uses `url_env: <VAR>` (or `url_file:`) so `rivet apply` \
237         can resolve credentials, e.g.\n    \
238         source:\n      type: {}\n      url_env: DATABASE_URL\n  \
239         then `export DATABASE_URL=...` before running apply.",
240        artifact.export_name,
241        match artifact.resolved_plan.source.source_type {
242            crate::config::SourceType::Postgres => "postgres",
243            crate::config::SourceType::Mysql => "mysql",
244            crate::config::SourceType::Mssql => "mssql",
245        },
246    )
247}
248
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{
        ComputedPlanData, ExtractionStrategy, PlanArtifact, PlanDiagnostics, ResolvedRunPlan,
    };
    use crate::tuning::SourceTuning;
    use chrono::{Duration, Utc};

    /// A snapshot plan for export `orders` pointing at a source that is not
    /// expected to be reachable, so any test that fails at a preflight gate
    /// never gets as far as connecting.
    fn unreachable_plan() -> ResolvedRunPlan {
        // The URL intentionally has no userinfo: with nothing to redact, the
        // finding-#17 gate (`reject_unrecoverable_inline_url`) stays quiet and
        // these fixtures keep exercising the staleness / cursor / driver paths
        // they were written for. A credentialed URL would be rewritten to
        // `REDACTED@…` by PA9 redaction and rejected by #17 before reaching
        // those gates.
        let source = SourceConfig {
            source_type: SourceType::Postgres,
            url: Some("postgresql://127.0.0.2:9999/nonexistent".into()),
            url_env: None,
            url_file: None,
            host: None,
            port: None,
            user: None,
            password: None,
            password_env: None,
            database: None,
            environment: None,
            tuning: None,
            tls: None,
        };
        let destination = DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some("/tmp/rivet_apply_test".into()),
            ..Default::default()
        };

        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination,
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source,
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 0.0,
            parquet: None,
        }
    }

    /// A just-created artifact around `unreachable_plan()` with no chunk
    /// ranges and no cursor snapshot.
    fn fresh_artifact() -> PlanArtifact {
        let computed = ComputedPlanData {
            chunk_ranges: vec![],
            chunk_count: 0,
            cursor_snapshot: None,
            row_estimate: None,
        };
        let diagnostics = PlanDiagnostics {
            verdict: "Efficient".into(),
            warnings: vec![],
            recommended_profile: "balanced".into(),
            strategy_rationale: String::new(),
        };
        PlanArtifact::new(
            "orders".into(),
            "full".into(),
            String::new(),
            unreachable_plan(),
            computed,
            diagnostics,
        )
    }

    /// Serializes `artifact` to `<dir>/plan.json` and returns that path.
    fn write_artifact(dir: &tempfile::TempDir, artifact: &PlanArtifact) -> String {
        let plan_path = dir.path().join("plan.json");
        let serialized = artifact.to_json_pretty().expect("serialize");
        std::fs::write(&plan_path, serialized).expect("write plan.json");
        plan_path.to_str().unwrap().to_owned()
    }

    /// Runs `apply` on `plan_file` (no parallel, no resume), requires it to
    /// fail, and returns the error rendered with its full context chain.
    fn apply_failure(plan_file: &str, force: bool) -> String {
        let err = run_apply_command(plan_file, force, false, false).unwrap_err();
        format!("{err:#}")
    }

    // ── staleness enforcement ────────────────────────────────────────────────

    #[test]
    fn stale_error_without_force_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        // 25 h old: one hour past the hard limit.
        let mut artifact = fresh_artifact();
        artifact.created_at = Utc::now() - Duration::hours(25);
        let plan_file = write_artifact(&dir, &artifact);

        let msg = apply_failure(&plan_file, false);
        assert!(
            msg.contains("hours old") || msg.contains("24 h"),
            "expected staleness error: {msg}"
        );
    }

    // ── cursor drift ─────────────────────────────────────────────────────────

    #[test]
    fn cursor_drift_detected_no_prior_state() {
        let dir = tempfile::TempDir::new().unwrap();
        // The plan carries a cursor snapshot while the state store next to it
        // is brand new and empty, so the two cannot agree → drift.
        let mut artifact = fresh_artifact();
        artifact.computed.cursor_snapshot = Some("2025-06-01T00:00:00Z".into());
        let plan_file = write_artifact(&dir, &artifact);

        let msg = apply_failure(&plan_file, false);
        assert!(
            msg.contains("drifted") || msg.contains("cursor"),
            "expected cursor drift error: {msg}"
        );
    }

    // ── file I/O errors ──────────────────────────────────────────────────────

    #[test]
    fn missing_plan_file_returns_error() {
        let msg = apply_failure("/tmp/rivet_nonexistent_xyzxyz.json", false);
        assert!(
            msg.contains("cannot read") || msg.contains("No such file"),
            "expected file-not-found: {msg}"
        );
    }

    #[test]
    fn corrupt_plan_file_returns_parse_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let plan_path = dir.path().join("plan.json");
        std::fs::write(&plan_path, b"not valid json at all").unwrap();

        let msg = apply_failure(plan_path.to_str().unwrap(), false);
        assert!(
            msg.contains("invalid plan") || msg.contains("JSON") || msg.contains("expected"),
            "expected parse error: {msg}"
        );
    }

    // ── artifact integrity / tamper-evidence (ADR-0005 PA10, finding #16) ─────

    /// Offline counterpart of the live RED test
    /// `audit_apply_rejects_tampered_plan`: the written plan file has its
    /// `base_query` edited by hand (`SELECT 1` → `SELECT * FROM secrets`) and
    /// `apply` must refuse it at the integrity gate. That gate runs before
    /// any DB connection, so no live source is required. `--force` must not
    /// get past it either.
    #[test]
    fn apply_rejects_tampered_base_query() {
        let dir = tempfile::TempDir::new().unwrap();
        let plan_file = write_artifact(&dir, &fresh_artifact());

        // Edit the serialized artifact the way the live test does: swap the
        // embedded base_query text in place and leave the seal (and
        // created_at) alone, so nothing but the integrity check can notice.
        let original_json = std::fs::read_to_string(&plan_file).unwrap();
        assert!(
            original_json.contains("SELECT 1"),
            "fixture must embed the planned base_query"
        );
        let tampered_json = original_json.replace("SELECT 1", "SELECT * FROM secrets");
        std::fs::write(&plan_file, &tampered_json).unwrap();

        let msg = apply_failure(&plan_file, false);
        assert!(
            msg.contains("integrity check failed") && msg.contains("modified after planning"),
            "tampered plan must be rejected at the integrity gate, got: {msg}"
        );

        // Same file, this time with --force: a hand-edited contract is not
        // something the operator can opt into.
        let msg_forced = apply_failure(&plan_file, true);
        assert!(
            msg_forced.contains("integrity check failed"),
            "--force must not bypass the integrity gate, got: {msg_forced}"
        );
    }

    // The complementary "untouched artifact is accepted" case is covered
    // deterministically, with no connection attempt, by
    // `artifact.rs::integrity_seal_accepts_untouched_artifact`. It is also
    // covered indirectly here: `stale_error_without_force_is_rejected` and
    // `cursor_drift_detected_no_prior_state` can only reach the staleness /
    // cursor gates they assert on after passing the integrity gate.
}