1use chrono::Duration;
13use std::path::Path;
14
15use crate::error::Result;
16use crate::plan::{PlanArtifact, StalenessCheck};
17use crate::state::StateStore;
18
19use super::chunked::ChunkSource;
20use super::summary::ApplyContext;
21
22pub fn run_apply_command(plan_file: &str, force: bool) -> Result<()> {
24 let artifact = PlanArtifact::from_file(plan_file)?;
26
27 artifact.verify_integrity()?;
33
34 reject_unrecoverable_inline_url(&artifact)?;
40
41 let mut force_bypassed: Vec<String> = Vec::new();
45
46 let warn_threshold = Duration::hours(1);
48 let error_threshold = Duration::hours(24);
49 match artifact.staleness(warn_threshold, error_threshold) {
50 StalenessCheck::Fresh => {}
51 StalenessCheck::StaleWarn(age) => {
52 log::warn!(
53 "plan '{}' is {} minutes old — consider regenerating with `rivet plan`",
54 artifact.export_name,
55 age.num_minutes()
56 );
57 }
58 StalenessCheck::StaleError(age) => {
59 let age_phrase = if age.num_hours() >= 48 {
64 format!(
65 "{} days old (created {})",
66 age.num_days(),
67 artifact.created_at.format("%Y-%m-%d")
68 )
69 } else {
70 format!("{} hours old", age.num_hours())
71 };
72 if !force {
73 anyhow::bail!(
74 "plan '{}' is {} (limit: 24 h). Regenerate with `rivet plan` or pass --force to override.",
75 artifact.export_name,
76 age_phrase,
77 );
78 }
79 force_bypassed.push("staleness".into());
80 log::warn!(
81 "plan '{}': ignoring staleness ({}) because --force was passed",
82 artifact.export_name,
83 age_phrase,
84 );
85 }
86 }
87
88 let plan_dir = Path::new(plan_file)
102 .parent()
103 .unwrap_or_else(|| Path::new("."));
104 let state_dir = match artifact
105 .config_path
106 .as_deref()
107 .map(Path::new)
108 .and_then(Path::parent)
109 {
110 Some(dir) if dir.exists() => dir.to_path_buf(),
111 Some(dir) => {
112 log::warn!(
113 "plan '{}': original config dir '{}' no longer exists; opening state next to plan file instead. \
114 Cursors and manifest history from the original run will not be visible.",
115 artifact.export_name,
116 dir.display(),
117 );
118 plan_dir.to_path_buf()
119 }
120 None => {
121 log::warn!(
122 "plan '{}': artifact has no recorded config path (pre-0.7.5 plan?). \
123 Opening state next to the plan file; this may diverge from the state \
124 used by `rivet run` for the same config.",
125 artifact.export_name,
126 );
127 plan_dir.to_path_buf()
128 }
129 };
130 let state_path = state_dir.join(".rivet_state.db");
131 let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
132
133 if artifact.computed.cursor_snapshot.is_some() {
141 let current = state.get(&artifact.export_name)?.last_cursor_value;
142 if !artifact.cursor_matches(current.as_deref()) {
143 if !force {
144 anyhow::bail!(
145 "plan '{}': cursor has drifted since plan was generated \
146 (plan snapshot: {:?}, current: {:?}). \
147 Regenerate with `rivet plan` or pass --force to skip this check.",
148 artifact.export_name,
149 artifact.computed.cursor_snapshot,
150 current,
151 );
152 }
153 force_bypassed.push("cursor_drift".into());
154 log::warn!(
155 "plan '{}': cursor has drifted (plan snapshot: {:?}, current: {:?}) — \
156 proceeding because --force was passed",
157 artifact.export_name,
158 artifact.computed.cursor_snapshot,
159 current,
160 );
161 }
162 }
163
164 let chunk_source = if artifact.computed.chunk_ranges.is_empty() {
166 ChunkSource::Detect
167 } else {
168 ChunkSource::Precomputed(artifact.computed.chunk_ranges.clone())
169 };
170
171 let apply_context = ApplyContext {
174 plan_id: artifact.plan_id.clone(),
175 forced: force,
176 force_bypassed,
177 };
178 let plan = artifact.resolved_plan.clone();
179 super::run_export_job_with_chunk_source(
180 &plan,
181 &state,
182 chunk_source,
183 plan_file,
184 Some(apply_context),
185 )
186}
187
188fn reject_unrecoverable_inline_url(artifact: &PlanArtifact) -> Result<()> {
199 let source = &artifact.resolved_plan.source;
200
201 let url_redacted = source
202 .url
203 .as_deref()
204 .is_some_and(|u| u.contains("REDACTED@"));
205 if !url_redacted {
206 return Ok(());
207 }
208
209 let has_recovery = source.url_env.is_some()
213 || source.url_file.is_some()
214 || source.password_env.is_some()
215 || (source.host.is_some() && source.user.is_some());
216 if has_recovery {
217 return Ok(());
218 }
219
220 anyhow::bail!(
221 "plan '{}': source credentials were stripped from this artifact and cannot be \
222 recovered at apply time — the plan was created from an inline `url:` config, whose \
223 password is never persisted.\n \
224 Fix: re-plan from a config that uses `url_env: <VAR>` (or `url_file:`) so `rivet apply` \
225 can resolve credentials, e.g.\n \
226 source:\n type: {}\n url_env: DATABASE_URL\n \
227 then `export DATABASE_URL=...` before running apply.",
228 artifact.export_name,
229 match artifact.resolved_plan.source.source_type {
230 crate::config::SourceType::Postgres => "postgres",
231 crate::config::SourceType::Mysql => "mysql",
232 crate::config::SourceType::Mssql => "mssql",
233 },
234 )
235}
236
#[cfg(test)]
mod tests {
    //! Tests for the pre-flight gates of `run_apply_command`. Every case here expects the
    //! command to return an error.

    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{
        ComputedPlanData, ExtractionStrategy, PlanArtifact, PlanDiagnostics, ResolvedRunPlan,
    };
    use crate::tuning::SourceTuning;
    use chrono::{Duration, Utc};

