1use chrono::Duration;
13use std::path::Path;
14
15use crate::error::Result;
16use crate::plan::{PlanArtifact, StalenessCheck};
17use crate::state::StateStore;
18
19use super::chunked::ChunkSource;
20use super::summary::ApplyContext;
21
22pub fn run_apply_command(plan_file: &str, force: bool, parallel: bool, resume: bool) -> Result<()> {
24 if plan_file.ends_with(".yaml") || plan_file.ends_with(".yml") {
28 return super::run::run_waves(plan_file, force, parallel, resume);
29 }
30 if parallel || resume {
31 log::warn!(
32 "--parallel-export-processes / --resume apply only to wave-ordered config execution; ignored for a sealed plan artifact"
33 );
34 }
35
36 let artifact = PlanArtifact::from_file(plan_file)?;
38
39 artifact.verify_integrity()?;
45
46 reject_unrecoverable_inline_url(&artifact)?;
52
53 let mut force_bypassed: Vec<String> = Vec::new();
57
58 let warn_threshold = Duration::hours(1);
60 let error_threshold = Duration::hours(24);
61 match artifact.staleness(warn_threshold, error_threshold) {
62 StalenessCheck::Fresh => {}
63 StalenessCheck::StaleWarn(age) => {
64 log::warn!(
65 "plan '{}' is {} minutes old — consider regenerating with `rivet plan`",
66 artifact.export_name,
67 age.num_minutes()
68 );
69 }
70 StalenessCheck::StaleError(age) => {
71 let age_phrase = if age.num_hours() >= 48 {
76 format!(
77 "{} days old (created {})",
78 age.num_days(),
79 artifact.created_at.format("%Y-%m-%d")
80 )
81 } else {
82 format!("{} hours old", age.num_hours())
83 };
84 if !force {
85 anyhow::bail!(
86 "plan '{}' is {} (limit: 24 h). Regenerate with `rivet plan` or pass --force to override.",
87 artifact.export_name,
88 age_phrase,
89 );
90 }
91 force_bypassed.push("staleness".into());
92 log::warn!(
93 "plan '{}': ignoring staleness ({}) because --force was passed",
94 artifact.export_name,
95 age_phrase,
96 );
97 }
98 }
99
100 let plan_dir = Path::new(plan_file)
114 .parent()
115 .unwrap_or_else(|| Path::new("."));
116 let state_dir = match artifact
117 .config_path
118 .as_deref()
119 .map(Path::new)
120 .and_then(Path::parent)
121 {
122 Some(dir) if dir.exists() => dir.to_path_buf(),
123 Some(dir) => {
124 log::warn!(
125 "plan '{}': original config dir '{}' no longer exists; opening state next to plan file instead. \
126 Cursors and manifest history from the original run will not be visible.",
127 artifact.export_name,
128 dir.display(),
129 );
130 plan_dir.to_path_buf()
131 }
132 None => {
133 log::warn!(
134 "plan '{}': artifact has no recorded config path (pre-0.7.5 plan?). \
135 Opening state next to the plan file; this may diverge from the state \
136 used by `rivet run` for the same config.",
137 artifact.export_name,
138 );
139 plan_dir.to_path_buf()
140 }
141 };
142 let state_path = state_dir.join(".rivet_state.db");
143 let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
144
145 if artifact.computed.cursor_snapshot.is_some() {
153 let current = state.get(&artifact.export_name)?.last_cursor_value;
154 if !artifact.cursor_matches(current.as_deref()) {
155 if !force {
156 anyhow::bail!(
157 "plan '{}': cursor has drifted since plan was generated \
158 (plan snapshot: {:?}, current: {:?}). \
159 Regenerate with `rivet plan` or pass --force to skip this check.",
160 artifact.export_name,
161 artifact.computed.cursor_snapshot,
162 current,
163 );
164 }
165 force_bypassed.push("cursor_drift".into());
166 log::warn!(
167 "plan '{}': cursor has drifted (plan snapshot: {:?}, current: {:?}) — \
168 proceeding because --force was passed",
169 artifact.export_name,
170 artifact.computed.cursor_snapshot,
171 current,
172 );
173 }
174 }
175
176 let chunk_source = if artifact.computed.chunk_ranges.is_empty() {
178 ChunkSource::Detect
179 } else {
180 ChunkSource::Precomputed(artifact.computed.chunk_ranges.clone())
181 };
182
183 let apply_context = ApplyContext {
186 plan_id: artifact.plan_id.clone(),
187 forced: force,
188 force_bypassed,
189 };
190 let plan = artifact.resolved_plan.clone();
191 super::run_export_job_with_chunk_source(
192 &plan,
193 &state,
194 chunk_source,
195 plan_file,
196 Some(apply_context),
197 )
198}
199
200fn reject_unrecoverable_inline_url(artifact: &PlanArtifact) -> Result<()> {
211 let source = &artifact.resolved_plan.source;
212
213 let url_redacted = source
214 .url
215 .as_deref()
216 .is_some_and(|u| u.contains("REDACTED@"));
217 if !url_redacted {
218 return Ok(());
219 }
220
221 let has_recovery = source.url_env.is_some()
225 || source.url_file.is_some()
226 || source.password_env.is_some()
227 || (source.host.is_some() && source.user.is_some());
228 if has_recovery {
229 return Ok(());
230 }
231
232 anyhow::bail!(
233 "plan '{}': source credentials were stripped from this artifact and cannot be \
234 recovered at apply time — the plan was created from an inline `url:` config, whose \
235 password is never persisted.\n \
236 Fix: re-plan from a config that uses `url_env: <VAR>` (or `url_file:`) so `rivet apply` \
237 can resolve credentials, e.g.\n \
238 source:\n type: {}\n url_env: DATABASE_URL\n \
239 then `export DATABASE_URL=...` before running apply.",
240 artifact.export_name,
241 match artifact.resolved_plan.source.source_type {
242 crate::config::SourceType::Postgres => "postgres",
243 crate::config::SourceType::Mysql => "mysql",
244 crate::config::SourceType::Mssql => "mssql",
245 },
246 )
247}
248
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{
        ComputedPlanData, ExtractionStrategy, PlanArtifact, PlanDiagnostics, ResolvedRunPlan,
    };
    use crate::tuning::SourceTuning;
    use chrono::{Duration, Utc};

    /// Snapshot plan for export `orders` whose Postgres URL points at
    /// `127.0.0.2:9999` — presumably so a test that slips past the pre-flight
    /// gates cannot reach a real database (TODO confirm).
    fn unreachable_plan() -> ResolvedRunPlan {
        let destination = DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some("/tmp/rivet_apply_test".into()),
            ..Default::default()
        };
        // Every source field is spelled out; only the type and URL are set.
        let source = SourceConfig {
            source_type: SourceType::Postgres,
            url: Some("postgresql://127.0.0.2:9999/nonexistent".into()),
            url_env: None,
            url_file: None,
            host: None,
            port: None,
            user: None,
            password: None,
            password_env: None,
            database: None,
            environment: None,
            tuning: None,
            tls: None,
        };

        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination,
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source,
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 0.0,
            parquet: None,
        }
    }

    /// Newly created artifact wrapping [`unreachable_plan`], with no chunk
    /// ranges and no cursor snapshot.
    fn fresh_artifact() -> PlanArtifact {
        let computed = ComputedPlanData {
            chunk_ranges: Vec::new(),
            chunk_count: 0,
            cursor_snapshot: None,
            row_estimate: None,
        };
        let diagnostics = PlanDiagnostics {
            verdict: "Efficient".into(),
            warnings: Vec::new(),
            recommended_profile: "balanced".into(),
            strategy_rationale: String::new(),
        };
        PlanArtifact::new(
            "orders".into(),
            "full".into(),
            String::new(),
            unreachable_plan(),
            computed,
            diagnostics,
        )
    }

    /// Serializes `artifact` to `plan.json` inside `dir` and returns that path.
    fn write_artifact(dir: &tempfile::TempDir, artifact: &PlanArtifact) -> String {
        let target = dir.path().join("plan.json");
        let serialized = artifact.to_json_pretty().expect("serialize");
        std::fs::write(&target, serialized).expect("write plan.json");
        target.to_str().unwrap().to_owned()
    }

    /// Runs the apply command with `parallel` and `resume` off, requires it to
    /// fail, and returns the error rendered with the alternate (`{:#}`) format.
    fn apply_failure(plan_file: &str, force: bool) -> String {
        let failure = run_apply_command(plan_file, force, false, false).unwrap_err();
        format!("{failure:#}")
    }

    /// A plan created 25 hours ago is past the 24 h limit and must be refused
    /// when `force` is off.
    #[test]
    fn stale_error_without_force_is_rejected() {
        let mut stale = fresh_artifact();
        stale.created_at = Utc::now() - Duration::hours(25);
        let workdir = tempfile::TempDir::new().unwrap();
        let plan_path = write_artifact(&workdir, &stale);

        let msg = apply_failure(&plan_path, false);
        assert!(
            msg.contains("hours old") || msg.contains("24 h"),
            "expected staleness error: {msg}"
        );
    }

    /// A plan carrying a cursor snapshot is applied from a fresh temp
    /// directory and must be refused when `force` is off.
    #[test]
    fn cursor_drift_detected_no_prior_state() {
        let mut with_cursor = fresh_artifact();
        with_cursor.computed.cursor_snapshot = Some("2025-06-01T00:00:00Z".into());
        let workdir = tempfile::TempDir::new().unwrap();
        let plan_path = write_artifact(&workdir, &with_cursor);

        let msg = apply_failure(&plan_path, false);
        assert!(
            msg.contains("drifted") || msg.contains("cursor"),
            "expected cursor drift error: {msg}"
        );
    }

    /// A plan path that does not exist surfaces a read error.
    #[test]
    fn missing_plan_file_returns_error() {
        let msg = apply_failure("/tmp/rivet_nonexistent_xyzxyz.json", false);
        assert!(
            msg.contains("cannot read") || msg.contains("No such file"),
            "expected file-not-found: {msg}"
        );
    }

    /// A plan file whose content is not JSON surfaces a parse error.
    #[test]
    fn corrupt_plan_file_returns_parse_error() {
        let workdir = tempfile::TempDir::new().unwrap();
        let plan_path = workdir.path().join("plan.json");
        std::fs::write(&plan_path, b"not valid json at all").unwrap();

        let msg = apply_failure(plan_path.to_str().unwrap(), false);
        assert!(
            msg.contains("invalid plan") || msg.contains("JSON") || msg.contains("expected"),
            "expected parse error: {msg}"
        );
    }

    /// Editing the serialized `base_query` after planning must trip the
    /// integrity gate, both without and with `force`.
    #[test]
    fn apply_rejects_tampered_base_query() {
        let pristine = fresh_artifact();
        let workdir = tempfile::TempDir::new().unwrap();
        let plan_path = write_artifact(&workdir, &pristine);

        // Swap the planned query for a different one directly in the file.
        let on_disk = std::fs::read_to_string(&plan_path).unwrap();
        assert!(
            on_disk.contains("SELECT 1"),
            "fixture must embed the planned base_query"
        );
        let tampered = on_disk.replace("SELECT 1", "SELECT * FROM secrets");
        std::fs::write(&plan_path, &tampered).unwrap();

        let msg = apply_failure(&plan_path, false);
        assert!(
            msg.contains("integrity check failed") && msg.contains("modified after planning"),
            "tampered plan must be rejected at the integrity gate, got: {msg}"
        );

        let msg_forced = apply_failure(&plan_path, true);
        assert!(
            msg_forced.contains("integrity check failed"),
            "--force must not bypass the integrity gate, got: {msg_forced}"
        );
    }
}