use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use greentic_deploy_spec::{
BundleDeployment, RevenuePolicyDocument, RevenueShareEntry, SchemaVersion,
};
use greentic_distributor_client::signing::{
INTOTO_STATEMENT_TYPE, InTotoStatement, SigningError, Subject, sign_statement,
verify_artifact_dsse,
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use thiserror::Error;
use super::atomic_write::AtomicWriteError;
use super::trust_root::{self, TrustRootError};
use crate::operator_key::{OperatorKey, OperatorKeyError};
/// Directory name, relative to the env dir, under which
/// `write_revenue_policy_version` lays out policy documents and their
/// signature sidecars as `<bundle>/<customer>/`.
const BILLING_DIR: &str = "billing-policies";
/// Identifier written both as the predicate's `schema` field and as the
/// in-toto statement's `predicate_type` by `write_revenue_policy_version`.
pub const REVENUE_POLICY_PREDICATE_TYPE_V1: &str = "greentic.revenue-policy-predicate.v1";
/// Errors returned while building, signing and persisting a revenue-policy
/// version.
#[derive(Debug, Error)]
pub enum BundleDeploymentError {
/// The assembled policy document failed spec validation (converted via `From`).
#[error("revenue-policy spec invalid: {0}")]
Spec(#[from] greentic_deploy_spec::SpecError),
/// An atomic write of the document or its sidecar failed; `path` is the
/// absolute target that was being written.
#[error("revenue-policy write {path}: {source}")]
Write {
path: PathBuf,
#[source]
source: AtomicWriteError,
},
/// A bundle or customer id cannot be used as a single directory name.
/// Carries the offending segment.
#[error(
"unsafe path segment `{0}`: must be a single component, not `.`/`..`, and contain no path separators or NUL"
)]
UnsafeSegment(String),
/// The committed ref already names version `u64::MAX`, so there is no next
/// version number.
#[error("revenue-policy version counter exhausted (committed ref already at the maximum)")]
VersionOverflow,
/// A plain filesystem operation (directory creation or read-back) failed
/// on `path`.
#[error("revenue-policy io on {path}: {source}")]
Io {
path: PathBuf,
#[source]
source: std::io::Error,
},
/// Failure reported by the signing layer (converted via `From`).
#[error("revenue-policy signing: {0}")]
Sign(#[from] SigningError),
/// Failure reported by the operator-key module (converted via `From`).
// NOTE(review): not constructed anywhere in this file — presumably used by callers; confirm.
#[error("revenue-policy operator key: {0}")]
OperatorKey(#[from] OperatorKeyError),
/// Failure reported by the trust-root module, e.g. while loading the env
/// trust root (converted via `From`).
#[error("revenue-policy trust-root: {0}")]
TrustRoot(#[from] TrustRootError),
/// JSON serialization of the document, predicate or envelope failed.
/// Mapped explicitly with `map_err`; there is no `From` conversion.
#[error("revenue-policy serialize: {0}")]
Serialize(serde_json::Error),
/// The operator key's id is not listed in the env trust root, so nothing
/// was signed or written.
#[error(
"operator key `{key_id}` is not trusted in env `{env_dir}` (not present in `trust-root.json`); run `gtc op trust-root bootstrap <env-id>` first, or restore the key via `gtc op trust-root add`"
)]
OperatorKeyNotTrusted { key_id: String, env_dir: PathBuf },
/// The document read back from disk does not hash to the digest that was
/// signed. `expected` and `actual` are hex SHA-256 strings.
#[error(
"revenue-policy document `{path}` was corrupted after write: expected SHA-256 `{expected}`, on-disk SHA-256 `{actual}`"
)]
DocCorruptedAfterWrite {
path: PathBuf,
expected: String,
actual: String,
},
}
/// Predicate embedded in the signed in-toto statement for one revenue-policy
/// version. It is serialized to JSON before signing, so field names are part
/// of the signed payload.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RevenuePolicyPredicate {
/// Predicate schema identifier; the writer sets it to
/// `REVENUE_POLICY_PREDICATE_TYPE_V1`.
pub schema: String,
/// Deployment the policy belongs to (copied from the `BundleDeployment`).
pub deployment_id: greentic_deploy_spec::DeploymentId,
/// Environment id (copied from the `BundleDeployment`).
pub env_id: greentic_deploy_spec::EnvId,
/// Bundle id (copied from the `BundleDeployment`).
pub bundle_id: greentic_deploy_spec::BundleId,
/// Customer id (copied from the `BundleDeployment`).
pub customer_id: greentic_deploy_spec::CustomerId,
/// Version number of the policy document this predicate describes.
pub version: u64,
/// Env-relative path of the previous version's sidecar, rendered with
/// forward slashes; absent for version 1 and then omitted from the JSON.
#[serde(skip_serializing_if = "Option::is_none")]
pub previous_version_ref: Option<String>,
/// Timestamp recorded in the predicate; the writer passes through the
/// caller-supplied `created_at`.
pub signed_at: DateTime<Utc>,
}
/// Summary of a revenue-policy version that was written and verified by
/// `write_revenue_policy_version`.
#[derive(Clone, Debug)]
pub struct RevenuePolicyVersion {
/// Path of the signature sidecar (`v<N>.json.sig`), relative to the env dir.
pub policy_ref: PathBuf,
/// Version number that was written.
pub version: u64,
/// Hex SHA-256 of the serialized policy document bytes.
pub doc_sha256: String,
/// Id of the operator key that signed the envelope.
pub key_id: String,
}
/// Renders `p` as a string in which every backslash is replaced by a forward
/// slash, so the value embedded in the signed predicate does not depend on the
/// platform's path separator. Non-UTF-8 parts are converted lossily.
fn path_to_forward_slash(p: &Path) -> String {
    p.to_string_lossy()
        .chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
/// Writes the next revenue-policy version for `deployment` under `env_dir`,
/// signed with `operator_key`, and returns a summary of what was written.
///
/// Steps, in order:
/// 1. Load the env trust root and refuse unless it lists
///    `operator_key.key_id` (compared ASCII-case-insensitively).
/// 2. Validate the bundle and customer ids as single path segments; the
///    target directory is `billing-policies/<bundle>/<customer>`.
/// 3. Derive the version from `deployment.revenue_policy_ref` (not from the
///    directory contents): the parsed sidecar version plus one, or `1` when
///    the ref does not parse as a sidecar name.
/// 4. Build, validate and serialize the policy document and hash its bytes.
/// 5. Wrap that digest in an in-toto statement and sign it.
/// 6. Create the directory, write the document and then the sidecar, read
///    both back, compare the document hash and verify the envelope against
///    the trust root loaded in step 1.
///
/// The first filesystem write this function itself performs is the
/// `create_dir_all` in step 6, i.e. after validation, serialization and
/// signing have all succeeded.
///
/// # Parameters
/// - `env_dir`: environment directory; all returned/recorded refs are relative to it.
/// - `deployment`: supplies the ids and the currently committed `revenue_policy_ref`.
/// - `revenue_share`: entries copied into the new document.
/// - `created_at`: used as the document's `created_at` and the predicate's `signed_at`.
/// - `operator_key`: key id and private PEM used for signing.
///
/// # Errors
/// - `TrustRoot` if the trust root cannot be loaded.
/// - `OperatorKeyNotTrusted` if the key id is not in the trust root.
/// - `UnsafeSegment` if the bundle or customer id is not a safe path segment.
/// - `VersionOverflow` if the committed ref is already at `u64::MAX`.
/// - `Spec` if the document fails validation.
/// - `Serialize` if JSON serialization fails.
/// - `Sign` if signing fails.
/// - `Io` / `Write` for directory creation, atomic write or read-back failures.
/// - `DocCorruptedAfterWrite` if the on-disk document hash differs from the signed one.
/// - The final envelope verification error is propagated with `?`
///   (presumably as `Sign` — confirm against `verify_artifact_dsse`'s error type).
pub fn write_revenue_policy_version(
env_dir: &Path,
deployment: &BundleDeployment,
revenue_share: &[RevenueShareEntry],
created_at: DateTime<Utc>,
operator_key: &OperatorKey,
) -> Result<RevenuePolicyVersion, BundleDeploymentError> {
// Precondition: the signing key must already be in the env trust root.
// This function only reads the trust root; it never adds the key itself.
let trust_root = trust_root::load(env_dir)?;
let trusted = trust_root
.keys
.iter()
.any(|k| k.key_id.eq_ignore_ascii_case(&operator_key.key_id));
if !trusted {
return Err(BundleDeploymentError::OperatorKeyNotTrusted {
key_id: operator_key.key_id.clone(),
env_dir: env_dir.to_path_buf(),
});
}
// Ids become directory names, so each must be a single safe path segment.
let bundle_seg = safe_segment(deployment.bundle_id.as_str())?;
let customer_seg = safe_segment(deployment.customer_id.as_str())?;
let rel_dir = Path::new(BILLING_DIR).join(bundle_seg).join(customer_seg);
let abs_dir = env_dir.join(&rel_dir);
// Only the version number is taken from the committed ref; the previous
// ref is rebuilt under this deployment's own directory rather than copied,
// so a crafted ref cannot make the chain point elsewhere.
let version = next_version_from_ref(&deployment.revenue_policy_ref)?;
let previous_version_ref_path = (version > 1).then(|| rel_dir.join(sidecar_name(version - 1)));
let doc = RevenuePolicyDocument {
schema: SchemaVersion::new(SchemaVersion::REVENUE_POLICY_V1),
version,
deployment_id: deployment.deployment_id,
env_id: deployment.env_id.clone(),
bundle_id: deployment.bundle_id.clone(),
customer_id: deployment.customer_id.clone(),
revenue_share: revenue_share.to_vec(),
created_at,
previous_version_ref: previous_version_ref_path.clone(),
};
doc.validate()?;
// These exact bytes are what gets hashed, signed over and written to disk.
let doc_bytes = serde_json::to_vec_pretty(&doc).map_err(BundleDeploymentError::Serialize)?;
let doc_sha256_hex = sha256_hex(&doc_bytes);
let predicate = RevenuePolicyPredicate {
schema: REVENUE_POLICY_PREDICATE_TYPE_V1.to_string(),
deployment_id: deployment.deployment_id,
env_id: deployment.env_id.clone(),
bundle_id: deployment.bundle_id.clone(),
customer_id: deployment.customer_id.clone(),
version,
// Forward slashes keep the signed payload independent of the host's
// path separator.
previous_version_ref: previous_version_ref_path
.as_deref()
.map(path_to_forward_slash),
signed_at: created_at,
};
let predicate_value =
serde_json::to_value(&predicate).map_err(BundleDeploymentError::Serialize)?;
// The statement's single subject is the document file name plus its SHA-256.
let mut digest = BTreeMap::new();
digest.insert("sha256".to_string(), doc_sha256_hex.clone());
let statement = InTotoStatement {
type_: INTOTO_STATEMENT_TYPE.to_string(),
subject: vec![Subject {
name: document_name(version),
digest,
}],
predicate_type: REVENUE_POLICY_PREDICATE_TYPE_V1.to_string(),
predicate: predicate_value,
};
let envelope = sign_statement(&statement, &operator_key.private_pem, &operator_key.key_id)?;
let envelope_bytes =
serde_json::to_vec_pretty(&envelope).map_err(BundleDeploymentError::Serialize)?;
// Everything fallible that needs no disk access is done; start writing.
std::fs::create_dir_all(&abs_dir).map_err(|source| BundleDeploymentError::Io {
path: abs_dir.clone(),
source,
})?;
let doc_rel = rel_dir.join(document_name(version));
let sig_rel = rel_dir.join(sidecar_name(version));
let doc_abs = env_dir.join(&doc_rel);
let sig_abs = env_dir.join(&sig_rel);
// Document first, sidecar second.
super::atomic_write::atomic_write_bytes(&doc_abs, &doc_bytes).map_err(|source| {
BundleDeploymentError::Write {
path: doc_abs.clone(),
source,
}
})?;
super::atomic_write::atomic_write_bytes(&sig_abs, &envelope_bytes).map_err(|source| {
BundleDeploymentError::Write {
path: sig_abs.clone(),
source,
}
})?;
// Read the document back and make sure the bytes on disk still hash to the
// digest that was signed.
let on_disk_doc = std::fs::read(&doc_abs).map_err(|source| BundleDeploymentError::Io {
path: doc_abs.clone(),
source,
})?;
let on_disk_doc_sha256 = sha256_hex(&on_disk_doc);
if on_disk_doc_sha256 != doc_sha256_hex {
return Err(BundleDeploymentError::DocCorruptedAfterWrite {
path: doc_abs.clone(),
expected: doc_sha256_hex.clone(),
actual: on_disk_doc_sha256,
});
}
// Verify the sidecar as read from disk against the on-disk document digest
// and the trust root loaded at the top of this function.
let written = std::fs::read(&sig_abs).map_err(|source| BundleDeploymentError::Io {
path: sig_abs.clone(),
source,
})?;
verify_artifact_dsse(&written, &on_disk_doc_sha256, &trust_root)?;
// The returned ref is the env-relative sidecar path, not the document path.
Ok(RevenuePolicyVersion {
policy_ref: sig_rel,
version,
doc_sha256: doc_sha256_hex,
key_id: operator_key.key_id.clone(),
})
}
/// Returns the SHA-256 digest of `bytes` as a hex string (as produced by
/// `hex::encode`).
fn sha256_hex(bytes: &[u8]) -> String {
    let mut hasher = Sha256::new();
    hasher.update(bytes);
    let digest = hasher.finalize();
    hex::encode(digest)
}
/// File name of the policy document for `version`: `v<version>.json`.
fn document_name(version: u64) -> String {
    let mut name = String::from("v");
    name.push_str(&version.to_string());
    name.push_str(".json");
    name
}
/// File name of the signature sidecar for `version`: `v<version>.json.sig`.
fn sidecar_name(version: u64) -> String {
    let number = version.to_string();
    ["v", number.as_str(), ".json.sig"].concat()
}
fn safe_segment(seg: &str) -> Result<&str, BundleDeploymentError> {
if seg.is_empty()
|| seg == "."
|| seg == ".."
|| seg.contains('/')
|| seg.contains('\\')
|| seg.contains('\0')
{
return Err(BundleDeploymentError::UnsafeSegment(seg.to_string()));
}
Ok(seg)
}
/// Computes the version number to write next, given the currently committed
/// policy ref.
///
/// When `current_ref` parses as a sidecar name the result is that version plus
/// one; otherwise numbering starts at `1`.
///
/// # Errors
/// Returns `BundleDeploymentError::VersionOverflow` when the committed version
/// is already `u64::MAX`.
fn next_version_from_ref(current_ref: &Path) -> Result<u64, BundleDeploymentError> {
    let Some(committed) = parse_sidecar_version(current_ref) else {
        // No usable committed version: start a fresh chain.
        return Ok(1);
    };
    committed
        .checked_add(1)
        .ok_or(BundleDeploymentError::VersionOverflow)
}
/// Extracts the version number from a sidecar ref of the form
/// `<dir>/v<N>.json.sig`.
///
/// Only the final path component is inspected. Returns `None` unless that
/// component is exactly what the writer would have produced for some
/// `N >= 1`: the literal `v`, the decimal digits of `N` with no sign and no
/// leading zeros, then `.json.sig`. Values that do not fit in `u64`, non-UTF-8
/// names and paths without a file name also yield `None`.
fn parse_sidecar_version(ref_path: &Path) -> Option<u64> {
    let digits = ref_path
        .file_name()?
        .to_str()?
        .strip_prefix('v')?
        .strip_suffix(".json.sig")?;

    // `u64::from_str` accepts a leading `+` and any number of leading zeros,
    // so `v+1.json.sig` or `v007.json.sig` would otherwise be taken for a
    // committed version even though no such file name is ever written.
    // Rejecting a leading `0` also rules out version zero.
    let is_canonical = !digits.starts_with('0') && digits.bytes().all(|b| b.is_ascii_digit());
    if !is_canonical {
        return None;
    }

    // Fails (-> None) on the empty string and on values above `u64::MAX`.
    digits.parse::<u64>().ok()
}
// Unit tests for the revenue-policy writer. Every test works in its own temp
// directory, which doubles as the env dir.
#[cfg(test)]
mod tests {
use super::*;
use crate::operator_key::{OperatorKey, load_or_generate_at};
use greentic_deploy_spec::{
BundleDeploymentStatus, BundleId, CustomerId, DeploymentId, EnvId, PartyId, RouteBinding,
TenantSelector,
};
use greentic_distributor_client::signing::{DsseEnvelope, TrustedKey, verify_artifact_dsse};
use tempfile::{TempDir, tempdir};
// Generates an operator key in `workdir` and registers its public half in
// that directory's trust root, so the writer's precondition is satisfied.
fn test_operator_key(workdir: &TempDir) -> OperatorKey {
let key = test_operator_key_without_bootstrap(workdir);
trust_root::add_trusted_key(
workdir.path(),
TrustedKey {
key_id: key.key_id.clone(),
public_key_pem: key.public_pem.clone(),
},
)
.expect("seed operator key into env trust root");
key
}
// Loads or generates a key at `<workdir>/operator-key.pem` without touching
// the trust root.
fn test_operator_key_without_bootstrap(workdir: &TempDir) -> OperatorKey {
load_or_generate_at(&workdir.path().join("operator-key.pem")).expect("generate key")
}
// Builds a minimal deployment for the given ids. `revenue_policy_ref` is a
// placeholder that does not parse as a sidecar name, so the first write
// produces version 1.
fn deployment(bundle: &str, customer: &str) -> BundleDeployment {
BundleDeployment {
schema: SchemaVersion::new(SchemaVersion::BUNDLE_DEPLOYMENT_V1),
deployment_id: DeploymentId::new(),
env_id: EnvId::try_from("local").unwrap(),
bundle_id: BundleId::new(bundle),
customer_id: CustomerId::new(customer),
status: BundleDeploymentStatus::Active,
current_revisions: Vec::new(),
route_binding: RouteBinding {
hosts: Vec::new(),
path_prefixes: Vec::new(),
tenant_selector: TenantSelector {
tenant: "default".to_string(),
team: "default".to_string(),
},
},
revenue_share: shares(&[("greentic", 10_000)]),
revenue_policy_ref: PathBuf::from("placeholder"),
usage: None,
created_at: Utc::now(),
authorization_ref: PathBuf::from("auth.json"),
}
}
// Turns `(party, basis_points)` pairs into revenue-share entries.
fn shares(parts: &[(&str, u32)]) -> Vec<RevenueShareEntry> {
parts
.iter()
.map(|(p, bps)| RevenueShareEntry {
party_id: PartyId::new(*p),
basis_points: *bps,
})
.collect()
}
// First write: version 1, both files exist, the returned ref is the sidecar,
// and the document has no previous-version link.
#[test]
fn first_write_is_v1_with_files_and_no_previous() {
let dir = tempdir().unwrap();
let op = test_operator_key(&dir);
let dep = deployment("fast2flow", "local-dev");
let v = write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap();
assert_eq!(v.version, 1);
assert_eq!(
v.policy_ref,
PathBuf::from("billing-policies/fast2flow/local-dev/v1.json.sig")
);
assert!(dir.path().join(&v.policy_ref).is_file());
assert!(
dir.path()
.join("billing-policies/fast2flow/local-dev/v1.json")
.is_file()
);
let doc: RevenuePolicyDocument = serde_json::from_slice(
&std::fs::read(
dir.path()
.join("billing-policies/fast2flow/local-dev/v1.json"),
)
.unwrap(),
)
.unwrap();
assert!(doc.previous_version_ref.is_none());
assert!(doc.validate().is_ok());
}
// After committing v1's ref into the deployment, the next write is v2 and its
// document links back to v1's sidecar.
#[test]
fn second_write_increments_and_chains() {
let dir = tempdir().unwrap();
let op = test_operator_key(&dir);
let mut dep = deployment("fast2flow", "cust-acme");
let v1 =
write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap();
dep.revenue_policy_ref = v1.policy_ref;
let v2 = write_revenue_policy_version(
dir.path(),
&dep,
&shares(&[("agency-a", 3_000), ("greentic", 7_000)]),
Utc::now(),
&op,
)
.unwrap();
assert_eq!(v2.version, 2);
let doc: RevenuePolicyDocument = serde_json::from_slice(
&std::fs::read(
dir.path()
.join("billing-policies/fast2flow/cust-acme/v2.json"),
)
.unwrap(),
)
.unwrap();
assert_eq!(
doc.previous_version_ref,
Some(PathBuf::from(
"billing-policies/fast2flow/cust-acme/v1.json.sig"
))
);
}
// Writing twice without advancing the deployment's ref yields version 1 both
// times: the version comes from the ref, not from what is already on disk.
#[test]
fn retry_after_uncommitted_write_reuses_same_version() {
let dir = tempdir().unwrap();
let op = test_operator_key(&dir);
let dep = deployment("fast2flow", "local-dev"); let a = write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap();
assert_eq!(a.version, 1);
let b = write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap();
assert_eq!(b.version, 1, "retry must not advance past the orphan");
assert!(
!dir.path()
.join("billing-policies/fast2flow/local-dev/v2.json")
.exists()
);
}
// Same retry scenario on the update path: with the ref fixed at v1, two
// consecutive writes both produce v2, its previous link points at an existing
// sidecar, and no v3 appears.
#[test]
fn retry_on_update_path_does_not_dangle_chain() {
let dir = tempdir().unwrap();
let op = test_operator_key(&dir);
let mut dep = deployment("fast2flow", "cust-acme");
let v1 =
write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap();
dep.revenue_policy_ref = v1.policy_ref; write_revenue_policy_version(
dir.path(),
&dep,
&shares(&[("greentic", 10_000)]),
Utc::now(),
&op,
)
.unwrap();
let v2 = write_revenue_policy_version(
dir.path(),
&dep,
&shares(&[("greentic", 10_000)]),
Utc::now(),
&op,
)
.unwrap();
assert_eq!(v2.version, 2);
let doc: RevenuePolicyDocument = serde_json::from_slice(
&std::fs::read(
dir.path()
.join("billing-policies/fast2flow/cust-acme/v2.json"),
)
.unwrap(),
)
.unwrap();
let prev = doc.previous_version_ref.expect("v2 chains to v1");
assert!(
dir.path().join(&prev).is_file(),
"previous_version_ref must point at a real (committed) sidecar"
);
assert!(
!dir.path()
.join("billing-policies/fast2flow/cust-acme/v3.json")
.exists()
);
}
// The sidecar parses as a DSSE envelope with one signature by the operator
// key and verifies against the document's SHA-256 and the env trust root.
#[test]
fn sidecar_is_dsse_envelope_that_verifies_against_doc_sha256() {
let dir = tempdir().unwrap();
let op = test_operator_key(&dir);
let dep = deployment("fast2flow", "local-dev");
let v = write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap();
assert_eq!(v.key_id, op.key_id);
let doc_path = dir
.path()
.join("billing-policies/fast2flow/local-dev/v1.json");
let doc_bytes = std::fs::read(&doc_path).unwrap();
let doc_sha256 = sha256_hex(&doc_bytes);
let _doc: RevenuePolicyDocument = serde_json::from_slice(&doc_bytes).unwrap();
let envelope_bytes = std::fs::read(dir.path().join(&v.policy_ref)).unwrap();
let parsed: DsseEnvelope = serde_json::from_slice(&envelope_bytes).unwrap();
assert_eq!(parsed.payload_type, "application/vnd.in-toto+json");
assert_eq!(parsed.signatures.len(), 1);
assert_eq!(parsed.signatures[0].keyid, op.key_id);
let trust = super::super::trust_root::load(dir.path()).unwrap();
assert!(!trust.is_empty(), "fixture must bootstrap trust-root.json");
verify_artifact_dsse(&envelope_bytes, &doc_sha256, &trust)
.expect("envelope must verify against the env trust root");
}
// Decodes the signed payload of v2's envelope and checks that the predicate's
// previous-version ref contains no backslashes.
#[test]
fn previous_version_ref_in_predicate_uses_forward_slashes() {
let dir = tempdir().unwrap();
let op = test_operator_key(&dir);
let mut dep = deployment("fast2flow", "cust-acme");
let v1 =
write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap();
dep.revenue_policy_ref = v1.policy_ref;
write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap();
let envelope_bytes = std::fs::read(
dir.path()
.join("billing-policies/fast2flow/cust-acme/v2.json.sig"),
)
.unwrap();
let env: DsseEnvelope = serde_json::from_slice(&envelope_bytes).unwrap();
use base64::Engine;
let payload = base64::engine::general_purpose::STANDARD
.decode(&env.payload)
.unwrap();
let stmt: serde_json::Value = serde_json::from_slice(&payload).unwrap();
let prev = stmt["predicate"]["previous_version_ref"].as_str().unwrap();
assert!(
!prev.contains('\\'),
"predicate previous_version_ref must not contain backslashes; got {prev:?}"
);
assert!(
prev.starts_with("billing-policies/fast2flow/cust-acme/"),
"expected forward-slash-normalized path; got {prev:?}"
);
}
// The trust root's key list is the same before and after two writes.
#[test]
fn writer_does_not_mutate_trust_root() {
let dir = tempdir().unwrap();
let op = test_operator_key(&dir);
let pre = super::super::trust_root::load(dir.path()).unwrap();
let mut dep = deployment("fast2flow", "local-dev");
let v1 =
write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap();
dep.revenue_policy_ref = v1.policy_ref;
write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap();
let post = super::super::trust_root::load(dir.path()).unwrap();
assert_eq!(pre.keys, post.keys, "writer must not mutate trust root");
}
// A key that was never added to the trust root is rejected with
// `OperatorKeyNotTrusted`, and no `billing-policies` directory is created.
#[test]
fn writer_refuses_when_operator_key_not_in_trust_root() {
let dir = tempdir().unwrap();
let op = test_operator_key_without_bootstrap(&dir);
let dep = deployment("fast2flow", "local-dev");
let err =
write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.expect_err("unbootstrapped op must be rejected");
match err {
BundleDeploymentError::OperatorKeyNotTrusted { key_id, .. } => {
assert_eq!(key_id, op.key_id);
}
other => panic!("expected OperatorKeyNotTrusted, got {other:?}"),
}
assert!(
!dir.path().join("billing-policies").exists(),
"no policy artifact must land on a failed precondition"
);
}
// After the key is removed from the trust root, further writes are refused
// and the failed write does not re-add the key.
#[test]
fn writer_refuses_after_explicit_trust_root_remove() {
let dir = tempdir().unwrap();
let op = test_operator_key(&dir);
let dep = deployment("fast2flow", "local-dev");
write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap();
super::super::trust_root::remove_trusted_key(dir.path(), &op.key_id).unwrap();
let mut dep2 = dep.clone();
dep2.bundle_id = BundleId::new("post-revocation");
let err =
write_revenue_policy_version(dir.path(), &dep2, &dep2.revenue_share, Utc::now(), &op)
.expect_err("revoked op must stay revoked");
assert!(matches!(
err,
BundleDeploymentError::OperatorKeyNotTrusted { .. }
));
let trust = super::super::trust_root::load(dir.path()).unwrap();
assert!(
trust.keys.is_empty(),
"revocation must be durable; got {:?}",
trust.keys
);
}
// A bundle id containing `..` and a separator is rejected as an unsafe segment.
#[test]
fn unsafe_bundle_segment_rejected() {
let dir = tempdir().unwrap();
let op = test_operator_key(&dir);
let dep = deployment("../escape", "local-dev");
let err =
write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap_err();
assert!(matches!(err, BundleDeploymentError::UnsafeSegment(_)));
}
// A customer id containing a path separator is rejected as an unsafe segment.
#[test]
fn unsafe_customer_segment_rejected() {
let dir = tempdir().unwrap();
let op = test_operator_key(&dir);
let dep = deployment("fast2flow", "a/b");
let err =
write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap_err();
assert!(matches!(err, BundleDeploymentError::UnsafeSegment(_)));
}
// A revenue share that fails document validation yields `Spec`, and nothing
// is created under `billing-policies`.
#[test]
fn invalid_revenue_share_rejected_before_write() {
let dir = tempdir().unwrap();
let op = test_operator_key(&dir);
let dep = deployment("fast2flow", "local-dev");
let err = write_revenue_policy_version(
dir.path(),
&dep,
&shares(&[("greentic", 5_000)]),
Utc::now(),
&op,
)
.unwrap_err();
assert!(matches!(err, BundleDeploymentError::Spec(_)));
assert!(!dir.path().join("billing-policies").exists());
}
// Direct checks of the sidecar-name parser: valid names, version zero, wrong
// suffixes and the empty path.
#[test]
fn parse_sidecar_version_rejects_zero_and_garbage() {
assert_eq!(parse_sidecar_version(Path::new("a/b/v1.json.sig")), Some(1));
assert_eq!(
parse_sidecar_version(Path::new("a/b/v42.json.sig")),
Some(42)
);
assert_eq!(parse_sidecar_version(Path::new("a/b/v0.json.sig")), None);
assert_eq!(parse_sidecar_version(Path::new("revenue.json")), None);
assert_eq!(parse_sidecar_version(Path::new("v1.json")), None);
assert_eq!(parse_sidecar_version(Path::new("")), None);
}
// A committed ref naming `v0` is treated like no ref at all: the write is v1
// with no previous-version link.
#[test]
fn corrupted_v0_ref_starts_fresh_at_v1_without_chain() {
let dir = tempdir().unwrap();
let op = test_operator_key(&dir);
let mut dep = deployment("fast2flow", "local-dev");
dep.revenue_policy_ref = PathBuf::from("billing-policies/fast2flow/local-dev/v0.json.sig");
let v = write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap();
assert_eq!(v.version, 1, "v0 ref must not chain; restart at v1");
let doc: RevenuePolicyDocument = serde_json::from_slice(
&std::fs::read(
dir.path()
.join("billing-policies/fast2flow/local-dev/v1.json"),
)
.unwrap(),
)
.unwrap();
assert!(doc.previous_version_ref.is_none());
}
// A committed ref at `u64::MAX` produces `VersionOverflow` instead of
// panicking on the increment.
#[test]
fn version_counter_overflow_is_an_error_not_a_panic() {
let dir = tempdir().unwrap();
let op = test_operator_key(&dir);
let mut dep = deployment("fast2flow", "local-dev");
dep.revenue_policy_ref = PathBuf::from(format!(
"billing-policies/fast2flow/local-dev/v{}.json.sig",
u64::MAX
));
let err =
write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap_err();
assert!(matches!(err, BundleDeploymentError::VersionOverflow));
}
// Only the version number is taken from a committed ref: a ref pointing into
// another env's tree still yields a previous-version link under this
// deployment's own directory.
#[test]
fn previous_version_ref_is_reconstructed_not_copied_verbatim() {
let dir = tempdir().unwrap();
let op = test_operator_key(&dir);
let mut dep = deployment("fast2flow", "cust-acme");
dep.revenue_policy_ref =
PathBuf::from("../../other-env/billing-policies/victim/cust/v3.json.sig");
let v = write_revenue_policy_version(dir.path(), &dep, &dep.revenue_share, Utc::now(), &op)
.unwrap();
assert_eq!(v.version, 4); let doc: RevenuePolicyDocument = serde_json::from_slice(
&std::fs::read(
dir.path()
.join("billing-policies/fast2flow/cust-acme/v4.json"),
)
.unwrap(),
)
.unwrap();
assert_eq!(
doc.previous_version_ref,
Some(PathBuf::from(
"billing-policies/fast2flow/cust-acme/v3.json.sig"
)),
"previous_version_ref must be rebuilt under this deployment's dir, not the crafted ref"
);
}
}