    /// Builds a snapshot-strategy plan for an export named `orders`.
    ///
    /// The source URL targets `127.0.0.2:9999`; judging by the function name this is
    /// meant to be an address nothing listens on — TODO confirm.
    fn unreachable_plan() -> ResolvedRunPlan {
        let source = SourceConfig {
            source_type: SourceType::Postgres,
            url: Some("postgresql://127.0.0.2:9999/nonexistent".into()),
            url_env: None,
            url_file: None,
            host: None,
            port: None,
            user: None,
            password: None,
            password_env: None,
            database: None,
            environment: None,
            tuning: None,
            tls: None,
        };
        let destination = DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some("/tmp/rivet_apply_test".into()),
            ..Default::default()
        };

        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination,
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source,
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 0.0,
            parquet: None,
        }
    }

    /// Wraps [`unreachable_plan`] in a newly constructed artifact with no chunk ranges
    /// and no cursor snapshot.
    fn fresh_artifact() -> PlanArtifact {
        let computed = ComputedPlanData {
            chunk_ranges: vec![],
            chunk_count: 0,
            cursor_snapshot: None,
            row_estimate: None,
        };
        let diagnostics = PlanDiagnostics {
            verdict: "Efficient".into(),
            warnings: vec![],
            recommended_profile: "balanced".into(),
        };

        PlanArtifact::new(
            "orders".into(),
            "full".into(),
            String::new(),
            unreachable_plan(),
            computed,
            diagnostics,
        )
    }

    /// Serializes `artifact` to `plan.json` inside `dir` and returns the file's path.
    fn write_artifact(dir: &tempfile::TempDir, artifact: &PlanArtifact) -> String {
        let plan_path = dir.path().join("plan.json");
        let serialized = artifact.to_json_pretty().expect("serialize");
        std::fs::write(&plan_path, serialized).expect("write plan.json");
        plan_path.to_str().unwrap().to_owned()
    }

    /// Runs `run_apply_command`, requires it to fail, and returns the error rendered
    /// with the alternate (`{:#}`) format.
    fn apply_error_message(plan_file: &str, force: bool) -> String {
        let failure = run_apply_command(plan_file, force).unwrap_err();
        format!("{failure:#}")
    }

    /// A plan created 25 hours ago is past the 24 h limit and is refused without `force`.
    #[test]
    fn stale_error_without_force_is_rejected() {
        let mut artifact = fresh_artifact();
        artifact.created_at = Utc::now() - Duration::hours(25);
        let tmp = tempfile::TempDir::new().unwrap();
        let plan_file = write_artifact(&tmp, &artifact);

        let msg = apply_error_message(&plan_file, false);
        let mentions_staleness = msg.contains("hours old") || msg.contains("24 h");
        assert!(mentions_staleness, "expected staleness error: {msg}");
    }

    /// A plan that recorded a cursor snapshot is refused when applied from a fresh temp
    /// directory (which presumably holds no stored cursor for the export).
    #[test]
    fn cursor_drift_detected_no_prior_state() {
        let mut artifact = fresh_artifact();
        artifact.computed.cursor_snapshot = Some("2025-06-01T00:00:00Z".into());
        let tmp = tempfile::TempDir::new().unwrap();
        let plan_file = write_artifact(&tmp, &artifact);

        let msg = apply_error_message(&plan_file, false);
        let mentions_cursor = msg.contains("drifted") || msg.contains("cursor");
        assert!(mentions_cursor, "expected cursor drift error: {msg}");
    }

    /// Pointing the command at a path that does not exist yields a read error.
    #[test]
    fn missing_plan_file_returns_error() {
        let msg = apply_error_message("/tmp/rivet_nonexistent_xyzxyz.json", false);
        let looks_like_missing_file = msg.contains("cannot read") || msg.contains("No such file");
        assert!(looks_like_missing_file, "expected file-not-found: {msg}");
    }

    /// A plan file that is not JSON yields a parse error.
    #[test]
    fn corrupt_plan_file_returns_parse_error() {
        let tmp = tempfile::TempDir::new().unwrap();
        let plan_path = tmp.path().join("plan.json");
        std::fs::write(&plan_path, b"not valid json at all").unwrap();

        let msg = apply_error_message(plan_path.to_str().unwrap(), false);
        let looks_like_parse_failure =
            msg.contains("invalid plan") || msg.contains("JSON") || msg.contains("expected");
        assert!(looks_like_parse_failure, "expected parse error: {msg}");
    }

    /// Editing the stored base query after planning trips the integrity gate, both with
    /// and without `force`.
    #[test]
    fn apply_rejects_tampered_base_query() {
        let artifact = fresh_artifact();
        let tmp = tempfile::TempDir::new().unwrap();
        let plan_file = write_artifact(&tmp, &artifact);

        // Swap the planned query for a different one directly in the file on disk.
        let original_json = std::fs::read_to_string(&plan_file).unwrap();
        assert!(
            original_json.contains("SELECT 1"),
            "fixture must embed the planned base_query"
        );
        let tampered_json = original_json.replace("SELECT 1", "SELECT * FROM secrets");
        std::fs::write(&plan_file, &tampered_json).unwrap();

        let msg = apply_error_message(&plan_file, false);
        let rejected_at_gate =
            msg.contains("integrity check failed") && msg.contains("modified after planning");
        assert!(
            rejected_at_gate,
            "tampered plan must be rejected at the integrity gate, got: {msg}"
        );

        // Same file again, this time with `force` set.
        let msg_forced = apply_error_message(&plan_file, true);
        assert!(
            msg_forced.contains("integrity check failed"),
            "--force must not bypass the integrity gate, got: {msg_forced}"
        );
    }
}