use std::path::PathBuf;
use chrono::Utc;
use greentic_deploy_spec::{
BundleId, DeploymentId, EnvId, Environment, RevisionId, RevisionLifecycle, SchemaVersion,
TrafficSplit, TrafficSplitEntry,
};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use crate::environment::{EnvironmentStore, LocalFsStore};
use crate::rollout_telemetry::emit_traffic_split_applied;
use super::dispatch::{TrafficSetArgs, TrafficTargetArgs};
use super::{AuditCtx, OpError, OpFlags, OpOutcome, audit_and_record};
const NOUN: &str = "traffic";
const PREV_PREFIX: &str = "inline://";
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrafficSetPayload {
pub environment_id: String,
pub deployment_id: String,
pub entries: Vec<TrafficSetEntryPayload>,
#[serde(default = "default_updated_by")]
pub updated_by: String,
pub idempotency_key: String,
#[serde(default = "default_authorization_ref")]
pub authorization_ref: PathBuf,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrafficSetEntryPayload {
pub revision_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub weight_bps: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub weight_percent: Option<u32>,
}
pub(super) fn default_updated_by() -> String {
"operator".to_string()
}
pub(super) fn default_authorization_ref() -> PathBuf {
PathBuf::from("auth.json")
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrafficSummary {
pub environment_id: String,
pub deployment_id: String,
pub bundle_id: String,
pub generation: u64,
pub entries: Vec<TrafficSummaryEntry>,
pub has_previous: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrafficSummaryEntry {
pub revision_id: String,
pub weight_bps: u32,
}
impl TrafficSummary {
fn from(env_id: &EnvId, split: &TrafficSplit) -> Self {
Self {
environment_id: env_id.as_str().to_string(),
deployment_id: split.deployment_id.to_string(),
bundle_id: split.bundle_id.as_str().to_string(),
generation: split.generation,
entries: split
.entries
.iter()
.map(|e| TrafficSummaryEntry {
revision_id: e.revision_id.to_string(),
weight_bps: e.weight_bps,
})
.collect(),
has_previous: split.previous_split_ref.is_some(),
}
}
}
pub fn set(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<TrafficSetPayload>,
) -> Result<OpOutcome, OpError> {
if flags.schema_only {
return Ok(OpOutcome::new(NOUN, "set", set_schema()));
}
let payload = resolve_payload::<TrafficSetPayload>(flags, payload)?;
let env_id = parse_env_id(&payload.environment_id)?;
let deployment_id = parse_deployment_id(&payload.deployment_id)?;
let parsed_entries = parse_entries(&payload.entries)?;
let ctx = AuditCtx {
env_id: env_id.clone(),
noun: NOUN,
verb: "set",
target: json!({"deployment_id": deployment_id.to_string()}),
idempotency_key: Some(payload.idempotency_key.clone()),
};
audit_and_record(store, ctx, |_committed| {
let (split, gens) = store.transact(&env_id, |locked| {
let mut env = locked.load()?;
let deployment = env
.bundles
.iter()
.find(|b| b.deployment_id == deployment_id)
.ok_or_else(|| {
OpError::NotFound(format!(
"deployment `{deployment_id}` not found in env `{env_id}`"
))
})?;
let bundle_id: BundleId = deployment.bundle_id.clone();
for entry in &parsed_entries {
let rev = env
.revisions
.iter()
.find(|r| r.revision_id == entry.revision_id)
.ok_or_else(|| {
OpError::NotFound(format!(
"revision `{}` not found in env `{env_id}`",
entry.revision_id
))
})?;
if rev.deployment_id != deployment_id {
return Err(OpError::InvalidArgument(format!(
"revision `{}` belongs to deployment `{}`, not `{}`",
entry.revision_id, rev.deployment_id, deployment_id,
)));
}
}
let prev_split_idx = env
.traffic_splits
.iter()
.position(|s| s.deployment_id == deployment_id);
if let Some(idx) = prev_split_idx {
let prev = &env.traffic_splits[idx];
if prev.idempotency_key == payload.idempotency_key {
if entries_match(&prev.entries, &parsed_entries) {
locked.refresh_runtime_config(&env)?;
return Ok((prev.clone(), super::AuditGens::NONE));
}
return Err(OpError::Conflict(format!(
"idempotency key `{}` already used for deployment `{}` with different entries",
payload.idempotency_key, deployment_id
)));
}
}
assert_entries_all_ready(&env, &parsed_entries, &env_id)?;
let (generation, previous_split_ref, prev_gen) = match prev_split_idx {
Some(idx) => {
let prev = &env.traffic_splits[idx];
let snapshot = serde_json::to_value(prev).map_err(|e| {
OpError::InvalidArgument(format!("snapshot prior split: {e}"))
})?;
(
prev.generation + 1,
Some(stash_inline(snapshot)),
Some(prev.generation),
)
}
None => (0, None, None),
};
let split = TrafficSplit {
schema: SchemaVersion::new(SchemaVersion::TRAFFIC_SPLIT_V1),
env_id: env_id.clone(),
deployment_id,
bundle_id,
generation,
entries: parsed_entries.clone(),
updated_at: Utc::now(),
updated_by: payload.updated_by.clone(),
idempotency_key: payload.idempotency_key.clone(),
authorization_ref: payload.authorization_ref.clone(),
previous_split_ref,
};
split.validate().map_err(OpError::Spec)?;
match prev_split_idx {
Some(idx) => env.traffic_splits[idx] = split.clone(),
None => env.traffic_splits.push(split.clone()),
}
locked.save(&env)?;
locked.refresh_runtime_config(&env)?;
emit_traffic_split_applied(
&env,
split.deployment_id,
&split.bundle_id,
split.generation,
);
let gens = super::AuditGens {
previous: prev_gen,
new: Some(generation),
};
Ok::<_, OpError>((split, gens))
})?;
let outcome = OpOutcome::new(
NOUN,
"set",
serde_json::to_value(TrafficSummary::from(&env_id, &split))
.expect("TrafficSummary is json-safe"),
);
Ok((outcome, gens))
})
}
pub fn payload_from_set_args(args: TrafficSetArgs) -> Result<Option<TrafficSetPayload>, OpError> {
let TrafficSetArgs {
env_id,
entries,
deployment,
idempotency_key,
updated_by,
authorization_ref,
} = args;
if env_id.is_none() && deployment.is_none() && entries.is_empty() {
return Ok(None);
}
let environment_id = env_id.ok_or_else(|| {
OpError::InvalidArgument("traffic set: missing positional `<env_id>`".to_string())
})?;
let deployment_id = deployment.ok_or_else(|| {
OpError::InvalidArgument("traffic set: missing `--deployment <ULID>`".to_string())
})?;
if entries.is_empty() {
return Err(OpError::InvalidArgument(
"traffic set: at least one `<revision_id>=<weight>` entry is required".to_string(),
));
}
let idempotency_key = idempotency_key.ok_or_else(|| {
OpError::InvalidArgument(
"traffic set: missing `--idempotency-key <KEY>`. Pass any stable string \
(ULID, UUID, ticket id) — re-running the same command with the same key \
is a no-op replay; a different key (or omitting it) destroys the one-step \
rollback target."
.to_string(),
)
})?;
let entries = entries
.iter()
.map(|raw| parse_entry_arg(raw))
.collect::<Result<Vec<_>, _>>()?;
Ok(Some(TrafficSetPayload {
environment_id,
deployment_id,
entries,
updated_by: updated_by.unwrap_or_else(default_updated_by),
idempotency_key,
authorization_ref: authorization_ref.unwrap_or_else(default_authorization_ref),
}))
}
pub fn payload_from_target_args(
args: TrafficTargetArgs,
) -> Result<Option<TrafficShowPayload>, OpError> {
let TrafficTargetArgs { env_id, deployment } = args;
if env_id.is_none() && deployment.is_none() {
return Ok(None);
}
let environment_id = env_id.ok_or_else(|| {
OpError::InvalidArgument("traffic: missing positional `<env_id>`".to_string())
})?;
let deployment_id = deployment.ok_or_else(|| {
OpError::InvalidArgument("traffic: missing `--deployment <ULID>`".to_string())
})?;
Ok(Some(TrafficShowPayload {
environment_id,
deployment_id,
}))
}
fn parse_entry_arg(raw: &str) -> Result<TrafficSetEntryPayload, OpError> {
let (rid, weight) = raw.split_once('=').ok_or_else(|| {
OpError::InvalidArgument(format!(
"entry `{raw}` must be `<revision_id>=<percent>` or `<revision_id>=<N>bps`"
))
})?;
if rid.is_empty() {
return Err(OpError::InvalidArgument(format!(
"entry `{raw}` has an empty revision_id"
)));
}
let weight = weight.trim();
if weight.is_empty() {
return Err(OpError::InvalidArgument(format!(
"entry `{raw}` has an empty weight"
)));
}
let (weight_bps, weight_percent) = if let Some(rest) = weight.strip_suffix("bps") {
let bps: u32 = rest.trim().parse().map_err(|e| {
OpError::InvalidArgument(format!("entry `{raw}`: parse basis points: {e}"))
})?;
(Some(bps), None)
} else {
let pct_str = weight.strip_suffix('%').unwrap_or(weight).trim();
let pct: u32 = pct_str
.parse()
.map_err(|e| OpError::InvalidArgument(format!("entry `{raw}`: parse percent: {e}")))?;
(None, Some(pct))
};
Ok(TrafficSetEntryPayload {
revision_id: rid.to_string(),
weight_bps,
weight_percent,
})
}
fn parse_entries(entries: &[TrafficSetEntryPayload]) -> Result<Vec<TrafficSplitEntry>, OpError> {
let mut out = Vec::with_capacity(entries.len());
for entry in entries {
let bps = match (entry.weight_bps, entry.weight_percent) {
(Some(bps), _) => bps,
(None, Some(pct)) => {
if pct > 100 {
return Err(OpError::InvalidArgument(format!(
"weight_percent {pct} > 100"
)));
}
pct.saturating_mul(100)
}
(None, None) => {
return Err(OpError::InvalidArgument(
"each entry must set weight_bps or weight_percent".to_string(),
));
}
};
let revision_id = parse_revision_id(&entry.revision_id)?;
out.push(TrafficSplitEntry {
revision_id,
weight_bps: bps,
});
}
Ok(out)
}
fn assert_entries_all_ready(
env: &Environment,
entries: &[TrafficSplitEntry],
env_id: &EnvId,
) -> Result<(), OpError> {
for entry in entries {
let rev = env
.revisions
.iter()
.find(|r| r.revision_id == entry.revision_id)
.ok_or_else(|| {
OpError::Conflict(format!(
"revision `{}` not found in env `{env_id}`",
entry.revision_id
))
})?;
if rev.lifecycle != RevisionLifecycle::Ready {
return Err(OpError::Conflict(format!(
"revision `{}` is `{:?}`; only `Ready` revisions may receive traffic",
entry.revision_id, rev.lifecycle
)));
}
}
Ok(())
}
fn entries_match(a: &[TrafficSplitEntry], b: &[TrafficSplitEntry]) -> bool {
if a.len() != b.len() {
return false;
}
let mut a_sorted: Vec<(&RevisionId, u32)> =
a.iter().map(|e| (&e.revision_id, e.weight_bps)).collect();
let mut b_sorted: Vec<(&RevisionId, u32)> =
b.iter().map(|e| (&e.revision_id, e.weight_bps)).collect();
a_sorted.sort_by_key(|(r, _)| r.to_string());
b_sorted.sort_by_key(|(r, _)| r.to_string());
a_sorted == b_sorted
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrafficShowPayload {
pub environment_id: String,
pub deployment_id: String,
}
pub fn show(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<TrafficShowPayload>,
) -> Result<OpOutcome, OpError> {
if flags.schema_only {
return Ok(OpOutcome::new(NOUN, "show", show_schema()));
}
let payload = resolve_payload::<TrafficShowPayload>(flags, payload)?;
let env_id = parse_env_id(&payload.environment_id)?;
let env = store.load(&env_id)?;
let deployment_id = parse_deployment_id(&payload.deployment_id)?;
let split = env
.traffic_splits
.iter()
.find(|s| s.deployment_id == deployment_id)
.ok_or_else(|| {
OpError::NotFound(format!(
"no traffic split for deployment `{deployment_id}` in env `{env_id}`"
))
})?;
Ok(OpOutcome::new(
NOUN,
"show",
serde_json::to_value(TrafficSummary::from(&env_id, split))
.expect("TrafficSummary is json-safe"),
))
}
pub fn rollback(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<TrafficShowPayload>,
) -> Result<OpOutcome, OpError> {
if flags.schema_only {
return Ok(OpOutcome::new(NOUN, "rollback", show_schema()));
}
let payload = resolve_payload::<TrafficShowPayload>(flags, payload)?;
let env_id = parse_env_id(&payload.environment_id)?;
let deployment_id = parse_deployment_id(&payload.deployment_id)?;
let ctx = AuditCtx {
env_id: env_id.clone(),
noun: NOUN,
verb: "rollback",
target: json!({"deployment_id": deployment_id.to_string()}),
idempotency_key: None,
};
audit_and_record(store, ctx, |_committed| {
let (restored, gens) = store.transact(&env_id, |locked| {
let mut env = locked.load()?;
let idx = env
.traffic_splits
.iter()
.position(|s| s.deployment_id == deployment_id)
.ok_or_else(|| {
OpError::NotFound(format!(
"no traffic split for deployment `{deployment_id}` in env `{env_id}`"
))
})?;
let prev_split_generation = env.traffic_splits[idx].generation;
let prev_ref = env.traffic_splits[idx]
.previous_split_ref
.clone()
.ok_or_else(|| {
OpError::Conflict(format!(
"traffic split for `{deployment_id}` has no prior version to roll back to"
))
})?;
let prev_value = load_inline(&prev_ref).ok_or_else(|| {
OpError::NotFound(format!(
"previous split payload `{}` missing",
prev_ref.display()
))
})?;
let mut restored: TrafficSplit = serde_json::from_value(prev_value).map_err(|e| {
OpError::InvalidArgument(format!("deserialise previous split: {e}"))
})?;
restored.generation = prev_split_generation + 1;
restored.previous_split_ref = None;
restored.updated_at = Utc::now();
restored.idempotency_key =
format!("rollback-{}", env.traffic_splits[idx].idempotency_key);
restored.validate().map_err(OpError::Spec)?;
assert_entries_all_ready(&env, &restored.entries, &env_id)?;
env.traffic_splits[idx] = restored.clone();
locked.save(&env)?;
locked.refresh_runtime_config(&env)?;
emit_traffic_split_applied(
&env,
restored.deployment_id,
&restored.bundle_id,
restored.generation,
);
let gens = super::AuditGens {
previous: Some(prev_split_generation),
new: Some(prev_split_generation + 1),
};
Ok::<_, OpError>((restored, gens))
})?;
let outcome = OpOutcome::new(
NOUN,
"rollback",
serde_json::to_value(TrafficSummary::from(&env_id, &restored))
.expect("TrafficSummary is json-safe"),
);
Ok((outcome, gens))
})
}
fn resolve_payload<T: serde::de::DeserializeOwned>(
flags: &OpFlags,
payload: Option<T>,
) -> Result<T, OpError> {
if let Some(p) = payload {
return Ok(p);
}
if let Some(path) = &flags.answers {
return super::load_answers::<T>(path);
}
Err(OpError::InvalidArgument(
"no payload provided: pass --answers <path> or supply the payload directly".to_string(),
))
}
fn parse_env_id(raw: &str) -> Result<EnvId, OpError> {
EnvId::try_from(raw).map_err(|e| OpError::InvalidArgument(format!("environment_id: {e}")))
}
fn parse_deployment_id(raw: &str) -> Result<DeploymentId, OpError> {
use std::str::FromStr;
let ulid = ulid::Ulid::from_str(raw)
.map_err(|e| OpError::InvalidArgument(format!("deployment_id: {e}")))?;
Ok(DeploymentId(ulid))
}
fn parse_revision_id(raw: &str) -> Result<RevisionId, OpError> {
use std::str::FromStr;
let ulid = ulid::Ulid::from_str(raw)
.map_err(|e| OpError::InvalidArgument(format!("revision_id: {e}")))?;
Ok(RevisionId(ulid))
}
fn set_schema() -> Value {
json!({
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "TrafficSetPayload",
"type": "object",
"required": ["environment_id", "deployment_id", "entries", "idempotency_key"],
"additionalProperties": false,
"properties": {
"environment_id": {"type": "string"},
"deployment_id": {"type": "string", "description": "ULID"},
"entries": {
"type": "array",
"items": {
"type": "object",
"required": ["revision_id"],
"properties": {
"revision_id": {"type": "string", "description": "ULID"},
"weight_bps": {"type": "integer", "minimum": 0, "maximum": 10000},
"weight_percent": {"type": "integer", "minimum": 0, "maximum": 100}
}
}
},
"updated_by": {"type": "string", "default": "operator"},
"idempotency_key": {"type": "string"},
"authorization_ref": {"type": "string"}
}
})
}
fn show_schema() -> Value {
json!({
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "TrafficShowPayload",
"type": "object",
"required": ["environment_id", "deployment_id"],
"additionalProperties": false,
"properties": {
"environment_id": {"type": "string"},
"deployment_id": {"type": "string", "description": "ULID"}
}
})
}
fn stash_inline(snapshot: Value) -> PathBuf {
let mut encoded = String::from(PREV_PREFIX);
let raw = serde_json::to_string(&snapshot).expect("Value re-serialises");
encoded.push_str(&crate::cli::env_packs::base64_encode_public(raw.as_bytes()));
PathBuf::from(encoded)
}
fn load_inline(prev_ref: &std::path::Path) -> Option<Value> {
let token = prev_ref.to_str()?;
let encoded = token.strip_prefix(PREV_PREFIX)?;
let bytes = crate::cli::env_packs::base64_decode_public(encoded)?;
let raw = std::str::from_utf8(&bytes).ok()?;
serde_json::from_str(raw).ok()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cli::tests_common::{make_bundle_deployment, make_env, make_revision};
use greentic_deploy_spec::RevisionLifecycle;
use tempfile::tempdir;
fn seed_env(store: &LocalFsStore) -> (DeploymentId, RevisionId, RevisionId) {
let mut env = make_env("local");
let deployment = make_bundle_deployment("local", "fast2flow");
let did = deployment.deployment_id;
let r1 = make_revision("local", "fast2flow", &did, 1, RevisionLifecycle::Ready);
let r2 = make_revision("local", "fast2flow", &did, 2, RevisionLifecycle::Ready);
let rid1 = r1.revision_id;
let rid2 = r2.revision_id;
env.bundles.push(deployment);
env.revisions.push(r1);
env.revisions.push(r2);
store.save(&env).unwrap();
(did, rid1, rid2)
}
#[test]
fn set_then_show_returns_split() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, _) = seed_env(&store);
let outcome = set(
&store,
&OpFlags::default(),
Some(TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_bps: Some(10_000),
weight_percent: None,
}],
updated_by: "test".to_string(),
idempotency_key: "k1".to_string(),
authorization_ref: default_authorization_ref(),
}),
)
.unwrap();
assert_eq!(
outcome.result.get("generation").and_then(|v| v.as_u64()),
Some(0)
);
let shown = show(
&store,
&OpFlags::default(),
Some(TrafficShowPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
}),
)
.unwrap();
let entries = shown
.result
.get("entries")
.and_then(|v| v.as_array())
.unwrap();
assert_eq!(entries.len(), 1);
}
#[test]
fn set_rejects_sum_not_10000() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, rid2) = seed_env(&store);
let err = set(
&store,
&OpFlags::default(),
Some(TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![
TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_percent: Some(60),
weight_bps: None,
},
TrafficSetEntryPayload {
revision_id: rid2.to_string(),
weight_percent: Some(30),
weight_bps: None,
},
],
updated_by: "test".to_string(),
idempotency_key: "k1".to_string(),
authorization_ref: default_authorization_ref(),
}),
)
.unwrap_err();
assert!(matches!(err, OpError::Spec(_)), "got {err:?}");
}
#[test]
fn set_then_rollback_restores_previous() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, rid2) = seed_env(&store);
set(
&store,
&OpFlags::default(),
Some(TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_percent: Some(100),
weight_bps: None,
}],
updated_by: "test".to_string(),
idempotency_key: "k1".to_string(),
authorization_ref: default_authorization_ref(),
}),
)
.unwrap();
set(
&store,
&OpFlags::default(),
Some(TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![
TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_percent: Some(50),
weight_bps: None,
},
TrafficSetEntryPayload {
revision_id: rid2.to_string(),
weight_percent: Some(50),
weight_bps: None,
},
],
updated_by: "test".to_string(),
idempotency_key: "k2".to_string(),
authorization_ref: default_authorization_ref(),
}),
)
.unwrap();
let rolled = rollback(
&store,
&OpFlags::default(),
Some(TrafficShowPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
}),
)
.unwrap();
let entries = rolled
.result
.get("entries")
.and_then(|v| v.as_array())
.unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(
entries[0].get("weight_bps").and_then(|v| v.as_u64()),
Some(10_000)
);
}
#[test]
fn set_rejects_revision_from_other_deployment() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let mut env = make_env("local");
let d1 = make_bundle_deployment("local", "fast2flow");
let did1 = d1.deployment_id;
let mut d2 = make_bundle_deployment("local", "llm-router");
d2.customer_id = greentic_deploy_spec::CustomerId::new("local-dev");
let did2 = d2.deployment_id;
let r1 = make_revision("local", "fast2flow", &did1, 1, RevisionLifecycle::Ready);
let r2 = make_revision("local", "llm-router", &did2, 1, RevisionLifecycle::Ready);
let rid2 = r2.revision_id;
env.bundles.push(d1);
env.bundles.push(d2);
env.revisions.push(r1);
env.revisions.push(r2);
store.save(&env).unwrap();
let err = set(
&store,
&OpFlags::default(),
Some(TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did1.to_string(),
entries: vec![TrafficSetEntryPayload {
revision_id: rid2.to_string(),
weight_percent: Some(100),
weight_bps: None,
}],
updated_by: "test".to_string(),
idempotency_key: "k1".to_string(),
authorization_ref: default_authorization_ref(),
}),
)
.unwrap_err();
assert!(matches!(err, OpError::InvalidArgument(_)), "got {err:?}");
}
#[test]
fn set_same_idempotency_key_same_payload_is_no_op() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, _) = seed_env(&store);
let payload = TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_bps: Some(10_000),
weight_percent: None,
}],
updated_by: "test".to_string(),
idempotency_key: "k1".to_string(),
authorization_ref: default_authorization_ref(),
};
let first = set(&store, &OpFlags::default(), Some(payload.clone())).unwrap();
assert_eq!(
first.result.get("generation").and_then(|v| v.as_u64()),
Some(0)
);
let retry = set(&store, &OpFlags::default(), Some(payload)).unwrap();
assert_eq!(
retry.result.get("generation").and_then(|v| v.as_u64()),
Some(0),
"generation must stay at 0 on idempotent retry"
);
assert_eq!(
retry.result.get("has_previous").and_then(|v| v.as_bool()),
Some(false),
"previous_split_ref must stay empty on idempotent retry"
);
}
#[test]
fn set_same_idempotency_key_different_payload_conflicts() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, rid2) = seed_env(&store);
let p1 = TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_bps: Some(10_000),
weight_percent: None,
}],
updated_by: "test".to_string(),
idempotency_key: "k1".to_string(),
authorization_ref: default_authorization_ref(),
};
set(&store, &OpFlags::default(), Some(p1)).unwrap();
let p2 = TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![
TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_percent: Some(50),
weight_bps: None,
},
TrafficSetEntryPayload {
revision_id: rid2.to_string(),
weight_percent: Some(50),
weight_bps: None,
},
],
updated_by: "test".to_string(),
idempotency_key: "k1".to_string(),
authorization_ref: default_authorization_ref(),
};
let err = set(&store, &OpFlags::default(), Some(p2)).unwrap_err();
assert!(matches!(err, OpError::Conflict(_)), "got {err:?}");
}
#[test]
fn set_retry_preserves_rollback_target() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, rid2) = seed_env(&store);
set(
&store,
&OpFlags::default(),
Some(TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_bps: Some(10_000),
weight_percent: None,
}],
updated_by: "test".to_string(),
idempotency_key: "k1".to_string(),
authorization_ref: default_authorization_ref(),
}),
)
.unwrap();
let k2_payload = TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![
TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_percent: Some(50),
weight_bps: None,
},
TrafficSetEntryPayload {
revision_id: rid2.to_string(),
weight_percent: Some(50),
weight_bps: None,
},
],
updated_by: "test".to_string(),
idempotency_key: "k2".to_string(),
authorization_ref: default_authorization_ref(),
};
set(&store, &OpFlags::default(), Some(k2_payload.clone())).unwrap();
set(&store, &OpFlags::default(), Some(k2_payload)).unwrap();
let rolled = rollback(
&store,
&OpFlags::default(),
Some(TrafficShowPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
}),
)
.unwrap();
let entries = rolled
.result
.get("entries")
.and_then(|v| v.as_array())
.unwrap();
assert_eq!(
entries.len(),
1,
"rollback must restore the single-entry k1 split, not the retried k2"
);
assert_eq!(
entries[0].get("weight_bps").and_then(|v| v.as_u64()),
Some(10_000)
);
}
#[test]
fn set_records_idempotency_key_and_generation_in_audit() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, _) = seed_env(&store);
set(
&store,
&OpFlags::default(),
Some(TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_bps: Some(10_000),
weight_percent: None,
}],
updated_by: "test".to_string(),
idempotency_key: "k1".to_string(),
authorization_ref: default_authorization_ref(),
}),
)
.unwrap();
let log = dir.path().join("local").join("audit").join("events.jsonl");
let raw = std::fs::read_to_string(&log).unwrap();
let event: crate::environment::AuditEvent = serde_json::from_str(raw.trim_end()).unwrap();
assert_eq!(event.noun, "traffic");
assert_eq!(event.verb, "set");
assert_eq!(event.idempotency_key.as_deref(), Some("k1"));
assert_eq!(event.previous_generation, None);
assert_eq!(event.new_generation, Some(0));
}
#[test]
fn set_materializes_runtime_config_on_disk() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, rid2) = seed_env(&store);
set(
&store,
&OpFlags::default(),
Some(TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![
TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_percent: Some(90),
weight_bps: None,
},
TrafficSetEntryPayload {
revision_id: rid2.to_string(),
weight_percent: Some(10),
weight_bps: None,
},
],
updated_by: "test".to_string(),
idempotency_key: "k1".to_string(),
authorization_ref: default_authorization_ref(),
}),
)
.unwrap();
let path = dir.path().join("local").join("runtime-config.json");
assert!(path.exists(), "runtime-config.json must be materialized");
let cfg: greentic_deploy_spec::RuntimeConfig =
serde_json::from_slice(&std::fs::read(&path).unwrap()).unwrap();
assert_eq!(cfg.schema.as_str(), SchemaVersion::RUNTIME_CONFIG_V1);
assert_eq!(cfg.env_id.as_str(), "local");
assert_eq!(cfg.revisions.len(), 2);
let sum: u32 = cfg.revisions.iter().map(|b| b.weight_bps).sum();
assert_eq!(sum, 10_000);
}
#[test]
fn refresh_deletes_runtime_config_when_no_splits_remain() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, _) = seed_env(&store);
set(
&store,
&OpFlags::default(),
Some(TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_bps: Some(10_000),
weight_percent: None,
}],
updated_by: "test".to_string(),
idempotency_key: "k1".to_string(),
authorization_ref: default_authorization_ref(),
}),
)
.unwrap();
let path = dir.path().join("local").join("runtime-config.json");
assert!(path.exists());
let env_id = EnvId::try_from("local").unwrap();
let mut env = store.load(&env_id).unwrap();
env.traffic_splits.clear();
store.save(&env).unwrap();
store
.transact(&env_id, |locked| {
let env = locked.load()?;
locked.refresh_runtime_config(&env)
})
.unwrap();
assert!(
!path.exists(),
"runtime-config.json must be removed when no split routes a revision"
);
}
#[test]
fn refresh_is_noop_when_projection_unchanged() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, _) = seed_env(&store);
set(
&store,
&OpFlags::default(),
Some(TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_bps: Some(10_000),
weight_percent: None,
}],
updated_by: "test".to_string(),
idempotency_key: "k1".to_string(),
authorization_ref: default_authorization_ref(),
}),
)
.unwrap();
let path = dir.path().join("local").join("runtime-config.json");
let before = std::fs::read(&path).unwrap();
let env_id = EnvId::try_from("local").unwrap();
store
.transact(&env_id, |locked| {
let env = locked.load()?;
locked.refresh_runtime_config(&env)
})
.unwrap();
assert_eq!(std::fs::read(&path).unwrap(), before);
let backups = dir.path().join("local").join("backups");
let backup_count = std::fs::read_dir(&backups)
.map(|rd| {
rd.filter_map(Result::ok)
.filter(|e| {
e.file_name()
.to_string_lossy()
.starts_with("runtime-config.json")
})
.count()
})
.unwrap_or(0);
assert_eq!(backup_count, 0, "no-op refresh must not back up the config");
}
#[test]
fn set_rejects_non_ready_revision_and_materializes_nothing() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let mut env = make_env("local");
let dep = make_bundle_deployment("local", "fast2flow");
let did = dep.deployment_id;
let staged = make_revision("local", "fast2flow", &did, 1, RevisionLifecycle::Staged);
let rid = staged.revision_id;
env.bundles.push(dep);
env.revisions.push(staged);
store.save(&env).unwrap();
let err = set(
&store,
&OpFlags::default(),
Some(TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![TrafficSetEntryPayload {
revision_id: rid.to_string(),
weight_bps: Some(10_000),
weight_percent: None,
}],
updated_by: "test".to_string(),
idempotency_key: "k1".to_string(),
authorization_ref: default_authorization_ref(),
}),
)
.unwrap_err();
assert!(matches!(err, OpError::Conflict(_)), "got {err:?}");
assert!(
!dir.path()
.join("local")
.join("runtime-config.json")
.exists(),
"a rejected set must not leave a runtime-config behind"
);
}
#[test]
fn rollback_refuses_when_restored_revision_no_longer_ready() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, rid2) = seed_env(&store);
let base = |key: &str, rid: &RevisionId| TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![TrafficSetEntryPayload {
revision_id: rid.to_string(),
weight_bps: Some(10_000),
weight_percent: None,
}],
updated_by: "test".to_string(),
idempotency_key: key.to_string(),
authorization_ref: default_authorization_ref(),
};
set(&store, &OpFlags::default(), Some(base("k1", &rid1))).unwrap();
set(&store, &OpFlags::default(), Some(base("k2", &rid2))).unwrap();
let env_id = EnvId::try_from("local").unwrap();
let mut env = store.load(&env_id).unwrap();
let i = env
.revisions
.iter()
.position(|r| r.revision_id == rid1)
.unwrap();
env.revisions[i].lifecycle = RevisionLifecycle::Archived;
store.save(&env).unwrap();
let err = rollback(
&store,
&OpFlags::default(),
Some(TrafficShowPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
}),
)
.unwrap_err();
assert!(matches!(err, OpError::Conflict(_)), "got {err:?}");
}
#[test]
fn idempotent_retry_reconciles_stale_runtime_config() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, _) = seed_env(&store);
let payload = TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_bps: Some(10_000),
weight_percent: None,
}],
updated_by: "test".to_string(),
idempotency_key: "k1".to_string(),
authorization_ref: default_authorization_ref(),
};
set(&store, &OpFlags::default(), Some(payload.clone())).unwrap();
let path = dir.path().join("local").join("runtime-config.json");
assert!(path.exists());
std::fs::remove_file(&path).unwrap();
set(&store, &OpFlags::default(), Some(payload)).unwrap();
assert!(
path.exists(),
"idempotent retry must reconcile a stale runtime-config"
);
}
#[test]
fn parse_entry_arg_accepts_plain_percent() {
let e = parse_entry_arg("01H000000000000000000000R1=99").unwrap();
assert_eq!(e.revision_id, "01H000000000000000000000R1");
assert_eq!(e.weight_bps, None);
assert_eq!(e.weight_percent, Some(99));
}
#[test]
fn parse_entry_arg_accepts_percent_suffix() {
let e = parse_entry_arg("rev1=50%").unwrap();
assert_eq!(e.weight_percent, Some(50));
assert_eq!(e.weight_bps, None);
}
#[test]
fn parse_entry_arg_accepts_basis_points() {
let e = parse_entry_arg("rev1=2500bps").unwrap();
assert_eq!(e.weight_bps, Some(2500));
assert_eq!(e.weight_percent, None);
}
#[test]
fn parse_entry_arg_rejects_missing_separator() {
let err = parse_entry_arg("rev1-99").unwrap_err();
assert!(matches!(err, OpError::InvalidArgument(_)), "got {err:?}");
}
#[test]
fn parse_entry_arg_rejects_empty_revision_id() {
let err = parse_entry_arg("=99").unwrap_err();
assert!(matches!(err, OpError::InvalidArgument(_)), "got {err:?}");
}
#[test]
fn parse_entry_arg_rejects_empty_weight() {
let err = parse_entry_arg("rev1=").unwrap_err();
assert!(matches!(err, OpError::InvalidArgument(_)), "got {err:?}");
}
#[test]
fn parse_entry_arg_rejects_non_numeric_weight() {
let err = parse_entry_arg("rev1=many").unwrap_err();
assert!(matches!(err, OpError::InvalidArgument(_)), "got {err:?}");
}
#[test]
fn parse_entry_arg_rejects_non_numeric_bps() {
let err = parse_entry_arg("rev1=manybps").unwrap_err();
assert!(matches!(err, OpError::InvalidArgument(_)), "got {err:?}");
}
#[test]
fn payload_from_set_args_returns_none_when_blank() {
let args = TrafficSetArgs {
env_id: None,
entries: Vec::new(),
deployment: None,
idempotency_key: None,
updated_by: None,
authorization_ref: None,
};
assert!(payload_from_set_args(args).unwrap().is_none());
}
#[test]
fn payload_from_set_args_requires_deployment_when_env_present() {
let args = TrafficSetArgs {
env_id: Some("local".to_string()),
entries: vec!["rev1=100".to_string()],
deployment: None,
idempotency_key: None,
updated_by: None,
authorization_ref: None,
};
let err = payload_from_set_args(args).unwrap_err();
assert!(matches!(err, OpError::InvalidArgument(_)), "got {err:?}");
}
#[test]
fn payload_from_set_args_requires_entries() {
let args = TrafficSetArgs {
env_id: Some("local".to_string()),
entries: Vec::new(),
deployment: Some(DeploymentId::new().to_string()),
idempotency_key: None,
updated_by: None,
authorization_ref: None,
};
let err = payload_from_set_args(args).unwrap_err();
assert!(matches!(err, OpError::InvalidArgument(_)), "got {err:?}");
}
#[test]
fn payload_from_set_args_requires_idempotency_key() {
let did = DeploymentId::new();
let args = TrafficSetArgs {
env_id: Some("local".to_string()),
entries: vec!["rev1=100".to_string()],
deployment: Some(did.to_string()),
idempotency_key: None,
updated_by: None,
authorization_ref: None,
};
let err = payload_from_set_args(args).unwrap_err();
assert!(matches!(err, OpError::InvalidArgument(_)), "got {err:?}");
let OpError::InvalidArgument(msg) = err else {
unreachable!()
};
assert!(
msg.contains("--idempotency-key"),
"error must mention the flag, got `{msg}`"
);
}
#[test]
fn payload_from_set_args_builds_full_payload_with_defaults() {
let did = DeploymentId::new();
let args = TrafficSetArgs {
env_id: Some("local".to_string()),
entries: vec!["rev1=90".to_string(), "rev2=10bps".to_string()],
deployment: Some(did.to_string()),
idempotency_key: Some("k-test".to_string()),
updated_by: None,
authorization_ref: None,
};
let payload = payload_from_set_args(args).unwrap().unwrap();
assert_eq!(payload.environment_id, "local");
assert_eq!(payload.deployment_id, did.to_string());
assert_eq!(payload.entries.len(), 2);
assert_eq!(payload.entries[0].weight_percent, Some(90));
assert_eq!(payload.entries[1].weight_bps, Some(10));
assert_eq!(payload.updated_by, "operator", "default actor");
assert_eq!(
payload.authorization_ref,
PathBuf::from("auth.json"),
"default authorization ref"
);
assert_eq!(payload.idempotency_key, "k-test");
}
#[test]
fn payload_from_set_args_honors_explicit_overrides() {
let did = DeploymentId::new();
let args = TrafficSetArgs {
env_id: Some("local".to_string()),
entries: vec!["rev1=100".to_string()],
deployment: Some(did.to_string()),
idempotency_key: Some("k-explicit".to_string()),
updated_by: Some("ci".to_string()),
authorization_ref: Some(PathBuf::from("custom.json")),
};
let payload = payload_from_set_args(args).unwrap().unwrap();
assert_eq!(payload.idempotency_key, "k-explicit");
assert_eq!(payload.updated_by, "ci");
assert_eq!(payload.authorization_ref, PathBuf::from("custom.json"));
}
#[test]
fn payload_from_target_args_returns_none_when_blank() {
let args = TrafficTargetArgs {
env_id: None,
deployment: None,
};
assert!(payload_from_target_args(args).unwrap().is_none());
}
#[test]
fn payload_from_target_args_requires_both_when_either_present() {
let args = TrafficTargetArgs {
env_id: Some("local".to_string()),
deployment: None,
};
let err = payload_from_target_args(args).unwrap_err();
assert!(matches!(err, OpError::InvalidArgument(_)), "got {err:?}");
}
#[test]
fn payload_from_target_args_builds_payload() {
let did = DeploymentId::new();
let args = TrafficTargetArgs {
env_id: Some("local".to_string()),
deployment: Some(did.to_string()),
};
let payload = payload_from_target_args(args).unwrap().unwrap();
assert_eq!(payload.environment_id, "local");
assert_eq!(payload.deployment_id, did.to_string());
}
#[test]
fn clap_set_payload_drives_real_traffic_set() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, rid2) = seed_env(&store);
let args = TrafficSetArgs {
env_id: Some("local".to_string()),
entries: vec![format!("{rid1}=90"), format!("{rid2}=10")],
deployment: Some(did.to_string()),
idempotency_key: Some("k-clap".to_string()),
updated_by: Some("test".to_string()),
authorization_ref: None,
};
let payload = payload_from_set_args(args).unwrap();
let outcome = set(&store, &OpFlags::default(), payload).unwrap();
let entries = outcome
.result
.get("entries")
.and_then(|v| v.as_array())
.unwrap();
assert_eq!(entries.len(), 2);
let total: u64 = entries
.iter()
.map(|e| e.get("weight_bps").and_then(|v| v.as_u64()).unwrap())
.sum();
assert_eq!(total, 10_000, "clap-built payload must satisfy spec sum");
}
#[test]
fn clap_retry_preserves_rollback_target() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, rid2) = seed_env(&store);
let a_args = TrafficSetArgs {
env_id: Some("local".to_string()),
entries: vec![format!("{rid1}=100")],
deployment: Some(did.to_string()),
idempotency_key: Some("k1".to_string()),
updated_by: Some("test".to_string()),
authorization_ref: None,
};
set(
&store,
&OpFlags::default(),
payload_from_set_args(a_args).unwrap(),
)
.unwrap();
let b_args = || TrafficSetArgs {
env_id: Some("local".to_string()),
entries: vec![format!("{rid1}=50"), format!("{rid2}=50")],
deployment: Some(did.to_string()),
idempotency_key: Some("k2".to_string()),
updated_by: Some("test".to_string()),
authorization_ref: None,
};
set(
&store,
&OpFlags::default(),
payload_from_set_args(b_args()).unwrap(),
)
.unwrap();
let retry = set(
&store,
&OpFlags::default(),
payload_from_set_args(b_args()).unwrap(),
)
.unwrap();
assert_eq!(
retry.result.get("generation").and_then(|v| v.as_u64()),
Some(1),
"retry must replay B's generation, not advance"
);
let rolled = rollback(
&store,
&OpFlags::default(),
Some(TrafficShowPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
}),
)
.unwrap();
let entries = rolled
.result
.get("entries")
.and_then(|v| v.as_array())
.unwrap();
assert_eq!(
entries.len(),
1,
"rollback must land on A (single entry), not the retried B"
);
assert_eq!(
entries[0].get("weight_bps").and_then(|v| v.as_u64()),
Some(10_000),
"rollback must restore 100% rev1"
);
}
use crate::rollout_telemetry::test_capture::{capture_events, count};
use std::collections::BTreeSet;
fn observed(events: &[String]) -> BTreeSet<String> {
events.iter().cloned().collect()
}
fn set_payload(did: &DeploymentId, rid: &RevisionId, key: &str) -> TrafficSetPayload {
TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![TrafficSetEntryPayload {
revision_id: rid.to_string(),
weight_bps: Some(10_000),
weight_percent: None,
}],
updated_by: "test".to_string(),
idempotency_key: key.to_string(),
authorization_ref: default_authorization_ref(),
}
}
#[test]
fn set_emits_traffic_split_applied() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, _) = seed_env(&store);
let (res, events) = capture_events(|| {
set(
&store,
&OpFlags::default(),
Some(set_payload(&did, &rid1, "k1")),
)
});
res.unwrap();
let observed = observed(&events);
assert!(
observed.contains("rollout.traffic_split.applied"),
"observed events: {observed:?}"
);
}
#[test]
fn set_idempotent_replay_does_not_double_emit() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, _) = seed_env(&store);
let (res, events) = capture_events(|| {
set(
&store,
&OpFlags::default(),
Some(set_payload(&did, &rid1, "k1")),
)
.unwrap();
set(
&store,
&OpFlags::default(),
Some(set_payload(&did, &rid1, "k1")),
)
});
res.unwrap();
assert_eq!(
count(&events, "rollout.traffic_split.applied"),
1,
"expected exactly one TrafficSplitApplied event across set + replay; \
captured: {:?}",
observed(&events)
);
}
#[test]
fn rollback_emits_traffic_split_applied() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let (did, rid1, rid2) = seed_env(&store);
set(
&store,
&OpFlags::default(),
Some(set_payload(&did, &rid1, "k1")),
)
.unwrap();
set(
&store,
&OpFlags::default(),
Some(TrafficSetPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
entries: vec![
TrafficSetEntryPayload {
revision_id: rid1.to_string(),
weight_bps: Some(5_000),
weight_percent: None,
},
TrafficSetEntryPayload {
revision_id: rid2.to_string(),
weight_bps: Some(5_000),
weight_percent: None,
},
],
updated_by: "test".to_string(),
idempotency_key: "k2".to_string(),
authorization_ref: default_authorization_ref(),
}),
)
.unwrap();
let (res, events) = capture_events(|| {
rollback(
&store,
&OpFlags::default(),
Some(TrafficShowPayload {
environment_id: "local".to_string(),
deployment_id: did.to_string(),
}),
)
});
res.unwrap();
assert_eq!(
count(&events, "rollout.traffic_split.applied"),
1,
"rollback must emit exactly one TrafficSplitApplied; \
captured: {:?}",
observed(&events)
);
}
}