1use chrono::Duration;
13use std::path::Path;
14
15use crate::error::Result;
16use crate::plan::{PlanArtifact, StalenessCheck};
17use crate::state::StateStore;
18
19use super::chunked::ChunkSource;
20use super::summary::ApplyContext;
21
22pub fn run_apply_command(plan_file: &str, force: bool) -> Result<()> {
24 let artifact = PlanArtifact::from_file(plan_file)?;
26
27 let mut force_bypassed: Vec<String> = Vec::new();
31
32 let warn_threshold = Duration::hours(1);
34 let error_threshold = Duration::hours(24);
35 match artifact.staleness(warn_threshold, error_threshold) {
36 StalenessCheck::Fresh => {}
37 StalenessCheck::StaleWarn(age) => {
38 log::warn!(
39 "plan '{}' is {} minutes old — consider regenerating with `rivet plan`",
40 artifact.export_name,
41 age.num_minutes()
42 );
43 }
44 StalenessCheck::StaleError(age) => {
45 let age_phrase = if age.num_hours() >= 48 {
50 format!(
51 "{} days old (created {})",
52 age.num_days(),
53 artifact.created_at.format("%Y-%m-%d")
54 )
55 } else {
56 format!("{} hours old", age.num_hours())
57 };
58 if !force {
59 anyhow::bail!(
60 "plan '{}' is {} (limit: 24 h). Regenerate with `rivet plan` or pass --force to override.",
61 artifact.export_name,
62 age_phrase,
63 );
64 }
65 force_bypassed.push("staleness".into());
66 log::warn!(
67 "plan '{}': ignoring staleness ({}) because --force was passed",
68 artifact.export_name,
69 age_phrase,
70 );
71 }
72 }
73
74 let plan_dir = Path::new(plan_file)
88 .parent()
89 .unwrap_or_else(|| Path::new("."));
90 let state_dir = match artifact
91 .config_path
92 .as_deref()
93 .map(Path::new)
94 .and_then(Path::parent)
95 {
96 Some(dir) if dir.exists() => dir.to_path_buf(),
97 Some(dir) => {
98 log::warn!(
99 "plan '{}': original config dir '{}' no longer exists; opening state next to plan file instead. \
100 Cursors and manifest history from the original run will not be visible.",
101 artifact.export_name,
102 dir.display(),
103 );
104 plan_dir.to_path_buf()
105 }
106 None => {
107 log::warn!(
108 "plan '{}': artifact has no recorded config path (pre-0.7.5 plan?). \
109 Opening state next to the plan file; this may diverge from the state \
110 used by `rivet run` for the same config.",
111 artifact.export_name,
112 );
113 plan_dir.to_path_buf()
114 }
115 };
116 let state_path = state_dir.join(".rivet_state.db");
117 let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
118
119 if artifact.computed.cursor_snapshot.is_some() {
127 let current = state.get(&artifact.export_name)?.last_cursor_value;
128 if !artifact.cursor_matches(current.as_deref()) {
129 if !force {
130 anyhow::bail!(
131 "plan '{}': cursor has drifted since plan was generated \
132 (plan snapshot: {:?}, current: {:?}). \
133 Regenerate with `rivet plan` or pass --force to skip this check.",
134 artifact.export_name,
135 artifact.computed.cursor_snapshot,
136 current,
137 );
138 }
139 force_bypassed.push("cursor_drift".into());
140 log::warn!(
141 "plan '{}': cursor has drifted (plan snapshot: {:?}, current: {:?}) — \
142 proceeding because --force was passed",
143 artifact.export_name,
144 artifact.computed.cursor_snapshot,
145 current,
146 );
147 }
148 }
149
150 let chunk_source = if artifact.computed.chunk_ranges.is_empty() {
152 ChunkSource::Detect
153 } else {
154 ChunkSource::Precomputed(artifact.computed.chunk_ranges.clone())
155 };
156
157 let apply_context = ApplyContext {
160 plan_id: artifact.plan_id.clone(),
161 forced: force,
162 force_bypassed,
163 };
164 let plan = artifact.resolved_plan.clone();
165 super::run_export_job_with_chunk_source(
166 &plan,
167 &state,
168 chunk_source,
169 plan_file,
170 Some(apply_context),
171 )
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{
        ComputedPlanData, ExtractionStrategy, PlanArtifact, PlanDiagnostics, ResolvedRunPlan,
    };
    use crate::tuning::SourceTuning;
    use chrono::{Duration, Utc};

    /// Resolved plan for an `orders` snapshot export with a local destination.
    ///
    /// The Postgres URL uses bogus credentials, host and port — presumably so that no test
    /// can reach a real database; the tests below all expect to fail before connecting.
    fn unreachable_plan() -> ResolvedRunPlan {
        let source = SourceConfig {
            source_type: SourceType::Postgres,
            url: Some("postgresql://nobody:wrong@127.0.0.2:9999/nonexistent".into()),
            url_env: None,
            url_file: None,
            host: None,
            port: None,
            user: None,
            password: None,
            password_env: None,
            database: None,
            environment: None,
            tuning: None,
            tls: None,
        };
        let destination = DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some("/tmp/rivet_apply_test".into()),
            ..Default::default()
        };

        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination,
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source,
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 0.0,
            parquet: None,
        }
    }

    /// Newly created artifact for `orders` with no chunk ranges and no cursor snapshot.
    fn fresh_artifact() -> PlanArtifact {
        let computed = ComputedPlanData {
            chunk_ranges: vec![],
            chunk_count: 0,
            cursor_snapshot: None,
            row_estimate: None,
        };
        let diagnostics = PlanDiagnostics {
            verdict: "Efficient".into(),
            warnings: vec![],
            recommended_profile: "balanced".into(),
        };

        PlanArtifact::new(
            "orders".into(),
            "full".into(),
            String::new(),
            unreachable_plan(),
            computed,
            diagnostics,
        )
    }

    /// Serializes `artifact` to `plan.json` inside `dir` and returns that file's path.
    fn write_artifact(dir: &tempfile::TempDir, artifact: &PlanArtifact) -> String {
        let plan_path = dir.path().join("plan.json");
        std::fs::write(&plan_path, artifact.to_json_pretty().expect("serialize"))
            .expect("write plan.json");
        plan_path.to_str().unwrap().to_owned()
    }

    /// Runs the apply command without `--force`, requires it to fail, and returns the
    /// error rendered with the alternate (`{:#}`) format.
    fn apply_failure_text(plan_path: &str) -> String {
        let err = run_apply_command(plan_path, false).unwrap_err();
        format!("{err:#}")
    }

    #[test]
    fn stale_error_without_force_is_rejected() {
        // 25 hours is just past the 24 hour error threshold used by the command.
        let mut stale = fresh_artifact();
        stale.created_at = Utc::now() - Duration::hours(25);
        let workdir = tempfile::TempDir::new().unwrap();
        let plan_path = write_artifact(&workdir, &stale);

        let msg = apply_failure_text(&plan_path);
        let recognised = ["hours old", "24 h"]
            .iter()
            .any(|needle| msg.contains(*needle));
        assert!(recognised, "expected staleness error: {msg}");
    }

    #[test]
    fn cursor_drift_detected_no_prior_state() {
        // The plan claims a cursor snapshot while the state store in the temp dir is new.
        let mut with_cursor = fresh_artifact();
        with_cursor.computed.cursor_snapshot = Some("2025-06-01T00:00:00Z".into());
        let workdir = tempfile::TempDir::new().unwrap();
        let plan_path = write_artifact(&workdir, &with_cursor);

        let msg = apply_failure_text(&plan_path);
        let recognised = ["drifted", "cursor"]
            .iter()
            .any(|needle| msg.contains(*needle));
        assert!(recognised, "expected cursor drift error: {msg}");
    }

    #[test]
    fn missing_plan_file_returns_error() {
        let msg = apply_failure_text("/tmp/rivet_nonexistent_xyzxyz.json");
        let recognised = ["cannot read", "No such file"]
            .iter()
            .any(|needle| msg.contains(*needle));
        assert!(recognised, "expected file-not-found: {msg}");
    }

    #[test]
    fn corrupt_plan_file_returns_parse_error() {
        let workdir = tempfile::TempDir::new().unwrap();
        let plan_path = workdir.path().join("plan.json");
        std::fs::write(&plan_path, b"not valid json at all").unwrap();

        let msg = apply_failure_text(plan_path.to_str().unwrap());
        let recognised = ["invalid plan", "JSON", "expected"]
            .iter()
            .any(|needle| msg.contains(*needle));
        assert!(recognised, "expected parse error: {msg}");
    }
}