Skip to main content

rivet/pipeline/
apply_cmd.rs

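//! **`rivet apply`** — execute a previously-generated `PlanArtifact`.
//!
//! Responsibilities:
//!
//! 1. Deserialize `PlanArtifact` from the plan JSON file.
//! 2. Check staleness (warn > 1 h, error > 24 h unless `--force`).
//! 3. For `Incremental`: validate that the cursor has not drifted since plan
//!    time (an error unless `--force`, which downgrades it to a warning).
//! 4. Execute using `ChunkSource::Precomputed` so chunk boundaries from the
//!    artifact are replayed without re-running `SELECT min/max` queries
//!    (falling back to `ChunkSource::Detect` when the artifact has no ranges).
//! 5. Persist outcomes (manifest, cursor, metrics) via `StateStore` as usual.
//!    The state store is opened next to the original config file when that
//!    directory still exists, and next to the plan file otherwise.
//!
//! Every preflight check that `--force` actually overrides is recorded in the
//! run report's apply context, so bypassed gates stay auditable.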
11
use chrono::Duration;
use std::path::{Path, PathBuf};

use crate::error::Result;
use crate::plan::{PlanArtifact, StalenessCheck};
use crate::state::StateStore;

use super::chunked::ChunkSource;
use super::summary::ApplyContext;
21
22/// Entry point for `rivet apply <plan-file> [--force]`.
23pub fn run_apply_command(plan_file: &str, force: bool) -> Result<()> {
24    // 1. Load artifact
25    let artifact = PlanArtifact::from_file(plan_file)?;
26
27    // Track which preflight checks --force actually overrode for this run.
28    // Threaded into RunSummary.apply_context so the run report carries an
29    // auditable record of bypassed gates (finding F5 of the 0.7.5 audit).
30    let mut force_bypassed: Vec<String> = Vec::new();
31
32    // 2. Staleness check
33    let warn_threshold = Duration::hours(1);
34    let error_threshold = Duration::hours(24);
35    match artifact.staleness(warn_threshold, error_threshold) {
36        StalenessCheck::Fresh => {}
37        StalenessCheck::StaleWarn(age) => {
38            log::warn!(
39                "plan '{}' is {} minutes old — consider regenerating with `rivet plan`",
40                artifact.export_name,
41                age.num_minutes()
42            );
43        }
44        StalenessCheck::StaleError(age) => {
45            // F6 (0.7.5 audit): "56035 hours old" is mathematically
46            // correct but unreadable.  For ages above 48 h switch to
47            // days + the actual `created_at` date; below that keep
48            // hours so the boundary right above 24 h reads cleanly.
49            let age_phrase = if age.num_hours() >= 48 {
50                format!(
51                    "{} days old (created {})",
52                    age.num_days(),
53                    artifact.created_at.format("%Y-%m-%d")
54                )
55            } else {
56                format!("{} hours old", age.num_hours())
57            };
58            if !force {
59                anyhow::bail!(
60                    "plan '{}' is {} (limit: 24 h). Regenerate with `rivet plan` or pass --force to override.",
61                    artifact.export_name,
62                    age_phrase,
63                );
64            }
65            force_bypassed.push("staleness".into());
66            log::warn!(
67                "plan '{}': ignoring staleness ({}) because --force was passed",
68                artifact.export_name,
69                age_phrase,
70            );
71        }
72    }
73
74    // 3. Open StateStore — F13 (0.7.5 audit) resolution policy:
75    //    a) If the artifact recorded the original config path AND that
76    //       directory exists, open state next to it.  This is the only
77    //       path that keeps `apply` consistent with `rivet run`'s state
78    //       location, so cursors, manifests, and schema history line up.
79    //    b) Otherwise fall back to the plan file's own directory — the
80    //       pre-0.7.5 behaviour, which is the right answer when the
81    //       config has been deleted or moved and the operator just
82    //       wants to replay the artifact in isolation.
83    //
84    //    The fallback path also emits a WARN so the operator notices
85    //    the divergence (e.g. plan files stored separately from
86    //    configs without explicit intent).
87    let plan_dir = Path::new(plan_file)
88        .parent()
89        .unwrap_or_else(|| Path::new("."));
90    let state_dir = match artifact
91        .config_path
92        .as_deref()
93        .map(Path::new)
94        .and_then(Path::parent)
95    {
96        Some(dir) if dir.exists() => dir.to_path_buf(),
97        Some(dir) => {
98            log::warn!(
99                "plan '{}': original config dir '{}' no longer exists; opening state next to plan file instead. \
100                 Cursors and manifest history from the original run will not be visible.",
101                artifact.export_name,
102                dir.display(),
103            );
104            plan_dir.to_path_buf()
105        }
106        None => {
107            log::warn!(
108                "plan '{}': artifact has no recorded config path (pre-0.7.5 plan?). \
109                 Opening state next to the plan file; this may diverge from the state \
110                 used by `rivet run` for the same config.",
111                artifact.export_name,
112            );
113            plan_dir.to_path_buf()
114        }
115    };
116    let state_path = state_dir.join(".rivet_state.db");
117    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
118
119    // 4. Cursor drift check for Incremental exports
120    //
121    // Finding F1 of the 0.7.5 audit: the error message has always told the
122    // user "pass --force to skip this check", but until 0.7.5 the code did
123    // not actually honour `--force` here.  The flag now bypasses the bail
124    // with a WARN log, matching the documented contract and the analogous
125    // staleness path above.
126    if artifact.computed.cursor_snapshot.is_some() {
127        let current = state.get(&artifact.export_name)?.last_cursor_value;
128        if !artifact.cursor_matches(current.as_deref()) {
129            if !force {
130                anyhow::bail!(
131                    "plan '{}': cursor has drifted since plan was generated \
132                     (plan snapshot: {:?}, current: {:?}). \
133                     Regenerate with `rivet plan` or pass --force to skip this check.",
134                    artifact.export_name,
135                    artifact.computed.cursor_snapshot,
136                    current,
137                );
138            }
139            force_bypassed.push("cursor_drift".into());
140            log::warn!(
141                "plan '{}': cursor has drifted (plan snapshot: {:?}, current: {:?}) — \
142                 proceeding because --force was passed",
143                artifact.export_name,
144                artifact.computed.cursor_snapshot,
145                current,
146            );
147        }
148    }
149
150    // 5. Build ChunkSource from the pre-computed ranges in the artifact
151    let chunk_source = if artifact.computed.chunk_ranges.is_empty() {
152        ChunkSource::Detect
153    } else {
154        ChunkSource::Precomputed(artifact.computed.chunk_ranges.clone())
155    };
156
157    // 6. Execute using the plan from the artifact, carrying the apply audit
158    // context into the run report (F5).
159    let apply_context = ApplyContext {
160        plan_id: artifact.plan_id.clone(),
161        forced: force,
162        force_bypassed,
163    };
164    let plan = artifact.resolved_plan.clone();
165    super::run_export_job_with_chunk_source(
166        &plan,
167        &state,
168        chunk_source,
169        plan_file,
170        Some(apply_context),
171    )
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{
        ComputedPlanData, ExtractionStrategy, PlanArtifact, PlanDiagnostics, ResolvedRunPlan,
    };
    use crate::tuning::SourceTuning;
    use chrono::{Duration, Utc};

    /// Snapshot plan for `orders` whose source points at an address nothing is
    /// expected to listen on. A test that gets past the preflight gates cannot
    /// reach a real database.
    fn unreachable_plan() -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("/tmp/rivet_apply_test".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced".into(),
            validate: false,
            reconcile: false,
            resume: false,
            // Deliberately unreachable — tests that fail early won't try to connect.
            source: SourceConfig {
                source_type: SourceType::Postgres,
                url: Some("postgresql://nobody:wrong@127.0.0.2:9999/nonexistent".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 0.0,
            parquet: None,
        }
    }

    /// Baseline artifact for `orders` with no chunk ranges and no cursor
    /// snapshot; individual tests override the fields they exercise.
    fn fresh_artifact() -> PlanArtifact {
        PlanArtifact::new(
            "orders".into(),
            "full".into(),
            String::new(),
            unreachable_plan(),
            ComputedPlanData {
                chunk_ranges: vec![],
                chunk_count: 0,
                cursor_snapshot: None,
                row_estimate: None,
            },
            PlanDiagnostics {
                verdict: "Efficient".into(),
                warnings: vec![],
                recommended_profile: "balanced".into(),
            },
        )
    }

    /// Serializes `artifact` to `plan.json` inside `dir` and returns the path.
    fn write_artifact(dir: &tempfile::TempDir, artifact: &PlanArtifact) -> String {
        let path = dir.path().join("plan.json");
        let json = artifact.to_json_pretty().expect("serialize");
        std::fs::write(&path, json).expect("write plan.json");
        path.to_str().unwrap().to_string()
    }

    // ── staleness enforcement ────────────────────────────────────────────────

    #[test]
    fn stale_error_without_force_is_rejected() {
        let mut artifact = fresh_artifact();
        artifact.created_at = Utc::now() - Duration::hours(25);
        let dir = tempfile::TempDir::new().unwrap();
        let path = write_artifact(&dir, &artifact);

        let err = run_apply_command(&path, false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("hours old") || msg.contains("24 h"),
            "expected staleness error: {msg}"
        );
    }

    #[test]
    fn stale_error_reports_days_for_very_old_plans() {
        // F6: ages of 48 h and above are rendered as days plus the creation date.
        let mut artifact = fresh_artifact();
        artifact.created_at = Utc::now() - Duration::days(10);
        let expected_date = artifact.created_at.format("%Y-%m-%d").to_string();
        let dir = tempfile::TempDir::new().unwrap();
        let path = write_artifact(&dir, &artifact);

        let err = run_apply_command(&path, false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains(&format!("10 days old (created {expected_date})")),
            "expected day-granularity staleness error: {msg}"
        );
    }

    // ── cursor drift ─────────────────────────────────────────────────────────

    #[test]
    fn cursor_drift_detected_no_prior_state() {
        let mut artifact = fresh_artifact();
        // Plan recorded a cursor snapshot; state store starts empty → drift.
        artifact.computed.cursor_snapshot = Some("2025-06-01T00:00:00Z".into());
        let dir = tempfile::TempDir::new().unwrap();
        let path = write_artifact(&dir, &artifact);

        let err = run_apply_command(&path, false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("drifted"), "expected cursor drift error: {msg}");
        assert!(
            msg.contains("2025-06-01T00:00:00Z"),
            "drift error should report the plan's cursor snapshot: {msg}"
        );
    }

    // ── file I/O errors ──────────────────────────────────────────────────────

    #[test]
    fn missing_plan_file_returns_error() {
        // Point into a fresh temp dir: the file is guaranteed not to exist and
        // the test does not depend on a Unix-style `/tmp`.
        let dir = tempfile::TempDir::new().unwrap();
        let missing = dir.path().join("rivet_nonexistent_xyzxyz.json");
        let err = run_apply_command(missing.to_str().unwrap(), false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("cannot read") || msg.contains("No such file"),
            "expected file-not-found: {msg}"
        );
    }

    #[test]
    fn corrupt_plan_file_returns_parse_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("plan.json");
        std::fs::write(&path, b"not valid json at all").unwrap();
        let err = run_apply_command(path.to_str().unwrap(), false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("invalid plan") || msg.contains("JSON") || msg.contains("expected"),
            "expected parse error: {msg}"
        );
    }
}