use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::plan::ResolvedRunPlan;
use super::prioritization::{CampaignRecommendation, ExportRecommendation};
/// A serialized plan produced by `rivet plan`.
///
/// It bundles the resolved run plan with the data computed and the diagnostics gathered at
/// plan time. It round-trips through JSON (`to_json_pretty` / `from_json` / `from_file`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanArtifact {
    /// Crate version that built the artifact (`CARGO_PKG_VERSION` when `new` ran).
    pub rivet_version: String,
    /// Random identifier: 32 lowercase hex characters (two `u64`s).
    pub plan_id: String,
    /// UTC instant the artifact was built; `staleness` measures age from here.
    pub created_at: DateTime<Utc>,
    /// Set to `created_at` + 24 hours by `new`. `staleness` does not read this field.
    pub expires_at: DateTime<Utc>,
    pub export_name: String,
    /// Strategy label shown in summaries (tests use e.g. "full" and "chunked").
    pub strategy: String,
    /// Caller-supplied fingerprint; may be empty, in which case `print_summary` omits it.
    pub plan_fingerprint: String,
    /// Unkeyed `"xxh3:<hex>"` checksum of the canonicalized `resolved_plan`.
    /// Empty for artifacts written before this field existed; `verify_integrity`
    /// accepts those with a warning.
    #[serde(default)]
    pub integrity: String,
    /// The plan to run, with plaintext credentials already redacted by `new`.
    pub resolved_plan: ResolvedRunPlan,
    pub computed: ComputedPlanData,
    pub diagnostics: PlanDiagnostics,
    /// Optional advisory prioritization; `None` from `new`, omitted from JSON when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub prioritization: Option<PlanPrioritizationSnapshot>,
    /// Optional config path (presumably the config the plan was built from — confirm
    /// with callers); `None` from `new`, omitted from JSON when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config_path: Option<String>,
}
/// Advisory prioritization attached to a plan artifact and shown by `print_summary`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanPrioritizationSnapshot {
    /// Per-export recommendation (score, class, wave, reasons).
    pub export_recommendation: ExportRecommendation,
    /// Campaign-level ordering/warnings, when planned as part of a campaign; omitted
    /// from JSON when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub campaign: Option<CampaignRecommendation>,
}
/// Values computed at plan time and carried through to apply.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComputedPlanData {
    /// Chunk key ranges as `(start, end)` pairs. Tests use pairs like `(1, 50000)` and
    /// `(50001, 100000)`, which suggests inclusive bounds — TODO confirm with the planner.
    pub chunk_ranges: Vec<(i64, i64)>,
    /// Number of chunks; `print_summary` prints it only when non-zero.
    pub chunk_count: usize,
    /// Cursor observed at plan time; `None` makes `cursor_matches` always succeed.
    pub cursor_snapshot: Option<String>,
    /// Estimated row count, when one was available.
    pub row_estimate: Option<i64>,
}
/// Human-facing diagnostics produced while planning.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanDiagnostics {
    /// Free-form verdict label (tests use "Efficient" and "Acceptable").
    pub verdict: String,
    pub warnings: Vec<String>,
    pub recommended_profile: String,
    /// Explanation of the chosen strategy. Defaults to empty so older artifacts
    /// without this field still deserialize.
    #[serde(default)]
    pub strategy_rationale: String,
}
/// Outcome of `PlanArtifact::staleness`; stale variants carry the artifact's age.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StalenessCheck {
    /// Younger than the warn threshold.
    Fresh,
    /// At least the warn threshold old, but younger than the error threshold.
    StaleWarn(Duration),
    /// At least the error threshold old.
    StaleError(Duration),
}
impl PlanArtifact {
pub fn new(
export_name: String,
strategy: String,
plan_fingerprint: String,
mut resolved_plan: ResolvedRunPlan,
computed: ComputedPlanData,
diagnostics: PlanDiagnostics,
) -> Self {
let (safe_source, was_redacted) = resolved_plan.source.redact_for_artifact();
resolved_plan.source = safe_source;
if was_redacted {
log::warn!(
"plan '{}': plaintext credentials stripped from artifact — \
apply time must have equivalent env/file-based auth available",
export_name
);
}
let integrity = resolved_plan_integrity(&resolved_plan);
let created_at = Utc::now();
Self {
rivet_version: env!("CARGO_PKG_VERSION").to_string(),
plan_id: new_plan_id(),
created_at,
expires_at: created_at + Duration::hours(24),
export_name,
strategy,
plan_fingerprint,
integrity,
resolved_plan,
computed,
diagnostics,
prioritization: None,
config_path: None,
}
}
pub fn verify_integrity(&self) -> Result<()> {
if self.integrity.is_empty() {
log::warn!(
"plan '{}': artifact predates the corruption checksum (no `integrity` field); \
cannot detect an accidental edit after planning. Re-run `rivet plan` to \
produce a checksummed artifact.",
self.export_name,
);
return Ok(());
}
let actual = resolved_plan_integrity(&self.resolved_plan);
if actual != self.integrity {
anyhow::bail!(
"plan artifact integrity check failed: resolved_plan was modified after planning \
(export '{}', sealed {}, recomputed {}). \
Do not hand-edit plan files — re-run `rivet plan` to regenerate.",
self.export_name,
self.integrity,
actual,
);
}
Ok(())
}
pub fn to_json_pretty(&self) -> Result<String> {
Ok(serde_json::to_string_pretty(self)?)
}
#[allow(dead_code)]
pub fn to_json_compact(&self) -> Result<String> {
Ok(serde_json::to_string(self)?)
}
pub fn from_json(s: &str) -> Result<Self> {
Ok(serde_json::from_str(s)?)
}
pub fn from_file(path: &str) -> Result<Self> {
let content = std::fs::read_to_string(path)
.map_err(|e| anyhow::anyhow!("cannot read plan file '{}': {}", path, e))?;
Self::from_json(&content).map_err(|e| {
anyhow::anyhow!(
"plan file '{}' is not a valid Rivet plan artifact.\n \
Hint: only files generated by `rivet plan` are accepted. \
Parser detail: {}",
path,
e
)
})
}
pub fn staleness(&self, warn_after: Duration, error_after: Duration) -> StalenessCheck {
let age = Utc::now().signed_duration_since(self.created_at);
if age >= error_after {
StalenessCheck::StaleError(age)
} else if age >= warn_after {
StalenessCheck::StaleWarn(age)
} else {
StalenessCheck::Fresh
}
}
pub fn cursor_matches(&self, current_cursor: Option<&str>) -> bool {
match &self.computed.cursor_snapshot {
None => true, Some(snap) => current_cursor == Some(snap.as_str()),
}
}
pub fn summary_line(&self, name_width: usize) -> String {
let wave = self
.prioritization
.as_ref()
.map(|p| p.export_recommendation.recommended_wave.to_string())
.unwrap_or_else(|| "—".to_string());
let rows = self
.computed
.row_estimate
.map(|e| format!("~{}", format_rows(e)))
.unwrap_or_else(|| "—".to_string());
let name = if self.export_name.chars().count() > name_width {
let head: String = self.export_name.chars().take(name_width - 1).collect();
format!("{head}…")
} else {
self.export_name.clone()
};
Self::compact_row(
&wave,
&name,
&self.strategy.to_string(),
&rows,
&self.diagnostics.verdict,
name_width,
)
}
pub fn summary_header(name_width: usize) -> String {
Self::compact_row("Wave", "Export", "Strategy", "Rows", "Verdict", name_width)
}
fn compact_row(
wave: &str,
export: &str,
strategy: &str,
rows: &str,
verdict: &str,
name_width: usize,
) -> String {
format!(" {wave:<5} {export:<name_width$} {strategy:<11} {rows:<12} {verdict}")
}
pub fn print_summary(&self) {
println!();
println!(" Plan ID : {}", self.plan_id);
println!(
" Created : {}",
self.created_at.format("%Y-%m-%d %H:%M:%S UTC")
);
println!(
" Expires : {}",
self.expires_at.format("%Y-%m-%d %H:%M:%S UTC")
);
println!(" Export : {}", self.export_name);
println!(" Strategy : {}", self.strategy);
if !self.plan_fingerprint.is_empty() {
println!(" Fingerprint : {}", self.plan_fingerprint);
}
if self.computed.chunk_count > 0 {
println!(" Chunks : {}", self.computed.chunk_count);
}
if let Some(est) = self.computed.row_estimate {
println!(" Row est. : ~{}", format_rows(est));
}
if let Some(ref cur) = self.computed.cursor_snapshot {
println!(" Cursor : {}", cur);
}
println!(" Verdict : {}", self.diagnostics.verdict);
println!(" Profile : {}", self.diagnostics.recommended_profile);
if !self.diagnostics.strategy_rationale.is_empty() {
println!(" Strategy :");
println!(" Why: {}", self.diagnostics.strategy_rationale);
}
if !self.diagnostics.warnings.is_empty() {
println!(" Warnings :");
for w in &self.diagnostics.warnings {
println!(" • {}", w);
}
}
if let Some(ref p) = self.prioritization {
println!(
" Priority : score {} — {:?} (wave {})",
p.export_recommendation.priority_score,
p.export_recommendation.priority_class,
p.export_recommendation.recommended_wave
);
if p.export_recommendation.isolate_on_source {
println!(" Isolate : yes (shared source advisory)");
}
if !p.export_recommendation.reasons.is_empty() {
println!(" Prioritize :");
for r in &p.export_recommendation.reasons {
println!(" • [{}] {}", r.kind.as_str(), r.message);
}
}
if let Some(ref c) = p.campaign {
if c.source_group_warnings.is_empty() {
println!(
" Campaign : {} export(s) ordered by advisory score",
c.ordered_exports.len()
);
} else {
println!(" Campaign :");
for w in &c.source_group_warnings {
println!(" • {}", w);
}
}
}
}
let res = self.resolved_plan.tuning.resource_summary();
println!(" Resources:");
if let Some(mem_mb) = res.batch_size_memory_mb {
println!(" Batch size : adaptive (target {} MB/batch)", mem_mb);
} else {
println!(
" Batch size : {:>7} rows",
format_number(res.batch_size)
);
}
println!(
" Batch memory : ~{:.0} MB (narrow) – ~{:.0} MB (wide)",
res.batch_narrow_mb, res.batch_wide_mb
);
if res.memory_threshold_mb > 0 {
println!(
" RSS guard : {} MB",
format_number(res.memory_threshold_mb)
);
}
if res.throttle_ms > 0 {
println!(" Throttle : {} ms between batches", res.throttle_ms);
}
if res.wide_table_risk {
println!(
" ⚠ Wide tables may use up to {:.0} MB/batch — consider batch_size_memory_mb or a lower batch_size",
res.batch_wide_mb
);
}
let dest = &self.resolved_plan.destination;
let dest_label = dest.destination_type.label();
let dest_location = dest
.path
.as_deref()
.or(dest.bucket.as_deref())
.unwrap_or("(default)");
println!(" Output : {} → {}", dest_label, dest_location);
let fmt = self.resolved_plan.format.label();
let comp = self.resolved_plan.compression.label();
if let Some(level) = self.resolved_plan.compression_level {
println!(" Format : {} + {} (level {})", fmt, comp, level);
} else {
println!(" Format : {} + {}", fmt, comp);
}
if let (crate::config::FormatType::Parquet, Some(pc)) =
(self.resolved_plan.format, &self.resolved_plan.parquet)
{
let strategy = pc.row_group_strategy.unwrap_or_default();
match strategy {
crate::config::RowGroupStrategy::FixedRows => {
if let Some(rows) = pc.row_group_rows {
println!(" Row group: fixed {} rows", format_number(rows));
}
}
_ => {
let target_mb = pc
.target_row_group_mb
.unwrap_or(crate::config::ParquetConfig::DEFAULT_TARGET_ROW_GROUP_MB);
println!(
" Row group: {:?} (target {} MB{})",
strategy,
target_mb,
pc.max_row_group_mb
.map(|m| format!(", max {} MB", m))
.unwrap_or_default(),
);
}
}
}
println!();
}
}
/// Computes the corruption checksum stored in `PlanArtifact::integrity`.
///
/// Steps:
/// 1. Convert the plan to a `serde_json::Value`.
/// 2. Sort every object's keys with `canonicalize_value`.
/// 3. Hash the compact JSON bytes with XXH3-64.
///
/// The result is `"xxh3:"` plus 16 lowercase hex digits. If serialization fails, the
/// error is logged and the fixed string `"xxh3:unserializable"` is returned instead.
/// XXH3 is unkeyed, so this only detects accidental edits.
fn resolved_plan_integrity(plan: &ResolvedRunPlan) -> String {
    use xxhash_rust::xxh3::xxh3_64;

    let canonical = serde_json::to_value(plan).and_then(|mut tree| {
        canonicalize_value(&mut tree);
        serde_json::to_vec(&tree)
    });
    let bytes = match canonical {
        Ok(bytes) => bytes,
        Err(e) => {
            log::error!(
                "plan integrity checksum: failed to canonicalize resolved_plan ({e}); \
                 emitting a non-matching value so apply rejects this artifact"
            );
            return "xxh3:unserializable".to_string();
        }
    };
    format!("xxh3:{:016x}", xxh3_64(&bytes))
}
/// Rewrites `value` in place so every JSON object, at any depth, lists its keys in
/// ascending byte order. Array element order is preserved.
///
/// This makes the serialized bytes independent of key insertion order, e.g. when
/// `serde_json` is built with `preserve_order`.
fn canonicalize_value(value: &mut serde_json::Value) {
    match value {
        serde_json::Value::Object(map) => {
            // Move the entries out rather than deep-cloning them. Cloning before
            // recursing copied each nested subtree once per level of depth.
            let mut entries: Vec<(String, serde_json::Value)> =
                std::mem::take(map).into_iter().collect();
            // Keys within one object are unique, so an unstable sort is deterministic.
            entries.sort_unstable_by(|a, b| a.0.cmp(&b.0));
            for (key, mut child) in entries {
                canonicalize_value(&mut child);
                map.insert(key, child);
            }
        }
        serde_json::Value::Array(items) => {
            for item in items.iter_mut() {
                canonicalize_value(item);
            }
        }
        _ => {}
    }
}
/// Generates a random plan identifier: two independent `u64`s rendered as 32
/// lowercase hex characters (128 random bits).
fn new_plan_id() -> String {
    let high: u64 = rand::random();
    let low: u64 = rand::random();
    format!("{high:016x}{low:016x}")
}
/// Formats `n` in decimal with `,` between groups of three digits (e.g. `1,234,567`).
fn format_number(n: usize) -> String {
    let text = n.to_string();
    // Take 3-byte groups from the right, then restore left-to-right order.
    text.as_bytes()
        .rchunks(3)
        .rev()
        .map(|group| std::str::from_utf8(group).expect("decimal digits are ASCII"))
        .collect::<Vec<&str>>()
        .join(",")
}
/// Formats a (possibly negative) row count with `,` between thousands groups.
///
/// The sign is handled separately from the digits. Counting `-` as a digit would give
/// output like `-,100`. `unsigned_abs` also covers `i64::MIN` without overflow.
fn format_rows(n: i64) -> String {
    let digits = n.unsigned_abs().to_string();
    let grouped: Vec<&str> = digits
        .as_bytes()
        .rchunks(3)
        .rev()
        .map(|group| std::str::from_utf8(group).expect("decimal digits are ASCII"))
        .collect();
    let sign = if n < 0 { "-" } else { "" };
    format!("{sign}{}", grouped.join(","))
}
// Unit tests covering artifact construction, JSON round trips, staleness thresholds,
// cursor matching, credential redaction, and the integrity checksum.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{ChunkedPlan, ExtractionStrategy};
    use crate::tuning::SourceTuning;

    // Smallest snapshot plan used by most tests: local Parquet/Zstd output, a Postgres
    // URL without credentials, and every optional setting left unset.
    fn minimal_plan() -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT * FROM orders".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: SourceConfig {
                source_type: SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    // Sealed artifact around `minimal_plan` with a 42-row estimate and an "Efficient" verdict.
    fn minimal_artifact() -> PlanArtifact {
        PlanArtifact::new(
            "orders".into(),
            "full".into(),
            String::new(),
            minimal_plan(),
            ComputedPlanData {
                chunk_ranges: vec![],
                chunk_count: 0,
                cursor_snapshot: None,
                row_estimate: Some(42),
            },
            PlanDiagnostics {
                verdict: "Efficient".into(),
                warnings: vec![],
                recommended_profile: "balanced".into(),
                strategy_rationale: String::new(),
            },
        )
    }

    #[test]
    fn round_trip_json() {
        let artifact = minimal_artifact();
        let json = artifact.to_json_pretty().unwrap();
        let restored: PlanArtifact = PlanArtifact::from_json(&json).unwrap();
        assert_eq!(restored.export_name, artifact.export_name);
        assert_eq!(restored.plan_id, artifact.plan_id);
        assert_eq!(restored.computed.row_estimate, Some(42));
        assert_eq!(restored.diagnostics.verdict, "Efficient");
    }

    #[test]
    fn strategy_rationale_serializes_and_round_trips() {
        let mut artifact = minimal_artifact();
        let why = "Mode full: ~42 rows, below the threshold. Risk — resumable: no.";
        artifact.diagnostics.strategy_rationale = why.to_string();
        let json = artifact.to_json_pretty().unwrap();
        assert!(
            json.contains("strategy_rationale"),
            "the rationale field must appear in the JSON: {json}"
        );
        assert!(
            json.contains("Mode full"),
            "the rationale text must appear in the JSON: {json}"
        );
        let restored = PlanArtifact::from_json(&json).unwrap();
        assert_eq!(
            restored.diagnostics.strategy_rationale, why,
            "rationale must survive the round trip unchanged"
        );
    }

    // Simulates an older artifact by removing the key from the JSON before parsing.
    #[test]
    fn legacy_artifact_without_rationale_deserializes_to_empty() {
        let mut artifact = minimal_artifact();
        artifact.diagnostics.strategy_rationale = "should be dropped".into();
        let mut value: serde_json::Value =
            serde_json::from_str(&artifact.to_json_pretty().unwrap()).unwrap();
        value["diagnostics"]
            .as_object_mut()
            .unwrap()
            .remove("strategy_rationale");
        let legacy_json = serde_json::to_string(&value).unwrap();
        assert!(!legacy_json.contains("strategy_rationale"));
        let restored = PlanArtifact::from_json(&legacy_json)
            .expect("legacy artifact without the rationale field must deserialize");
        assert_eq!(
            restored.diagnostics.strategy_rationale, "",
            "missing rationale must default to empty, not fail"
        );
    }

    #[test]
    fn round_trip_chunked() {
        let mut plan = minimal_plan();
        plan.strategy = ExtractionStrategy::Chunked(ChunkedPlan {
            column: "id".into(),
            chunk_size: 50_000,
            chunk_count: None,
            parallel: 4,
            dense: false,
            by_days: None,
            checkpoint: true,
            max_attempts: 3,
        });
        let artifact = PlanArtifact::new(
            "orders".into(),
            "chunked".into(),
            "abc123".into(),
            plan,
            ComputedPlanData {
                chunk_ranges: vec![(1, 50000), (50001, 100000)],
                chunk_count: 2,
                cursor_snapshot: None,
                row_estimate: Some(100000),
            },
            PlanDiagnostics {
                verdict: "Acceptable".into(),
                warnings: vec!["sparse range".into()],
                recommended_profile: "balanced".into(),
                strategy_rationale: String::new(),
            },
        );
        let json = artifact.to_json_compact().unwrap();
        let restored = PlanArtifact::from_json(&json).unwrap();
        assert_eq!(
            restored.computed.chunk_ranges,
            vec![(1, 50000), (50001, 100000)]
        );
        assert_eq!(restored.computed.chunk_count, 2);
        assert_eq!(restored.plan_fingerprint, "abc123");
    }

    #[test]
    fn staleness_fresh() {
        let artifact = minimal_artifact();
        let result = artifact.staleness(Duration::hours(1), Duration::hours(24));
        assert_eq!(result, StalenessCheck::Fresh);
    }

    #[test]
    fn staleness_warn_between_thresholds() {
        let mut artifact = minimal_artifact();
        artifact.created_at = Utc::now() - Duration::hours(2);
        let result = artifact.staleness(Duration::hours(1), Duration::hours(24));
        assert!(
            matches!(result, StalenessCheck::StaleWarn(_)),
            "expected StaleWarn, got {result:?}"
        );
    }

    #[test]
    fn staleness_expired_artifact() {
        let mut artifact = minimal_artifact();
        artifact.created_at = Utc::now() - Duration::hours(25);
        let result = artifact.staleness(Duration::hours(1), Duration::hours(24));
        assert!(matches!(result, StalenessCheck::StaleError(_)));
    }

    // Despite the name, the age is set one second past the threshold, not exactly at it.
    #[test]
    fn staleness_exactly_at_warn_threshold() {
        let mut artifact = minimal_artifact();
        artifact.created_at = Utc::now() - Duration::hours(1) - Duration::seconds(1);
        let result = artifact.staleness(Duration::hours(1), Duration::hours(24));
        assert!(matches!(result, StalenessCheck::StaleWarn(_)));
    }

    // Despite the name, the age is set one second past the threshold, not exactly at it.
    #[test]
    fn staleness_exactly_at_error_threshold() {
        let mut artifact = minimal_artifact();
        artifact.created_at = Utc::now() - Duration::hours(24) - Duration::seconds(1);
        let result = artifact.staleness(Duration::hours(1), Duration::hours(24));
        assert!(matches!(result, StalenessCheck::StaleError(_)));
    }

    // With no snapshot, every current cursor (including none) is accepted.
    #[test]
    fn cursor_matches_none_snapshot() {
        let artifact = minimal_artifact();
        assert!(artifact.cursor_matches(None));
        assert!(artifact.cursor_matches(Some("anything")));
    }

    #[test]
    fn cursor_matches_incremental() {
        let mut artifact = minimal_artifact();
        artifact.computed.cursor_snapshot = Some("2026-01-01T00:00:00Z".into());
        assert!(artifact.cursor_matches(Some("2026-01-01T00:00:00Z")));
        assert!(!artifact.cursor_matches(Some("2026-01-02T00:00:00Z")));
        assert!(!artifact.cursor_matches(None));
    }

    #[test]
    fn format_rows_basic() {
        assert_eq!(super::format_rows(1000), "1,000");
        assert_eq!(super::format_rows(1_000_000), "1,000,000");
        assert_eq!(super::format_rows(42), "42");
    }

    #[test]
    fn artifact_strips_plaintext_password_from_source() {
        let mut plan = minimal_plan();
        plan.source.password = Some("s3cret!".into());
        let artifact = PlanArtifact::new(
            "orders".into(),
            "full".into(),
            String::new(),
            plan,
            ComputedPlanData {
                chunk_ranges: vec![],
                chunk_count: 0,
                cursor_snapshot: None,
                row_estimate: None,
            },
            PlanDiagnostics {
                verdict: "Efficient".into(),
                warnings: vec![],
                recommended_profile: "balanced".into(),
                strategy_rationale: String::new(),
            },
        );
        assert_eq!(
            artifact.resolved_plan.source.password, None,
            "plaintext password must never leave the process"
        );
        let json = artifact.to_json_pretty().unwrap();
        assert!(
            !json.contains("s3cret"),
            "secret must not appear anywhere in the JSON"
        );
    }

    #[test]
    fn artifact_strips_credentials_from_url() {
        let mut plan = minimal_plan();
        plan.source.url = Some("postgresql://rivet:s3cret@db.example.com/prod".into());
        let artifact = PlanArtifact::new(
            "orders".into(),
            "full".into(),
            String::new(),
            plan,
            ComputedPlanData {
                chunk_ranges: vec![],
                chunk_count: 0,
                cursor_snapshot: None,
                row_estimate: None,
            },
            PlanDiagnostics {
                verdict: "Efficient".into(),
                warnings: vec![],
                recommended_profile: "balanced".into(),
                strategy_rationale: String::new(),
            },
        );
        let url = artifact.resolved_plan.source.url.as_deref().unwrap();
        assert!(
            !url.contains("s3cret"),
            "password must not remain in URL: {url}"
        );
        assert!(url.contains("REDACTED@db.example.com/prod"));
    }

    #[test]
    fn integrity_seal_accepts_untouched_artifact() {
        let artifact = minimal_artifact();
        assert!(
            !artifact.integrity.is_empty(),
            "new artifact must be sealed"
        );
        artifact
            .verify_integrity()
            .expect("untouched artifact must pass integrity check");
        let json = artifact.to_json_pretty().unwrap();
        let restored = PlanArtifact::from_json(&json).unwrap();
        restored
            .verify_integrity()
            .expect("round-tripped artifact must still verify");
    }

    #[test]
    fn integrity_seal_rejects_tampered_base_query() {
        let mut artifact = minimal_artifact();
        artifact.resolved_plan.base_query = "SELECT * FROM users".into();
        let err = artifact.verify_integrity().unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("integrity check failed") && msg.contains("modified after planning"),
            "tamper must be reported with the PA10 message, got: {msg}"
        );
    }

    #[test]
    fn integrity_seal_rejects_tamper_in_any_execution_field() {
        let mut artifact = minimal_artifact();
        artifact.resolved_plan.destination.path = Some("/etc/evil".into());
        assert!(
            artifact.verify_integrity().is_err(),
            "editing the destination path must be detected as tampering"
        );
    }

    #[test]
    fn integrity_seal_legacy_empty_is_accepted_with_warning() {
        let mut artifact = minimal_artifact();
        artifact.integrity = String::new();
        artifact
            .verify_integrity()
            .expect("unsealed (legacy) artifact must be accepted, not failed closed");
    }

    // Pins a known limitation: editing the plan and recomputing the unkeyed checksum
    // passes verification.
    #[test]
    fn integrity_checksum_is_not_tamper_protection() {
        let mut artifact = minimal_artifact();
        artifact.resolved_plan.base_query = "SELECT * FROM users".into();
        artifact.integrity = resolved_plan_integrity(&artifact.resolved_plan);
        artifact.verify_integrity().expect(
            "an unkeyed checksum cannot detect a deliberate edit + recompute — \
             this is accidental-corruption detection, not tamper protection",
        );
    }

    #[test]
    fn integrity_seal_distinguishes_a_real_override_change() {
        let mut plan_a = minimal_plan();
        plan_a.column_overrides = [("alpha".to_string(), crate::types::RivetType::String)]
            .into_iter()
            .collect();
        let mut plan_b = minimal_plan();
        plan_b.column_overrides = [("alpha".to_string(), crate::types::RivetType::Int64)]
            .into_iter()
            .collect();
        assert_ne!(
            resolved_plan_integrity(&plan_a),
            resolved_plan_integrity(&plan_b),
            "a real override type change must change the seal"
        );
    }

    // Builds two objects with the same keys inserted in different orders and checks
    // that they serialize identically after canonicalization.
    #[test]
    fn canonicalize_makes_seal_independent_of_object_key_order() {
        let mut a = serde_json::json!({});
        let mut b = serde_json::json!({});
        if let (serde_json::Value::Object(ma), serde_json::Value::Object(mb)) = (&mut a, &mut b) {
            for k in ["zeta", "alpha", "mu"] {
                ma.insert(k.to_string(), serde_json::json!(k));
            }
            for k in ["mu", "zeta", "alpha"] {
                mb.insert(k.to_string(), serde_json::json!(k));
            }
        }
        canonicalize_value(&mut a);
        canonicalize_value(&mut b);
        assert_eq!(
            serde_json::to_vec(&a).unwrap(),
            serde_json::to_vec(&b).unwrap(),
            "canonicalized bytes must not depend on object key insertion order"
        );
    }

    // Nested object keys are sorted, while array element order is left untouched.
    #[test]
    fn canonicalize_value_sorts_nested_keys_preserves_arrays() {
        let mut v = serde_json::json!({
            "b": 1,
            "a": { "y": [3, 2, 1], "x": 0 },
        });
        canonicalize_value(&mut v);
        let s = serde_json::to_string(&v).unwrap();
        assert_eq!(s, r#"{"a":{"x":0,"y":[3,2,1]},"b":1}"#);
    }
}