1use chrono::Duration;
13use std::path::Path;
14
15use crate::error::Result;
16use crate::plan::{PlanArtifact, StalenessCheck};
17use crate::state::StateStore;
18
19use super::chunked::ChunkSource;
20use super::summary::ApplyContext;
21
22pub fn run_apply_command(plan_file: &str, force: bool) -> Result<()> {
24 let artifact = PlanArtifact::from_file(plan_file)?;
26
27 let mut force_bypassed: Vec<String> = Vec::new();
31
32 let warn_threshold = Duration::hours(1);
34 let error_threshold = Duration::hours(24);
35 match artifact.staleness(warn_threshold, error_threshold) {
36 StalenessCheck::Fresh => {}
37 StalenessCheck::StaleWarn(age) => {
38 log::warn!(
39 "plan '{}' is {} minutes old — consider regenerating with `rivet plan`",
40 artifact.export_name,
41 age.num_minutes()
42 );
43 }
44 StalenessCheck::StaleError(age) => {
45 let age_phrase = if age.num_hours() >= 48 {
50 format!(
51 "{} days old (created {})",
52 age.num_days(),
53 artifact.created_at.format("%Y-%m-%d")
54 )
55 } else {
56 format!("{} hours old", age.num_hours())
57 };
58 if !force {
59 anyhow::bail!(
60 "plan '{}' is {} (limit: 24 h). Regenerate with `rivet plan` or pass --force to override.",
61 artifact.export_name,
62 age_phrase,
63 );
64 }
65 force_bypassed.push("staleness".into());
66 log::warn!(
67 "plan '{}': ignoring staleness ({}) because --force was passed",
68 artifact.export_name,
69 age_phrase,
70 );
71 }
72 }
73
74 let plan_dir = Path::new(plan_file)
88 .parent()
89 .unwrap_or_else(|| Path::new("."));
90 let state_dir = match artifact
91 .config_path
92 .as_deref()
93 .map(Path::new)
94 .and_then(Path::parent)
95 {
96 Some(dir) if dir.exists() => dir.to_path_buf(),
97 Some(dir) => {
98 log::warn!(
99 "plan '{}': original config dir '{}' no longer exists; opening state next to plan file instead. \
100 Cursors and manifest history from the original run will not be visible.",
101 artifact.export_name,
102 dir.display(),
103 );
104 plan_dir.to_path_buf()
105 }
106 None => {
107 log::warn!(
108 "plan '{}': artifact has no recorded config path (pre-0.7.5 plan?). \
109 Opening state next to the plan file; this may diverge from the state \
110 used by `rivet run` for the same config.",
111 artifact.export_name,
112 );
113 plan_dir.to_path_buf()
114 }
115 };
116 let state_path = state_dir.join(".rivet_state.db");
117 let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
118
119 if artifact.computed.cursor_snapshot.is_some() {
127 let current = state.get(&artifact.export_name)?.last_cursor_value;
128 if !artifact.cursor_matches(current.as_deref()) {
129 if !force {
130 anyhow::bail!(
131 "plan '{}': cursor has drifted since plan was generated \
132 (plan snapshot: {:?}, current: {:?}). \
133 Regenerate with `rivet plan` or pass --force to skip this check.",
134 artifact.export_name,
135 artifact.computed.cursor_snapshot,
136 current,
137 );
138 }
139 force_bypassed.push("cursor_drift".into());
140 log::warn!(
141 "plan '{}': cursor has drifted (plan snapshot: {:?}, current: {:?}) — \
142 proceeding because --force was passed",
143 artifact.export_name,
144 artifact.computed.cursor_snapshot,
145 current,
146 );
147 }
148 }
149
150 let chunk_source = if artifact.computed.chunk_ranges.is_empty() {
152 ChunkSource::Detect
153 } else {
154 ChunkSource::Precomputed(artifact.computed.chunk_ranges.clone())
155 };
156
157 let apply_context = ApplyContext {
160 plan_id: artifact.plan_id.clone(),
161 forced: force,
162 force_bypassed,
163 };
164 let plan = artifact.resolved_plan.clone();
165 super::run_export_job_with_chunk_source(
166 &plan,
167 &state,
168 chunk_source,
169 plan_file,
170 Some(apply_context),
171 )
172}
173
174#[cfg(test)]
175mod tests {
176 use super::*;
177 use crate::config::{
178 CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
179 SourceType,
180 };
181 use crate::plan::{
182 ComputedPlanData, ExtractionStrategy, PlanArtifact, PlanDiagnostics, ResolvedRunPlan,
183 };
184 use crate::tuning::SourceTuning;
185 use chrono::{Duration, Utc};
186
187 fn unreachable_plan() -> ResolvedRunPlan {
188 ResolvedRunPlan {
189 export_name: "orders".into(),
190 base_query: "SELECT 1".into(),
191 strategy: ExtractionStrategy::Snapshot,
192 format: FormatType::Parquet,
193 compression: CompressionType::Zstd,
194 compression_level: None,
195 max_file_size_bytes: None,
196 skip_empty: false,
197 meta_columns: MetaColumns::default(),
198 destination: DestinationConfig {
199 destination_type: DestinationType::Local,
200 path: Some("/tmp/rivet_apply_test".into()),
201 ..Default::default()
202 },
203 quality: None,
204 tuning: SourceTuning::from_config(None),
205 tuning_profile_label: "balanced".into(),
206 validate: false,
207 reconcile: false,
208 resume: false,
209 source: SourceConfig {
211 source_type: SourceType::Postgres,
212 url: Some("postgresql://nobody:wrong@127.0.0.2:9999/nonexistent".into()),
213 url_env: None,
214 url_file: None,
215 host: None,
216 port: None,
217 user: None,
218 password: None,
219 password_env: None,
220 database: None,
221 environment: None,
222 tuning: None,
223 tls: None,
224 },
225 column_overrides: Default::default(),
226 schema_drift_policy: Default::default(),
227 shape_drift_warn_factor: 0.0,
228 parquet: None,
229 }
230 }
231
232 fn fresh_artifact() -> PlanArtifact {
233 PlanArtifact::new(
234 "orders".into(),
235 "full".into(),
236 String::new(),
237 unreachable_plan(),
238 ComputedPlanData {
239 chunk_ranges: vec![],
240 chunk_count: 0,
241 cursor_snapshot: None,
242 row_estimate: None,
243 },
244 PlanDiagnostics {
245 verdict: "Efficient".into(),
246 warnings: vec![],
247 recommended_profile: "balanced".into(),
248 },
249 )
250 }
251
252 fn write_artifact(dir: &tempfile::TempDir, artifact: &PlanArtifact) -> String {
253 let path = dir.path().join("plan.json");
254 let json = artifact.to_json_pretty().expect("serialize");
255 std::fs::write(&path, json).expect("write plan.json");
256 path.to_str().unwrap().to_string()
257 }
258
259 #[test]
262 fn stale_error_without_force_is_rejected() {
263 let mut artifact = fresh_artifact();
264 artifact.created_at = Utc::now() - Duration::hours(25);
265 let dir = tempfile::TempDir::new().unwrap();
266 let path = write_artifact(&dir, &artifact);
267
268 let err = run_apply_command(&path, false).unwrap_err();
269 let msg = format!("{err:#}");
270 assert!(
271 msg.contains("hours old") || msg.contains("24 h"),
272 "expected staleness error: {msg}"
273 );
274 }
275
276 #[test]
279 fn cursor_drift_detected_no_prior_state() {
280 let mut artifact = fresh_artifact();
281 artifact.computed.cursor_snapshot = Some("2025-06-01T00:00:00Z".into());
283 let dir = tempfile::TempDir::new().unwrap();
284 let path = write_artifact(&dir, &artifact);
285
286 let err = run_apply_command(&path, false).unwrap_err();
287 let msg = format!("{err:#}");
288 assert!(
289 msg.contains("drifted") || msg.contains("cursor"),
290 "expected cursor drift error: {msg}"
291 );
292 }
293
294 #[test]
297 fn missing_plan_file_returns_error() {
298 let err = run_apply_command("/tmp/rivet_nonexistent_xyzxyz.json", false).unwrap_err();
299 let msg = format!("{err:#}");
300 assert!(
301 msg.contains("cannot read") || msg.contains("No such file"),
302 "expected file-not-found: {msg}"
303 );
304 }
305
306 #[test]
307 fn corrupt_plan_file_returns_parse_error() {
308 let dir = tempfile::TempDir::new().unwrap();
309 let path = dir.path().join("plan.json");
310 std::fs::write(&path, b"not valid json at all").unwrap();
311 let err = run_apply_command(path.to_str().unwrap(), false).unwrap_err();
312 let msg = format!("{err:#}");
313 assert!(
314 msg.contains("invalid plan") || msg.contains("JSON") || msg.contains("expected"),
315 "expected parse error: {msg}"
316 );
317 }
318}