use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use greentic_deploy_spec::{
BundleDeployment, RevenuePolicyDocument, RevenueShareEntry, SchemaVersion, StateIntegrity,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use super::atomic_write::{AtomicWriteError, atomic_write_json};
/// Name of the directory, relative to the environment directory, under which
/// revenue-policy versions are written as `<BILLING_DIR>/<bundle>/<customer>/`.
const BILLING_DIR: &str = "billing-policies";
/// Value stored in the `schema` field of every [`RevenuePolicySignature`]
/// sidecar written by `write_revenue_policy_version`.
pub const REVENUE_POLICY_SIGNATURE_V1: &str = "greentic.revenue-policy-signature.v1";
/// Errors returned while writing a revenue-policy version to disk.
#[derive(Debug, Error)]
pub enum BundleDeploymentError {
/// The revenue-policy document was rejected by the spec crate; in this file
/// that is what a failing `RevenuePolicyDocument::validate` call turns into.
#[error("revenue-policy spec invalid: {0}")]
Spec(#[from] greentic_deploy_spec::SpecError),
/// An integrity error from the spec crate (presumably raised by
/// `StateIntegrity::sha256_of` when hashing the document — confirm against
/// that function's signature).
#[error("revenue-policy integrity: {0}")]
Integrity(#[from] greentic_deploy_spec::IntegrityError),
/// `atomic_write_json` failed for the document or its sidecar; `path` is the
/// file that was being written.
#[error("revenue-policy write {path}: {source}")]
Write {
path: PathBuf,
#[source]
source: AtomicWriteError,
},
/// A bundle or customer identifier cannot be used as a directory name; the
/// payload is the offending segment.
#[error(
"unsafe path segment `{0}`: must be a single component, not `.`/`..`, and contain no path separators or NUL"
)]
UnsafeSegment(String),
/// The version parsed from the current policy ref is already `u64::MAX`, so
/// no successor version number exists.
#[error("revenue-policy version counter exhausted (committed ref already at the maximum)")]
VersionOverflow,
/// A plain filesystem operation failed (in this file: creating the policy
/// directory); `path` is the path that was being operated on.
#[error("revenue-policy io on {path}: {source}")]
Io {
path: PathBuf,
#[source]
source: std::io::Error,
},
}
/// Sidecar record serialized to `vN.json.sig` next to each revenue-policy
/// document `vN.json`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RevenuePolicySignature {
// Schema identifier; the writer in this file always sets it to
// `REVENUE_POLICY_SIGNATURE_V1`.
pub schema: String,
// Integrity value for the document; the writer computes it with
// `StateIntegrity::sha256_of` over the document.
pub integrity: StateIntegrity,
// Timestamp of the sidecar; the writer copies the document's `created_at`.
pub signed_at: DateTime<Utc>,
}
/// Result of a successful `write_revenue_policy_version` call.
#[derive(Clone, Debug)]
pub struct RevenuePolicyVersion {
// Path of the written sidecar (`.../vN.json.sig`), relative to the
// environment directory that was passed to the writer.
pub policy_ref: PathBuf,
// Version number that was written (starts at 1).
pub version: u64,
// Integrity value computed over the written document; the same value is
// stored in the sidecar.
pub integrity: StateIntegrity,
}
pub fn write_revenue_policy_version(
env_dir: &Path,
deployment: &BundleDeployment,
revenue_share: &[RevenueShareEntry],
created_at: DateTime<Utc>,
) -> Result<RevenuePolicyVersion, BundleDeploymentError> {
let bundle_seg = safe_segment(deployment.bundle_id.as_str())?;
let customer_seg = safe_segment(deployment.customer_id.as_str())?;
let rel_dir = Path::new(BILLING_DIR).join(bundle_seg).join(customer_seg);
let abs_dir = env_dir.join(&rel_dir);
let version = next_version_from_ref(&deployment.revenue_policy_ref)?;
let previous_version_ref = (version > 1).then(|| rel_dir.join(sidecar_name(version - 1)));
let doc = RevenuePolicyDocument {
schema: SchemaVersion::new(SchemaVersion::REVENUE_POLICY_V1),
version,
deployment_id: deployment.deployment_id,
env_id: deployment.env_id.clone(),
bundle_id: deployment.bundle_id.clone(),
customer_id: deployment.customer_id.clone(),
revenue_share: revenue_share.to_vec(),
created_at,
previous_version_ref,
};
doc.validate()?;
let integrity = StateIntegrity::sha256_of(&doc)?;
let sidecar = RevenuePolicySignature {
schema: REVENUE_POLICY_SIGNATURE_V1.to_string(),
integrity: integrity.clone(),
signed_at: created_at,
};
std::fs::create_dir_all(&abs_dir).map_err(|source| BundleDeploymentError::Io {
path: abs_dir.clone(),
source,
})?;
let doc_rel = rel_dir.join(document_name(version));
let sig_rel = rel_dir.join(sidecar_name(version));
write_json(&env_dir.join(&doc_rel), &doc)?;
write_json(&env_dir.join(&sig_rel), &sidecar)?;
Ok(RevenuePolicyVersion {
policy_ref: sig_rel,
version,
integrity,
})
}
/// File name of the policy document for `version`: `v<version>.json`.
fn document_name(version: u64) -> String {
    let number = version.to_string();
    ["v", number.as_str(), ".json"].concat()
}
/// File name of the sidecar for `version`: `v<version>.json.sig`.
fn sidecar_name(version: u64) -> String {
    let mut name = String::from("v");
    name.push_str(&version.to_string());
    name.push_str(".json.sig");
    name
}
/// Serializes `value` to `path` through `atomic_write_json`, tagging any
/// failure with the path that was being written.
fn write_json<T: Serialize>(path: &Path, value: &T) -> Result<(), BundleDeploymentError> {
    match atomic_write_json(path, value) {
        Ok(()) => Ok(()),
        Err(source) => Err(BundleDeploymentError::Write {
            path: path.to_path_buf(),
            source,
        }),
    }
}
fn safe_segment(seg: &str) -> Result<&str, BundleDeploymentError> {
if seg.is_empty()
|| seg == "."
|| seg == ".."
|| seg.contains('/')
|| seg.contains('\\')
|| seg.contains('\0')
{
return Err(BundleDeploymentError::UnsafeSegment(seg.to_string()));
}
Ok(seg)
}
/// Computes the version number to write next, given the currently committed
/// policy ref.
///
/// A ref that does not parse as a sidecar name yields version 1; otherwise the
/// parsed version plus one.
///
/// # Errors
///
/// Returns [`BundleDeploymentError::VersionOverflow`] when the parsed version
/// is already `u64::MAX`.
fn next_version_from_ref(current_ref: &Path) -> Result<u64, BundleDeploymentError> {
    parse_sidecar_version(current_ref).map_or(Ok(1), |committed| {
        committed
            .checked_add(1)
            .ok_or(BundleDeploymentError::VersionOverflow)
    })
}
/// Extracts the version number from a sidecar path of the form
/// `.../v<number>.json.sig`.
///
/// Only the final path component is inspected. Returns `None` when there is no
/// UTF-8 file name, when the `v` prefix or `.json.sig` suffix is missing, when
/// the middle part is not accepted by `u64`'s string parser, or when the
/// number is 0 (versions start at 1).
fn parse_sidecar_version(ref_path: &Path) -> Option<u64> {
    let file_name = ref_path.file_name().and_then(|name| name.to_str())?;
    let digits = file_name.strip_prefix('v')?.strip_suffix(".json.sig")?;
    match digits.parse::<u64>() {
        Ok(0) | Err(_) => None,
        Ok(version) => Some(version),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_deploy_spec::{
        BundleDeploymentStatus, BundleId, CustomerId, DeploymentId, EnvId, PartyId, RouteBinding,
        TenantSelector,
    };
    use tempfile::tempdir;

    /// Builds an active deployment fixture for `bundle` / `customer`.
    ///
    /// `revenue_policy_ref` is a placeholder that does not parse as a sidecar
    /// name, so a write against an untouched fixture produces version 1.
    fn deployment(bundle: &str, customer: &str) -> BundleDeployment {
        BundleDeployment {
            schema: SchemaVersion::new(SchemaVersion::BUNDLE_DEPLOYMENT_V1),
            deployment_id: DeploymentId::new(),
            env_id: EnvId::try_from("local").unwrap(),
            bundle_id: BundleId::new(bundle),
            customer_id: CustomerId::new(customer),
            status: BundleDeploymentStatus::Active,
            current_revisions: Vec::new(),
            route_binding: RouteBinding {
                hosts: Vec::new(),
                path_prefixes: Vec::new(),
                tenant_selector: TenantSelector {
                    tenant: "default".to_string(),
                    team: "default".to_string(),
                },
            },
            revenue_share: shares(&[("greentic", 10_000)]),
            revenue_policy_ref: PathBuf::from("placeholder"),
            usage: None,
            created_at: Utc::now(),
            authorization_ref: PathBuf::from("auth.json"),
        }
    }

    /// Turns `(party, basis_points)` pairs into revenue-share entries.
    fn shares(parts: &[(&str, u32)]) -> Vec<RevenueShareEntry> {
        let mut entries = Vec::with_capacity(parts.len());
        for &(party, basis_points) in parts {
            entries.push(RevenueShareEntry {
                party_id: PartyId::new(party),
                basis_points,
            });
        }
        entries
    }

    /// Reads and deserializes the policy document stored at `rel` under `root`.
    fn load_doc(root: &Path, rel: &str) -> RevenuePolicyDocument {
        let bytes = std::fs::read(root.join(rel)).unwrap();
        serde_json::from_slice(&bytes).unwrap()
    }

    #[test]
    fn first_write_is_v1_with_files_and_no_previous() {
        let dir = tempdir().unwrap();
        let root = dir.path();
        let dep = deployment("fast2flow", "local-dev");

        let written =
            write_revenue_policy_version(root, &dep, &dep.revenue_share, Utc::now()).unwrap();

        assert_eq!(written.version, 1);
        assert_eq!(
            written.policy_ref,
            PathBuf::from("billing-policies/fast2flow/local-dev/v1.json.sig")
        );
        assert!(root.join(&written.policy_ref).is_file());
        assert!(
            root.join("billing-policies/fast2flow/local-dev/v1.json")
                .is_file()
        );

        let doc = load_doc(root, "billing-policies/fast2flow/local-dev/v1.json");
        assert!(doc.previous_version_ref.is_none());
        assert!(doc.validate().is_ok());
    }

    #[test]
    fn second_write_increments_and_chains() {
        let dir = tempdir().unwrap();
        let root = dir.path();
        let mut dep = deployment("fast2flow", "cust-acme");

        let first =
            write_revenue_policy_version(root, &dep, &dep.revenue_share, Utc::now()).unwrap();
        // Commit v1 by recording its sidecar as the deployment's current ref.
        dep.revenue_policy_ref = first.policy_ref;

        let second = write_revenue_policy_version(
            root,
            &dep,
            &shares(&[("agency-a", 3_000), ("greentic", 7_000)]),
            Utc::now(),
        )
        .unwrap();
        assert_eq!(second.version, 2);

        let doc = load_doc(root, "billing-policies/fast2flow/cust-acme/v2.json");
        assert_eq!(
            doc.previous_version_ref,
            Some(PathBuf::from(
                "billing-policies/fast2flow/cust-acme/v1.json.sig"
            ))
        );
    }

    #[test]
    fn retry_after_uncommitted_write_reuses_same_version() {
        let dir = tempdir().unwrap();
        let root = dir.path();
        let dep = deployment("fast2flow", "local-dev");

        // First attempt writes v1, but the deployment's ref is never updated.
        let a = write_revenue_policy_version(root, &dep, &dep.revenue_share, Utc::now()).unwrap();
        assert_eq!(a.version, 1);

        // The retry sees the same ref and must land on the same version.
        let b = write_revenue_policy_version(root, &dep, &dep.revenue_share, Utc::now()).unwrap();
        assert_eq!(b.version, 1, "retry must not advance past the orphan");
        assert!(
            !root
                .join("billing-policies/fast2flow/local-dev/v2.json")
                .exists()
        );
    }

    #[test]
    fn retry_on_update_path_does_not_dangle_chain() {
        let dir = tempdir().unwrap();
        let root = dir.path();
        let mut dep = deployment("fast2flow", "cust-acme");

        let first =
            write_revenue_policy_version(root, &dep, &dep.revenue_share, Utc::now()).unwrap();
        dep.revenue_policy_ref = first.policy_ref;

        // First v2 attempt: written, but the ref stays at v1 (not committed).
        let full_share = shares(&[("greentic", 10_000)]);
        write_revenue_policy_version(root, &dep, &full_share, Utc::now()).unwrap();

        // Retry with the ref still pointing at v1.
        let retried = write_revenue_policy_version(root, &dep, &full_share, Utc::now()).unwrap();
        assert_eq!(retried.version, 2);

        let doc = load_doc(root, "billing-policies/fast2flow/cust-acme/v2.json");
        let prev = doc.previous_version_ref.expect("v2 chains to v1");
        assert!(
            root.join(&prev).is_file(),
            "previous_version_ref must point at a real (committed) sidecar"
        );
        assert!(
            !root
                .join("billing-policies/fast2flow/cust-acme/v3.json")
                .exists()
        );
    }

    #[test]
    fn sidecar_integrity_matches_document() {
        let dir = tempdir().unwrap();
        let root = dir.path();
        let dep = deployment("fast2flow", "local-dev");

        let written =
            write_revenue_policy_version(root, &dep, &dep.revenue_share, Utc::now()).unwrap();

        let doc = load_doc(root, "billing-policies/fast2flow/local-dev/v1.json");
        let sig_bytes = std::fs::read(root.join(&written.policy_ref)).unwrap();
        let sig: RevenuePolicySignature = serde_json::from_slice(&sig_bytes).unwrap();

        assert_eq!(sig.schema, REVENUE_POLICY_SIGNATURE_V1);
        assert!(sig.integrity.verify(&doc).unwrap());
    }

    #[test]
    fn unsafe_bundle_segment_rejected() {
        let dir = tempdir().unwrap();
        let dep = deployment("../escape", "local-dev");

        let outcome =
            write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now());

        let err = outcome.unwrap_err();
        assert!(matches!(err, BundleDeploymentError::UnsafeSegment(_)));
    }

    #[test]
    fn unsafe_customer_segment_rejected() {
        let dir = tempdir().unwrap();
        let dep = deployment("fast2flow", "a/b");

        let outcome =
            write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now());

        let err = outcome.unwrap_err();
        assert!(matches!(err, BundleDeploymentError::UnsafeSegment(_)));
    }

    #[test]
    fn invalid_revenue_share_rejected_before_write() {
        let dir = tempdir().unwrap();
        let root = dir.path();
        let dep = deployment("fast2flow", "local-dev");

        let outcome =
            write_revenue_policy_version(root, &dep, &shares(&[("greentic", 5_000)]), Utc::now());

        let err = outcome.unwrap_err();
        assert!(matches!(err, BundleDeploymentError::Spec(_)));
        // Validation runs before any directory is created.
        assert!(!root.join("billing-policies").exists());
    }

    #[test]
    fn parse_sidecar_version_rejects_zero_and_garbage() {
        // Well-formed sidecar names.
        assert_eq!(parse_sidecar_version(Path::new("a/b/v1.json.sig")), Some(1));
        assert_eq!(
            parse_sidecar_version(Path::new("a/b/v42.json.sig")),
            Some(42)
        );
        // Version 0, wrong names, and the empty path.
        assert_eq!(parse_sidecar_version(Path::new("a/b/v0.json.sig")), None);
        assert_eq!(parse_sidecar_version(Path::new("revenue.json")), None);
        assert_eq!(parse_sidecar_version(Path::new("v1.json")), None);
        assert_eq!(parse_sidecar_version(Path::new("")), None);
    }

    #[test]
    fn corrupted_v0_ref_starts_fresh_at_v1_without_chain() {
        let dir = tempdir().unwrap();
        let root = dir.path();
        let mut dep = deployment("fast2flow", "local-dev");
        dep.revenue_policy_ref = PathBuf::from("billing-policies/fast2flow/local-dev/v0.json.sig");

        let written =
            write_revenue_policy_version(root, &dep, &dep.revenue_share, Utc::now()).unwrap();
        assert_eq!(written.version, 1, "v0 ref must not chain; restart at v1");

        let doc = load_doc(root, "billing-policies/fast2flow/local-dev/v1.json");
        assert!(doc.previous_version_ref.is_none());
    }

    #[test]
    fn version_counter_overflow_is_an_error_not_a_panic() {
        let dir = tempdir().unwrap();
        let mut dep = deployment("fast2flow", "local-dev");
        dep.revenue_policy_ref = PathBuf::from(format!(
            "billing-policies/fast2flow/local-dev/v{}.json.sig",
            u64::MAX
        ));

        let outcome =
            write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now());

        let err = outcome.unwrap_err();
        assert!(matches!(err, BundleDeploymentError::VersionOverflow));
    }

    #[test]
    fn previous_version_ref_is_reconstructed_not_copied_verbatim() {
        let dir = tempdir().unwrap();
        let root = dir.path();
        let mut dep = deployment("fast2flow", "cust-acme");
        // A ref whose directory part points somewhere else entirely.
        dep.revenue_policy_ref =
            PathBuf::from("../../other-env/billing-policies/victim/cust/v3.json.sig");

        let written =
            write_revenue_policy_version(root, &dep, &dep.revenue_share, Utc::now()).unwrap();
        // Only the version number is taken from the ref.
        assert_eq!(written.version, 4);

        let doc = load_doc(root, "billing-policies/fast2flow/cust-acme/v4.json");
        assert_eq!(
            doc.previous_version_ref,
            Some(PathBuf::from(
                "billing-policies/fast2flow/cust-acme/v3.json.sig"
            )),
            "previous_version_ref must be rebuilt under this deployment's dir, not the crafted ref"
        );
    }
}