Skip to main content

rivet/pipeline/
apply_cmd.rs

//! **`rivet apply`** — execute a previously-generated `PlanArtifact`.
//!
//! Responsibilities:
//!
//! 1. Deserialize `PlanArtifact` from the plan JSON file.
//! 2. Check staleness (warn > 1 h, error > 24 h unless `--force`).
//! 3. For `Incremental`: validate the cursor has not drifted since plan time
//!    (error unless `--force`).
//! 4. Execute using `ChunkSource::Precomputed` so chunk boundaries from the
//!    artifact are replayed without re-running `SELECT min/max` queries
//!    (falling back to `ChunkSource::Detect` when the artifact recorded none).
//! 5. Persist outcomes (manifest, cursor, metrics) via `StateStore` as usual.

use chrono::Duration;
use std::path::{Path, PathBuf};

use crate::error::Result;
use crate::plan::{PlanArtifact, StalenessCheck};
use crate::state::StateStore;

use super::chunked::ChunkSource;
use super::summary::ApplyContext;

22/// Entry point for `rivet apply <plan-file> [--force]`.
23pub fn run_apply_command(plan_file: &str, force: bool) -> Result<()> {
24    // 1. Load artifact
25    let artifact = PlanArtifact::from_file(plan_file)?;
26
27    // Track which preflight checks --force actually overrode for this run.
28    // Threaded into RunSummary.apply_context so the run report carries an
29    // auditable record of bypassed gates (finding F5 of the 0.7.5 audit).
30    let mut force_bypassed: Vec<String> = Vec::new();
31
32    // 2. Staleness check
33    let warn_threshold = Duration::hours(1);
34    let error_threshold = Duration::hours(24);
35    match artifact.staleness(warn_threshold, error_threshold) {
36        StalenessCheck::Fresh => {}
37        StalenessCheck::StaleWarn(age) => {
38            log::warn!(
39                "plan '{}' is {} minutes old — consider regenerating with `rivet plan`",
40                artifact.export_name,
41                age.num_minutes()
42            );
43        }
44        StalenessCheck::StaleError(age) => {
45            // F6 (0.7.5 audit): "56035 hours old" is mathematically
46            // correct but unreadable.  For ages above 48 h switch to
47            // days + the actual `created_at` date; below that keep
48            // hours so the boundary right above 24 h reads cleanly.
49            let age_phrase = if age.num_hours() >= 48 {
50                format!(
51                    "{} days old (created {})",
52                    age.num_days(),
53                    artifact.created_at.format("%Y-%m-%d")
54                )
55            } else {
56                format!("{} hours old", age.num_hours())
57            };
58            if !force {
59                anyhow::bail!(
60                    "plan '{}' is {} (limit: 24 h). Regenerate with `rivet plan` or pass --force to override.",
61                    artifact.export_name,
62                    age_phrase,
63                );
64            }
65            force_bypassed.push("staleness".into());
66            log::warn!(
67                "plan '{}': ignoring staleness ({}) because --force was passed",
68                artifact.export_name,
69                age_phrase,
70            );
71        }
72    }
73
74    // 3. Open StateStore — F13 (0.7.5 audit) resolution policy:
75    //    a) If the artifact recorded the original config path AND that
76    //       directory exists, open state next to it.  This is the only
77    //       path that keeps `apply` consistent with `rivet run`'s state
78    //       location, so cursors, manifests, and schema history line up.
79    //    b) Otherwise fall back to the plan file's own directory — the
80    //       pre-0.7.5 behaviour, which is the right answer when the
81    //       config has been deleted or moved and the operator just
82    //       wants to replay the artifact in isolation.
83    //
84    //    The fallback path also emits a WARN so the operator notices
85    //    the divergence (e.g. plan files stored separately from
86    //    configs without explicit intent).
87    let plan_dir = Path::new(plan_file)
88        .parent()
89        .unwrap_or_else(|| Path::new("."));
90    let state_dir = match artifact
91        .config_path
92        .as_deref()
93        .map(Path::new)
94        .and_then(Path::parent)
95    {
96        Some(dir) if dir.exists() => dir.to_path_buf(),
97        Some(dir) => {
98            log::warn!(
99                "plan '{}': original config dir '{}' no longer exists; opening state next to plan file instead. \
100                 Cursors and manifest history from the original run will not be visible.",
101                artifact.export_name,
102                dir.display(),
103            );
104            plan_dir.to_path_buf()
105        }
106        None => {
107            log::warn!(
108                "plan '{}': artifact has no recorded config path (pre-0.7.5 plan?). \
109                 Opening state next to the plan file; this may diverge from the state \
110                 used by `rivet run` for the same config.",
111                artifact.export_name,
112            );
113            plan_dir.to_path_buf()
114        }
115    };
116    let state_path = state_dir.join(".rivet_state.db");
117    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
118
119    // 4. Cursor drift check for Incremental exports
120    //
121    // Finding F1 of the 0.7.5 audit: the error message has always told the
122    // user "pass --force to skip this check", but until 0.7.5 the code did
123    // not actually honour `--force` here.  The flag now bypasses the bail
124    // with a WARN log, matching the documented contract and the analogous
125    // staleness path above.
126    if artifact.computed.cursor_snapshot.is_some() {
127        let current = state.get(&artifact.export_name)?.last_cursor_value;
128        if !artifact.cursor_matches(current.as_deref()) {
129            if !force {
130                anyhow::bail!(
131                    "plan '{}': cursor has drifted since plan was generated \
132                     (plan snapshot: {:?}, current: {:?}). \
133                     Regenerate with `rivet plan` or pass --force to skip this check.",
134                    artifact.export_name,
135                    artifact.computed.cursor_snapshot,
136                    current,
137                );
138            }
139            force_bypassed.push("cursor_drift".into());
140            log::warn!(
141                "plan '{}': cursor has drifted (plan snapshot: {:?}, current: {:?}) — \
142                 proceeding because --force was passed",
143                artifact.export_name,
144                artifact.computed.cursor_snapshot,
145                current,
146            );
147        }
148    }
149
150    // 5. Build ChunkSource from the pre-computed ranges in the artifact
151    let chunk_source = if artifact.computed.chunk_ranges.is_empty() {
152        ChunkSource::Detect
153    } else {
154        ChunkSource::Precomputed(artifact.computed.chunk_ranges.clone())
155    };
156
157    // 6. Execute using the plan from the artifact, carrying the apply audit
158    // context into the run report (F5).
159    let apply_context = ApplyContext {
160        plan_id: artifact.plan_id.clone(),
161        forced: force,
162        force_bypassed,
163    };
164    let plan = artifact.resolved_plan.clone();
165    super::run_export_job_with_chunk_source(
166        &plan,
167        &state,
168        chunk_source,
169        plan_file,
170        Some(apply_context),
171    )
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{
        ComputedPlanData, ExtractionStrategy, PlanArtifact, PlanDiagnostics, ResolvedRunPlan,
    };
    use crate::tuning::SourceTuning;
    use chrono::{Duration, Utc};

    /// Snapshot plan whose source cannot be reached. Every test here expects
    /// `run_apply_command` to fail before any connection is attempted.
    fn unreachable_plan() -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("/tmp/rivet_apply_test".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced".into(),
            validate: false,
            reconcile: false,
            resume: false,
            // Deliberately unreachable — tests that fail early won't try to connect.
            source: SourceConfig {
                source_type: SourceType::Postgres,
                url: Some("postgresql://nobody:wrong@127.0.0.2:9999/nonexistent".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 0.0,
            parquet: None,
        }
    }

    /// Artifact created "now" with no chunk ranges and no cursor snapshot.
    fn fresh_artifact() -> PlanArtifact {
        PlanArtifact::new(
            "orders".into(),
            "full".into(),
            String::new(),
            unreachable_plan(),
            ComputedPlanData {
                chunk_ranges: vec![],
                chunk_count: 0,
                cursor_snapshot: None,
                row_estimate: None,
            },
            PlanDiagnostics {
                verdict: "Efficient".into(),
                warnings: vec![],
                recommended_profile: "balanced".into(),
            },
        )
    }

    /// Serializes `artifact` to `<dir>/plan.json` and returns that path.
    fn write_artifact(dir: &tempfile::TempDir, artifact: &PlanArtifact) -> String {
        let path = dir.path().join("plan.json");
        let json = artifact.to_json_pretty().expect("serialize");
        std::fs::write(&path, json).expect("write plan.json");
        path.to_str().unwrap().to_string()
    }

    // ── staleness enforcement ────────────────────────────────────────────────

    #[test]
    fn stale_error_without_force_is_rejected() {
        let mut artifact = fresh_artifact();
        artifact.created_at = Utc::now() - Duration::hours(25);
        let dir = tempfile::TempDir::new().unwrap();
        let path = write_artifact(&dir, &artifact);

        let err = run_apply_command(&path, false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("25 hours old") && msg.contains("limit: 24 h"),
            "expected staleness error in hours: {msg}"
        );
    }

    #[test]
    fn stale_error_over_48h_is_reported_in_days() {
        let mut artifact = fresh_artifact();
        artifact.created_at = Utc::now() - Duration::hours(72);
        let created = artifact.created_at.format("%Y-%m-%d").to_string();
        let dir = tempfile::TempDir::new().unwrap();
        let path = write_artifact(&dir, &artifact);

        let err = run_apply_command(&path, false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("3 days old"), "expected age in days: {msg}");
        assert!(
            msg.contains(&format!("(created {created})")),
            "expected creation date {created}: {msg}"
        );
    }

    // ── cursor drift ─────────────────────────────────────────────────────────

    #[test]
    fn cursor_drift_detected_no_prior_state() {
        let mut artifact = fresh_artifact();
        // Plan recorded a cursor snapshot; state store starts empty → drift.
        artifact.computed.cursor_snapshot = Some("2025-06-01T00:00:00Z".into());
        let dir = tempfile::TempDir::new().unwrap();
        let path = write_artifact(&dir, &artifact);

        let err = run_apply_command(&path, false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("cursor has drifted"),
            "expected cursor drift error: {msg}"
        );
    }

    // ── file I/O errors ──────────────────────────────────────────────────────

    #[test]
    fn missing_plan_file_returns_error() {
        // Use the platform temp dir rather than a hard-coded `/tmp`.
        let path = std::env::temp_dir().join("rivet_nonexistent_xyzxyz.json");
        let err = run_apply_command(path.to_str().unwrap(), false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("cannot read") || msg.contains("No such file"),
            "expected file-not-found: {msg}"
        );
    }

    #[test]
    fn corrupt_plan_file_returns_parse_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("plan.json");
        std::fs::write(&path, b"not valid json at all").unwrap();
        let err = run_apply_command(path.to_str().unwrap(), false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("invalid plan") || msg.contains("JSON") || msg.contains("expected"),
            "expected parse error: {msg}"
        );
    }
}