Skip to main content

rivet/pipeline/
apply_cmd.rs

1//! **`rivet apply`** — execute a previously-generated `PlanArtifact`.
2//!
3//! Responsibilities:
4//!
5//! 1. Deserialize `PlanArtifact` from the plan JSON file.
6//! 2. Check staleness (warn > 1 h, error > 24 h unless `--force`).
7//! 3. For `Incremental`: validate cursor has not drifted since plan time.
8//! 4. Execute using `ChunkSource::Precomputed` so chunk boundaries from the
9//!    artifact are replayed without re-running `SELECT min/max` queries.
10//! 5. Persist outcomes (manifest, cursor, metrics) via `StateStore` as usual.
11
12use chrono::Duration;
13use std::path::Path;
14
15use crate::error::Result;
16use crate::plan::{PlanArtifact, StalenessCheck};
17use crate::state::StateStore;
18
19use super::chunked::ChunkSource;
20use super::summary::ApplyContext;
21
22/// Entry point for `rivet apply <plan-file> [--force]`.
23pub fn run_apply_command(plan_file: &str, force: bool) -> Result<()> {
24    // 1. Load artifact
25    let artifact = PlanArtifact::from_file(plan_file)?;
26
27    // 1a. Tamper-evidence (ADR-0005 PA10, finding #16): reject a plan whose
28    //     `resolved_plan` was edited after planning before we run any query.
29    //     Unlike staleness/cursor-drift this is NOT bypassable by --force —
30    //     a hand-edited execution contract is never something the operator
31    //     can opt into; the only correct recovery is to re-run `rivet plan`.
32    artifact.verify_integrity()?;
33
34    // 1b. Un-appliable inline-url plan (finding #17): a plan generated from an
35    //     inline `url:` config has its credentials redacted to `REDACTED@…`
36    //     with no env/file reference to re-resolve them, so apply would die
37    //     deep in the driver with an opaque auth/"password missing" error.
38    //     Catch it here with the exact remedy.
39    reject_unrecoverable_inline_url(&artifact)?;
40
41    // Track which preflight checks --force actually overrode for this run.
42    // Threaded into RunSummary.apply_context so the run report carries an
43    // auditable record of bypassed gates (finding F5 of the 0.7.5 audit).
44    let mut force_bypassed: Vec<String> = Vec::new();
45
46    // 2. Staleness check
47    let warn_threshold = Duration::hours(1);
48    let error_threshold = Duration::hours(24);
49    match artifact.staleness(warn_threshold, error_threshold) {
50        StalenessCheck::Fresh => {}
51        StalenessCheck::StaleWarn(age) => {
52            log::warn!(
53                "plan '{}' is {} minutes old — consider regenerating with `rivet plan`",
54                artifact.export_name,
55                age.num_minutes()
56            );
57        }
58        StalenessCheck::StaleError(age) => {
59            // F6 (0.7.5 audit): "56035 hours old" is mathematically
60            // correct but unreadable.  For ages above 48 h switch to
61            // days + the actual `created_at` date; below that keep
62            // hours so the boundary right above 24 h reads cleanly.
63            let age_phrase = if age.num_hours() >= 48 {
64                format!(
65                    "{} days old (created {})",
66                    age.num_days(),
67                    artifact.created_at.format("%Y-%m-%d")
68                )
69            } else {
70                format!("{} hours old", age.num_hours())
71            };
72            if !force {
73                anyhow::bail!(
74                    "plan '{}' is {} (limit: 24 h). Regenerate with `rivet plan` or pass --force to override.",
75                    artifact.export_name,
76                    age_phrase,
77                );
78            }
79            force_bypassed.push("staleness".into());
80            log::warn!(
81                "plan '{}': ignoring staleness ({}) because --force was passed",
82                artifact.export_name,
83                age_phrase,
84            );
85        }
86    }
87
88    // 3. Open StateStore — F13 (0.7.5 audit) resolution policy:
89    //    a) If the artifact recorded the original config path AND that
90    //       directory exists, open state next to it.  This is the only
91    //       path that keeps `apply` consistent with `rivet run`'s state
92    //       location, so cursors, manifests, and schema history line up.
93    //    b) Otherwise fall back to the plan file's own directory — the
94    //       pre-0.7.5 behaviour, which is the right answer when the
95    //       config has been deleted or moved and the operator just
96    //       wants to replay the artifact in isolation.
97    //
98    //    The fallback path also emits a WARN so the operator notices
99    //    the divergence (e.g. plan files stored separately from
100    //    configs without explicit intent).
101    let plan_dir = Path::new(plan_file)
102        .parent()
103        .unwrap_or_else(|| Path::new("."));
104    let state_dir = match artifact
105        .config_path
106        .as_deref()
107        .map(Path::new)
108        .and_then(Path::parent)
109    {
110        Some(dir) if dir.exists() => dir.to_path_buf(),
111        Some(dir) => {
112            log::warn!(
113                "plan '{}': original config dir '{}' no longer exists; opening state next to plan file instead. \
114                 Cursors and manifest history from the original run will not be visible.",
115                artifact.export_name,
116                dir.display(),
117            );
118            plan_dir.to_path_buf()
119        }
120        None => {
121            log::warn!(
122                "plan '{}': artifact has no recorded config path (pre-0.7.5 plan?). \
123                 Opening state next to the plan file; this may diverge from the state \
124                 used by `rivet run` for the same config.",
125                artifact.export_name,
126            );
127            plan_dir.to_path_buf()
128        }
129    };
130    let state_path = state_dir.join(".rivet_state.db");
131    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
132
133    // 4. Cursor drift check for Incremental exports
134    //
135    // Finding F1 of the 0.7.5 audit: the error message has always told the
136    // user "pass --force to skip this check", but until 0.7.5 the code did
137    // not actually honour `--force` here.  The flag now bypasses the bail
138    // with a WARN log, matching the documented contract and the analogous
139    // staleness path above.
140    if artifact.computed.cursor_snapshot.is_some() {
141        let current = state.get(&artifact.export_name)?.last_cursor_value;
142        if !artifact.cursor_matches(current.as_deref()) {
143            if !force {
144                anyhow::bail!(
145                    "plan '{}': cursor has drifted since plan was generated \
146                     (plan snapshot: {:?}, current: {:?}). \
147                     Regenerate with `rivet plan` or pass --force to skip this check.",
148                    artifact.export_name,
149                    artifact.computed.cursor_snapshot,
150                    current,
151                );
152            }
153            force_bypassed.push("cursor_drift".into());
154            log::warn!(
155                "plan '{}': cursor has drifted (plan snapshot: {:?}, current: {:?}) — \
156                 proceeding because --force was passed",
157                artifact.export_name,
158                artifact.computed.cursor_snapshot,
159                current,
160            );
161        }
162    }
163
164    // 5. Build ChunkSource from the pre-computed ranges in the artifact
165    let chunk_source = if artifact.computed.chunk_ranges.is_empty() {
166        ChunkSource::Detect
167    } else {
168        ChunkSource::Precomputed(artifact.computed.chunk_ranges.clone())
169    };
170
171    // 6. Execute using the plan from the artifact, carrying the apply audit
172    // context into the run report (F5).
173    let apply_context = ApplyContext {
174        plan_id: artifact.plan_id.clone(),
175        forced: force,
176        force_bypassed,
177    };
178    let plan = artifact.resolved_plan.clone();
179    super::run_export_job_with_chunk_source(
180        &plan,
181        &state,
182        chunk_source,
183        plan_file,
184        Some(apply_context),
185    )
186}
187
188/// Finding #17: reject — with an actionable remedy — a plan whose source
189/// credentials were stripped at plan time (inline `url:`) and cannot be
190/// re-resolved at apply time.
191///
192/// `PlanArtifact::new` redacts plaintext credentials (PA9): an inline
193/// `url: "postgresql://user:pass@host/db"` becomes `postgresql://REDACTED@host/db`.
194/// That is recoverable only if the source *also* carries a reference apply can
195/// resolve (`url_env` / `url_file` / `password_env`, or structured `host`+`user`).
196/// When none of those exist, connecting later fails deep in the driver with an
197/// opaque auth error; surface the fix instead.
198fn reject_unrecoverable_inline_url(artifact: &PlanArtifact) -> Result<()> {
199    let source = &artifact.resolved_plan.source;
200
201    let url_redacted = source
202        .url
203        .as_deref()
204        .is_some_and(|u| u.contains("REDACTED@"));
205    if !url_redacted {
206        return Ok(());
207    }
208
209    // A redacted URL is still appliable if apply has another way to obtain
210    // credentials: an env/file URL reference, an env password, or enough
211    // structured fields to rebuild the connection.
212    let has_recovery = source.url_env.is_some()
213        || source.url_file.is_some()
214        || source.password_env.is_some()
215        || (source.host.is_some() && source.user.is_some());
216    if has_recovery {
217        return Ok(());
218    }
219
220    anyhow::bail!(
221        "plan '{}': source credentials were stripped from this artifact and cannot be \
222         recovered at apply time — the plan was created from an inline `url:` config, whose \
223         password is never persisted.\n  \
224         Fix: re-plan from a config that uses `url_env: <VAR>` (or `url_file:`) so `rivet apply` \
225         can resolve credentials, e.g.\n    \
226         source:\n      type: {}\n      url_env: DATABASE_URL\n  \
227         then `export DATABASE_URL=...` before running apply.",
228        artifact.export_name,
229        match artifact.resolved_plan.source.source_type {
230            crate::config::SourceType::Postgres => "postgres",
231            crate::config::SourceType::Mysql => "mysql",
232            crate::config::SourceType::Mssql => "mssql",
233        },
234    )
235}
236
/// Offline tests for `rivet apply`: every case fails (or is decided) before
/// any database connection is attempted, so no live source is required.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{
        ComputedPlanData, ExtractionStrategy, PlanArtifact, PlanDiagnostics, ResolvedRunPlan,
    };
    use crate::tuning::SourceTuning;
    use chrono::{Duration, Utc};

    /// A snapshot plan pointing at a source that cannot be reached.
    fn unreachable_plan() -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("/tmp/rivet_apply_test".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced".into(),
            validate: false,
            reconcile: false,
            resume: false,
            // Deliberately unreachable — tests that fail early won't try to
            // connect. No userinfo in the URL: nothing to redact, so the
            // finding-#17 gate (`reject_unrecoverable_inline_url`) does not fire
            // and these fixtures still exercise the staleness / cursor / driver
            // paths they were written for. A credentialed URL here would be
            // rewritten to `REDACTED@…` by PA9 redaction and then rejected by
            // #17 before reaching those gates.
            source: SourceConfig {
                source_type: SourceType::Postgres,
                url: Some("postgresql://127.0.0.2:9999/nonexistent".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 0.0,
            parquet: None,
        }
    }

    /// A just-created artifact with no chunk ranges and no cursor snapshot.
    fn fresh_artifact() -> PlanArtifact {
        PlanArtifact::new(
            "orders".into(),
            "full".into(),
            String::new(),
            unreachable_plan(),
            ComputedPlanData {
                chunk_ranges: vec![],
                chunk_count: 0,
                cursor_snapshot: None,
                row_estimate: None,
            },
            PlanDiagnostics {
                verdict: "Efficient".into(),
                warnings: vec![],
                recommended_profile: "balanced".into(),
            },
        )
    }

    /// Serializes `artifact` to `<dir>/plan.json` and returns that path.
    fn write_artifact(dir: &tempfile::TempDir, artifact: &PlanArtifact) -> String {
        let path = dir.path().join("plan.json");
        let json = artifact.to_json_pretty().expect("serialize");
        std::fs::write(&path, json).expect("write plan.json");
        path.to_str().unwrap().to_string()
    }

    // ── staleness enforcement ────────────────────────────────────────────────

    #[test]
    fn stale_error_without_force_is_rejected() {
        let mut artifact = fresh_artifact();
        artifact.created_at = Utc::now() - Duration::hours(25);
        let dir = tempfile::TempDir::new().unwrap();
        let path = write_artifact(&dir, &artifact);

        let err = run_apply_command(&path, false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("hours old") || msg.contains("24 h"),
            "expected staleness error: {msg}"
        );
    }

    // ── cursor drift ─────────────────────────────────────────────────────────

    #[test]
    fn cursor_drift_detected_no_prior_state() {
        let mut artifact = fresh_artifact();
        // Plan recorded a cursor snapshot; state store starts empty → drift.
        artifact.computed.cursor_snapshot = Some("2025-06-01T00:00:00Z".into());
        let dir = tempfile::TempDir::new().unwrap();
        let path = write_artifact(&dir, &artifact);

        let err = run_apply_command(&path, false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("drifted") || msg.contains("cursor"),
            "expected cursor drift error: {msg}"
        );
    }

    // ── file I/O errors ──────────────────────────────────────────────────────

    #[test]
    fn missing_plan_file_returns_error() {
        // A path inside a fresh temp dir is guaranteed not to exist and is
        // portable, unlike a hard-coded `/tmp/...` name.
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("rivet_nonexistent_plan.json");

        let err = run_apply_command(path.to_str().unwrap(), false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("cannot read") || msg.contains("No such file"),
            "expected file-not-found: {msg}"
        );
    }

    #[test]
    fn corrupt_plan_file_returns_parse_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("plan.json");
        std::fs::write(&path, b"not valid json at all").unwrap();
        let err = run_apply_command(path.to_str().unwrap(), false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("invalid plan") || msg.contains("JSON") || msg.contains("expected"),
            "expected parse error: {msg}"
        );
    }

    // ── artifact integrity / tamper-evidence (ADR-0005 PA10, finding #16) ─────

    /// Offline mirror of the live RED test `audit_apply_rejects_tampered_plan`:
    /// hand-edit `base_query` in the written plan file (orders → users), then
    /// `apply` must REJECT it at the integrity gate — before any DB connection,
    /// so this test needs no live source. Not bypassable by `--force`.
    #[test]
    fn apply_rejects_tampered_base_query() {
        let artifact = fresh_artifact();
        let dir = tempfile::TempDir::new().unwrap();
        let path = write_artifact(&dir, &artifact);

        // Tamper the serialized artifact the same way the live test does:
        // rewrite the embedded base_query string in place, leaving the seal
        // (and created_at) untouched so only the integrity check can catch it.
        let json = std::fs::read_to_string(&path).unwrap();
        assert!(
            json.contains("SELECT 1"),
            "fixture must embed the planned base_query"
        );
        let tampered = json.replace("SELECT 1", "SELECT * FROM secrets");
        std::fs::write(&path, &tampered).unwrap();

        let err = run_apply_command(&path, false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("integrity check failed") && msg.contains("modified after planning"),
            "tampered plan must be rejected at the integrity gate, got: {msg}"
        );

        // And --force must NOT override it — a hand-edited contract is not opt-in.
        let err_forced = run_apply_command(&path, true).unwrap_err();
        let msg_forced = format!("{err_forced:#}");
        assert!(
            msg_forced.contains("integrity check failed"),
            "--force must not bypass the integrity gate, got: {msg_forced}"
        );
    }

    // The "untouched artifact is accepted" half is covered deterministically and
    // without any connection attempt by `artifact.rs::integrity_seal_accepts_
    // untouched_artifact`, and indirectly here by `stale_error_without_force_is_
    // rejected` / `cursor_drift_detected_no_prior_state`, which both pass the
    // integrity gate to reach the staleness / cursor gates they assert on.

    // ── un-appliable inline-url plans (finding #17) ──────────────────────────
    //
    // These call the gate directly and set the source fields by hand, so they
    // depend only on what `reject_unrecoverable_inline_url` itself reads.

    #[test]
    fn unredacted_url_passes_inline_url_gate() {
        let artifact = fresh_artifact();
        assert!(
            reject_unrecoverable_inline_url(&artifact).is_ok(),
            "a URL without the redaction marker must not be rejected"
        );
    }

    #[test]
    fn redacted_url_without_recovery_is_rejected_with_remedy() {
        let mut artifact = fresh_artifact();
        artifact.resolved_plan.source.url = Some("postgresql://REDACTED@db.internal/app".into());

        let err = reject_unrecoverable_inline_url(&artifact).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("plan 'orders'")
                && msg.contains("type: postgres")
                && msg.contains("url_env: DATABASE_URL"),
            "error must name the plan and spell out the url_env remedy, got: {msg}"
        );
    }

    #[test]
    fn redacted_url_with_env_reference_is_accepted() {
        let mut artifact = fresh_artifact();
        artifact.resolved_plan.source.url = Some("postgresql://REDACTED@db.internal/app".into());
        artifact.resolved_plan.source.url_env = Some("DATABASE_URL".into());

        assert!(
            reject_unrecoverable_inline_url(&artifact).is_ok(),
            "a redacted URL with a `url_env` reference is recoverable at apply time"
        );
    }
}