use chrono::Duration;
use std::path::Path;
use crate::error::Result;
use crate::plan::{PlanArtifact, StalenessCheck};
use crate::state::StateStore;
use super::chunked::ChunkSource;
use super::summary::ApplyContext;
pub fn run_apply_command(plan_file: &str, force: bool) -> Result<()> {
let artifact = PlanArtifact::from_file(plan_file)?;
let mut force_bypassed: Vec<String> = Vec::new();
let warn_threshold = Duration::hours(1);
let error_threshold = Duration::hours(24);
match artifact.staleness(warn_threshold, error_threshold) {
StalenessCheck::Fresh => {}
StalenessCheck::StaleWarn(age) => {
log::warn!(
"plan '{}' is {} minutes old — consider regenerating with `rivet plan`",
artifact.export_name,
age.num_minutes()
);
}
StalenessCheck::StaleError(age) => {
let age_phrase = if age.num_hours() >= 48 {
format!(
"{} days old (created {})",
age.num_days(),
artifact.created_at.format("%Y-%m-%d")
)
} else {
format!("{} hours old", age.num_hours())
};
if !force {
anyhow::bail!(
"plan '{}' is {} (limit: 24 h). Regenerate with `rivet plan` or pass --force to override.",
artifact.export_name,
age_phrase,
);
}
force_bypassed.push("staleness".into());
log::warn!(
"plan '{}': ignoring staleness ({}) because --force was passed",
artifact.export_name,
age_phrase,
);
}
}
let plan_dir = Path::new(plan_file)
.parent()
.unwrap_or_else(|| Path::new("."));
let state_dir = match artifact
.config_path
.as_deref()
.map(Path::new)
.and_then(Path::parent)
{
Some(dir) if dir.exists() => dir.to_path_buf(),
Some(dir) => {
log::warn!(
"plan '{}': original config dir '{}' no longer exists; opening state next to plan file instead. \
Cursors and manifest history from the original run will not be visible.",
artifact.export_name,
dir.display(),
);
plan_dir.to_path_buf()
}
None => {
log::warn!(
"plan '{}': artifact has no recorded config path (pre-0.7.5 plan?). \
Opening state next to the plan file; this may diverge from the state \
used by `rivet run` for the same config.",
artifact.export_name,
);
plan_dir.to_path_buf()
}
};
let state_path = state_dir.join(".rivet_state.db");
let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
if artifact.computed.cursor_snapshot.is_some() {
let current = state.get(&artifact.export_name)?.last_cursor_value;
if !artifact.cursor_matches(current.as_deref()) {
if !force {
anyhow::bail!(
"plan '{}': cursor has drifted since plan was generated \
(plan snapshot: {:?}, current: {:?}). \
Regenerate with `rivet plan` or pass --force to skip this check.",
artifact.export_name,
artifact.computed.cursor_snapshot,
current,
);
}
force_bypassed.push("cursor_drift".into());
log::warn!(
"plan '{}': cursor has drifted (plan snapshot: {:?}, current: {:?}) — \
proceeding because --force was passed",
artifact.export_name,
artifact.computed.cursor_snapshot,
current,
);
}
}
let chunk_source = if artifact.computed.chunk_ranges.is_empty() {
ChunkSource::Detect
} else {
ChunkSource::Precomputed(artifact.computed.chunk_ranges.clone())
};
let apply_context = ApplyContext {
plan_id: artifact.plan_id.clone(),
forced: force,
force_bypassed,
};
let plan = artifact.resolved_plan.clone();
super::run_export_job_with_chunk_source(
&plan,
&state,
chunk_source,
plan_file,
Some(apply_context),
)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{
CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
SourceType,
};
use crate::plan::{
ComputedPlanData, ExtractionStrategy, PlanArtifact, PlanDiagnostics, ResolvedRunPlan,
};
use crate::tuning::SourceTuning;
use chrono::{Duration, Utc};
fn unreachable_plan() -> ResolvedRunPlan {
ResolvedRunPlan {
export_name: "orders".into(),
base_query: "SELECT 1".into(),
strategy: ExtractionStrategy::Snapshot,
format: FormatType::Parquet,
compression: CompressionType::Zstd,
compression_level: None,
max_file_size_bytes: None,
skip_empty: false,
meta_columns: MetaColumns::default(),
destination: DestinationConfig {
destination_type: DestinationType::Local,
path: Some("/tmp/rivet_apply_test".into()),
..Default::default()
},
quality: None,
tuning: SourceTuning::from_config(None),
tuning_profile_label: "balanced".into(),
validate: false,
reconcile: false,
resume: false,
source: SourceConfig {
source_type: SourceType::Postgres,
url: Some("postgresql://nobody:wrong@127.0.0.2:9999/nonexistent".into()),
url_env: None,
url_file: None,
host: None,
port: None,
user: None,
password: None,
password_env: None,
database: None,
environment: None,
tuning: None,
tls: None,
},
column_overrides: Default::default(),
schema_drift_policy: Default::default(),
shape_drift_warn_factor: 0.0,
parquet: None,
}
}
fn fresh_artifact() -> PlanArtifact {
PlanArtifact::new(
"orders".into(),
"full".into(),
String::new(),
unreachable_plan(),
ComputedPlanData {
chunk_ranges: vec![],
chunk_count: 0,
cursor_snapshot: None,
row_estimate: None,
},
PlanDiagnostics {
verdict: "Efficient".into(),
warnings: vec![],
recommended_profile: "balanced".into(),
},
)
}
fn write_artifact(dir: &tempfile::TempDir, artifact: &PlanArtifact) -> String {
let path = dir.path().join("plan.json");
let json = artifact.to_json_pretty().expect("serialize");
std::fs::write(&path, json).expect("write plan.json");
path.to_str().unwrap().to_string()
}
#[test]
fn stale_error_without_force_is_rejected() {
let mut artifact = fresh_artifact();
artifact.created_at = Utc::now() - Duration::hours(25);
let dir = tempfile::TempDir::new().unwrap();
let path = write_artifact(&dir, &artifact);
let err = run_apply_command(&path, false).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("hours old") || msg.contains("24 h"),
"expected staleness error: {msg}"
);
}
#[test]
fn cursor_drift_detected_no_prior_state() {
let mut artifact = fresh_artifact();
artifact.computed.cursor_snapshot = Some("2025-06-01T00:00:00Z".into());
let dir = tempfile::TempDir::new().unwrap();
let path = write_artifact(&dir, &artifact);
let err = run_apply_command(&path, false).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("drifted") || msg.contains("cursor"),
"expected cursor drift error: {msg}"
);
}
#[test]
fn missing_plan_file_returns_error() {
let err = run_apply_command("/tmp/rivet_nonexistent_xyzxyz.json", false).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("cannot read") || msg.contains("No such file"),
"expected file-not-found: {msg}"
);
}
#[test]
fn corrupt_plan_file_returns_parse_error() {
let dir = tempfile::TempDir::new().unwrap();
let path = dir.path().join("plan.json");
std::fs::write(&path, b"not valid json at all").unwrap();
let err = run_apply_command(path.to_str().unwrap(), false).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("invalid plan") || msg.contains("JSON") || msg.contains("expected"),
"expected parse error: {msg}"
);
}
}