use std::path::PathBuf;
use chrono::Utc;
use greentic_deploy_spec::{
BundleId, DeploymentId, EnvId, PackId, PackListEntry, Revision, RevisionId, RevisionLifecycle,
SchemaVersion, SemVer, is_valid_transition,
};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use crate::environment::{EnvironmentStore, LocalFsStore};
use crate::rollout_telemetry::emit_lifecycle_event;
use greentic_deploy_spec::Environment;
use greentic_telemetry::RolloutEvent;
use super::{AuditCtx, OpError, OpFlags, OpOutcome, audit_and_record};
const NOUN: &str = "revisions";
/// Input payload for the `revisions stage` operation.
///
/// Either supplied directly by the caller, built from CLI arguments by
/// `payload_from_stage_args`, or deserialized from an answers file. Only
/// `environment_id` and `deployment_id` are mandatory when deserializing; every
/// other field carries a serde default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RevisionStagePayload {
/// Environment id in string form; `stage` validates it via `EnvId::try_from`.
pub environment_id: String,
/// Deployment id in string form; `stage` parses it as a ULID.
pub deployment_id: String,
/// Optional path to a local bundle. When set, `stage` stages that bundle and
/// takes the digest and lock reference from the staging result, ignoring the
/// `bundle_digest`, `pack_list` and `pack_list_lock_ref` fields below.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub bundle_path: Option<PathBuf>,
/// Bundle digest recorded verbatim when `bundle_path` is absent.
/// Defaults to the placeholder `sha256:00`.
#[serde(default = "default_bundle_digest")]
pub bundle_digest: String,
/// Pack entries recorded verbatim when `bundle_path` is absent. Defaults to empty.
#[serde(default)]
pub pack_list: Vec<PackListEntryPayload>,
/// Pack-list lock reference recorded verbatim when `bundle_path` is absent.
/// Defaults to an empty path.
#[serde(default)]
pub pack_list_lock_ref: PathBuf,
/// Config digest copied onto the revision. Defaults to the placeholder `sha256:00`.
#[serde(default = "default_config_digest")]
pub config_digest: String,
/// Signature sidecar reference copied onto the revision. Defaults to `rev.sig`.
#[serde(default = "default_signature_sidecar_ref")]
pub signature_sidecar_ref: PathBuf,
/// Drain duration copied onto the revision. Defaults to 30.
#[serde(default = "default_drain_seconds")]
pub drain_seconds: u32,
}
/// Serde default for `RevisionStagePayload::bundle_digest`: the placeholder
/// digest `sha256:00`.
pub(super) fn default_bundle_digest() -> String {
    String::from("sha256:00")
}
/// Serde default for `RevisionStagePayload::config_digest`: the placeholder
/// digest `sha256:00`.
pub(super) fn default_config_digest() -> String {
    String::from("sha256:00")
}
/// Serde default for `RevisionStagePayload::signature_sidecar_ref`: the
/// relative path `rev.sig`.
pub(super) fn default_signature_sidecar_ref() -> PathBuf {
    let sidecar: PathBuf = "rev.sig".into();
    sidecar
}
/// Serde default for `RevisionStagePayload::drain_seconds`.
pub(super) fn default_drain_seconds() -> u32 {
    const DEFAULT_DRAIN_SECONDS: u32 = 30;
    DEFAULT_DRAIN_SECONDS
}
/// Wire form of a single pack-list entry inside a `RevisionStagePayload`.
///
/// `stage` converts each entry into a `PackListEntry` when no `bundle_path`
/// is supplied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PackListEntryPayload {
/// Pack identifier; wrapped with `PackId::new` by `stage`.
pub pack_id: String,
/// Pack version; `stage` parses it as a `SemVer` and rejects the payload
/// with `OpError::InvalidArgument` if parsing fails.
pub version: String,
/// Pack digest, copied through unchanged.
pub digest: String,
/// Optional source URI, copied through unchanged; omitted from serialized
/// output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source_uri: Option<String>,
}
/// Input payload shared by the lifecycle-transition operations
/// (`warm`, `drain`, `archive`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RevisionTransitionPayload {
/// Environment id in string form; validated via `EnvId::try_from`.
pub environment_id: String,
/// Revision id in string form; parsed as a ULID.
pub revision_id: String,
}
/// Compact, serializable view of a `Revision` returned in operation outcomes
/// and by `list`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RevisionSummary {
/// String form of the revision's id.
pub revision_id: String,
/// String form of the owning deployment's id.
pub deployment_id: String,
/// String form of the revision's bundle id.
pub bundle_id: String,
/// Sequence number of the revision; `stage` assigns it per deployment,
/// starting at 1.
pub sequence: u64,
/// Lifecycle state of the revision at the time the summary was built.
pub lifecycle: RevisionLifecycle,
}
impl From<&Revision> for RevisionSummary {
fn from(r: &Revision) -> Self {
Self {
revision_id: r.revision_id.to_string(),
deployment_id: r.deployment_id.to_string(),
bundle_id: r.bundle_id.as_str().to_string(),
sequence: r.sequence,
lifecycle: r.lifecycle,
}
}
}
pub fn stage(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionStagePayload>,
) -> Result<OpOutcome, OpError> {
if flags.schema_only {
return Ok(OpOutcome::new(NOUN, "stage", stage_schema()));
}
let payload = resolve_payload::<RevisionStagePayload>(flags, payload)?;
let env_id = parse_env_id(&payload.environment_id)?;
let deployment_id = parse_deployment_id(&payload.deployment_id)?;
let pack_list = if payload.bundle_path.is_some() {
Vec::new()
} else {
payload
.pack_list
.into_iter()
.map(|e| {
Ok::<_, OpError>(PackListEntry {
pack_id: PackId::new(e.pack_id),
version: e
.version
.parse::<SemVer>()
.map_err(|err| OpError::InvalidArgument(format!("pack version: {err}")))?,
digest: e.digest,
source_uri: e.source_uri,
})
})
.collect::<Result<Vec<_>, _>>()?
};
if !is_valid_transition(RevisionLifecycle::Inactive, RevisionLifecycle::Staged) {
return Err(OpError::Conflict(
"spec rejects inactive → staged".to_string(),
));
}
let ctx = AuditCtx {
env_id: env_id.clone(),
noun: NOUN,
verb: "stage",
target: json!({
"deployment_id": deployment_id.to_string(),
"lifecycle_to": "staged",
}),
idempotency_key: None,
};
audit_and_record(store, ctx, |_committed| {
let summary = store.transact(&env_id, |locked| -> Result<RevisionSummary, OpError> {
let mut env = locked.load()?;
let deployment = env
.bundles
.iter()
.find(|b| b.deployment_id == deployment_id)
.ok_or_else(|| {
OpError::NotFound(format!(
"deployment `{deployment_id}` not found in env `{env_id}`"
))
})?
.clone();
let bundle_id = deployment.bundle_id.clone();
let next_sequence = env
.revisions
.iter()
.filter(|r| r.deployment_id == deployment_id)
.map(|r| r.sequence)
.max()
.unwrap_or(0)
+ 1;
let now = Utc::now();
let revision_id = crate::environment::mint_revision_id();
let (bundle_digest, revision_pack_list, pack_list_lock_ref) = match &payload.bundle_path
{
Some(bundle_path) => {
let env_dir = store.env_dir(&env_id)?;
let staged = super::bundle_stage::stage_local_bundle(
&env_dir,
revision_id,
bundle_path,
)?;
(staged.bundle_digest, Vec::new(), staged.pack_list_lock_ref)
}
None => (
payload.bundle_digest.clone(),
pack_list.clone(),
payload.pack_list_lock_ref.clone(),
),
};
let staged = Revision {
schema: SchemaVersion::new(SchemaVersion::REVISION_V1),
revision_id,
env_id: env_id.clone(),
bundle_id,
deployment_id,
sequence: next_sequence,
created_at: now,
bundle_digest,
pack_list: revision_pack_list,
pack_list_lock_ref,
config_digest: payload.config_digest.clone(),
signature_sidecar_ref: payload.signature_sidecar_ref.clone(),
lifecycle: RevisionLifecycle::Staged,
staged_at: Some(now),
warmed_at: None,
drain_seconds: payload.drain_seconds,
abort_metrics: Vec::new(),
};
env.revisions.push(staged);
locked.save(&env)?;
Ok(RevisionSummary::from(
env.revisions
.iter()
.find(|r| r.revision_id == revision_id)
.expect("just pushed"),
))
})?;
let outcome = OpOutcome::new(
NOUN,
"stage",
serde_json::to_value(summary).expect("RevisionSummary is json-safe"),
);
Ok((outcome, super::AuditGens::NONE))
})
}
pub fn warm(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionTransitionPayload>,
) -> Result<OpOutcome, OpError> {
warm_with_health_gate(store, flags, payload, |_env, _revision| Ok(()))
}
pub fn warm_with_health_gate<G>(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionTransitionPayload>,
health_gate: G,
) -> Result<OpOutcome, OpError>
where
G: FnOnce(
&greentic_deploy_spec::Environment,
&Revision,
) -> Result<(), crate::environment::HealthGateFailure>,
{
if flags.schema_only {
return Ok(OpOutcome::new(NOUN, "warm", transition_schema()));
}
transition_with_health_gate(
store,
flags,
payload,
"warm",
&[
(RevisionLifecycle::Staged, RevisionLifecycle::Warming),
(RevisionLifecycle::Warming, RevisionLifecycle::Ready),
],
|r| {
r.warmed_at = Some(Utc::now());
},
false,
health_gate,
)
}
pub fn drain(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionTransitionPayload>,
) -> Result<OpOutcome, OpError> {
if flags.schema_only {
return Ok(OpOutcome::new(NOUN, "drain", transition_schema()));
}
transition(
store,
flags,
payload,
"drain",
&[(RevisionLifecycle::Ready, RevisionLifecycle::Draining)],
|_| {},
false,
)
}
pub fn archive(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionTransitionPayload>,
) -> Result<OpOutcome, OpError> {
if flags.schema_only {
return Ok(OpOutcome::new(NOUN, "archive", transition_schema()));
}
transition(
store,
flags,
payload,
"archive",
&[
(RevisionLifecycle::Staged, RevisionLifecycle::Archived),
(RevisionLifecycle::Warming, RevisionLifecycle::Archived),
(RevisionLifecycle::Ready, RevisionLifecycle::Archived),
(RevisionLifecycle::Failed, RevisionLifecycle::Archived),
(RevisionLifecycle::Draining, RevisionLifecycle::Inactive),
(RevisionLifecycle::Inactive, RevisionLifecycle::Archived),
],
|_| {},
true,
)
}
/// Lists summaries of every revision recorded in an environment.
///
/// With `flags.schema_only` set, returns a short description of the expected
/// input instead. Fails with `OpError::InvalidArgument` for an invalid
/// environment id and `OpError::NotFound` when the environment does not exist
/// in the store.
pub fn list(store: &LocalFsStore, flags: &OpFlags, env_id: &str) -> Result<OpOutcome, OpError> {
    if flags.schema_only {
        let schema = json!({"input_schema": "env_id positional"});
        return Ok(OpOutcome::new(NOUN, "list", schema));
    }
    let env_id = parse_env_id(env_id)?;
    let known = store.exists(&env_id)?;
    if !known {
        return Err(OpError::NotFound(format!("environment `{env_id}`")));
    }
    let env = store.load(&env_id)?;
    let mut revisions: Vec<RevisionSummary> = Vec::new();
    for revision in env.revisions.iter() {
        revisions.push(RevisionSummary::from(revision));
    }
    let body = json!({"environment_id": env_id.as_str(), "revisions": revisions});
    Ok(OpOutcome::new(NOUN, "list", body))
}
fn transition<F: FnOnce(&mut Revision)>(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionTransitionPayload>,
op: &'static str,
accepted_chain: &[(RevisionLifecycle, RevisionLifecycle)],
on_final: F,
prune_from_splits: bool,
) -> Result<OpOutcome, OpError> {
transition_with_health_gate(
store,
flags,
payload,
op,
accepted_chain,
on_final,
prune_from_splits,
|_env, _revision| Ok(()),
)
}
/// Shared implementation behind `warm`, `drain` and `archive`.
///
/// Resolves and validates the payload, then — inside `audit_and_record` and a
/// store transaction — applies the lifecycle transition via
/// `apply_revision_transition_with_health_gate`, refreshes the runtime config,
/// emits rollout telemetry for the operation, and returns a `RevisionSummary`
/// of the transitioned revision as the outcome.
///
/// * `op` — verb name used for the audit record, the outcome, and to select
///   which telemetry events `emit_for_op` sends.
/// * `accepted_chain` — `(from, to)` lifecycle pairs forwarded to the
///   transition helper; the `to` of the last pair is recorded as
///   `lifecycle_to` in the audit target.
/// * `on_final`, `prune_from_splits`, `health_gate` — forwarded unchanged to
///   the transition helper.
///
/// # Errors
///
/// Returns `OpError::InvalidArgument` for an unresolvable payload or unparsable
/// ids, and otherwise whatever the store, the transition helper (converted via
/// `OpError::from`), or audit recording report.
// Eight parameters are needed to forward everything to the transition helper.
#[allow(clippy::too_many_arguments)]
fn transition_with_health_gate<F, G>(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionTransitionPayload>,
op: &'static str,
accepted_chain: &[(RevisionLifecycle, RevisionLifecycle)],
on_final: F,
prune_from_splits: bool,
health_gate: G,
) -> Result<OpOutcome, OpError>
where
F: FnOnce(&mut Revision),
G: FnOnce(
&greentic_deploy_spec::Environment,
&Revision,
) -> Result<(), crate::environment::HealthGateFailure>,
{
let payload = resolve_payload::<RevisionTransitionPayload>(flags, payload)?;
let env_id = parse_env_id(&payload.environment_id)?;
let revision_id = parse_revision_id(&payload.revision_id)?;
// The audit target reports the destination of the LAST accepted pair
// (`None` if the chain is empty), regardless of which pair ends up applying.
let lifecycle_to = accepted_chain.last().map(|(_, to)| *to);
let ctx = AuditCtx {
env_id: env_id.clone(),
noun: NOUN,
verb: op,
target: json!({
"revision_id": revision_id.to_string(),
"lifecycle_to": lifecycle_to,
}),
idempotency_key: None,
};
audit_and_record(store, ctx, |committed| {
let (revision, env, starting_lifecycle) = store.transact(
&env_id,
|locked| -> Result<(Revision, Environment, Option<RevisionLifecycle>), OpError> {
// Capture the lifecycle the revision had BEFORE the transition; it is only
// used afterwards to decide whether an `archive` should emit an eviction
// event. A load failure here is logged and tolerated rather than propagated.
let starting_lifecycle = match locked.load() {
Ok(e) => e
.revisions
.iter()
.find(|r| r.revision_id == revision_id)
.map(|r| r.lifecycle),
Err(err) => {
tracing::warn!(
op = op,
env_id = %env_id,
revision_id = %revision_id,
error = %err,
"C5.3: failed to capture starting lifecycle; an `archive` \
eviction emit may be skipped"
);
None
}
};
let revision = match crate::environment::apply_revision_transition_with_health_gate(
locked,
revision_id,
accepted_chain,
on_final,
prune_from_splits,
health_gate,
) {
Ok(r) => {
// Flag the audit context as committed as soon as the transition succeeds,
// before the follow-up reload/refresh below can fail.
committed.mark_committed();
r
}
Err(e @ crate::environment::LifecycleError::HealthGateFailed { .. }) => {
// A failed health gate is also flagged as committed (the tests in this file
// show the revision persisted as `Failed` in this case). The gate-failed
// telemetry is best-effort: it is skipped if the reload or lookup fails.
committed.mark_committed();
if let Ok(env_for_emit) = locked.load()
&& let Some(rev_for_emit) = env_for_emit
.revisions
.iter()
.find(|r| r.revision_id == revision_id)
{
emit_for_op(op, true, None, &env_for_emit, rev_for_emit);
}
return Err(OpError::from(e));
}
// Any other lifecycle error is returned without marking the context committed.
Err(other) => return Err(OpError::from(other)),
};
// Reload the post-transition environment and refresh the runtime config from it.
let env = locked.load()?;
locked.refresh_runtime_config(&env)?;
Ok((revision, env, starting_lifecycle))
},
)?;
// Success-path telemetry is emitted only after the transaction closure returned Ok.
emit_for_op(op, false, starting_lifecycle, &env, &revision);
let summary = RevisionSummary::from(&revision);
let outcome = OpOutcome::new(
NOUN,
op,
serde_json::to_value(summary).expect("RevisionSummary is json-safe"),
);
Ok((outcome, super::AuditGens::NONE))
})
}
/// Emits the rollout telemetry events associated with a transition operation.
///
/// * `warm`, gate failed — `HealthGateFailed`.
/// * `warm`, gate passed — `HealthGatePassed` followed by `RevisionWarmed`.
/// * `drain` — `RevisionDraining`.
/// * `archive` — `RevisionEvicted`, but only when the revision started in
///   `Draining`.
///
/// Every other combination emits nothing.
fn emit_for_op(
    op: &'static str,
    gate_failed: bool,
    starting_lifecycle: Option<RevisionLifecycle>,
    env: &Environment,
    revision: &Revision,
) {
    // Only `warm` has a gate-failure event; all other ops stay silent on failure.
    if gate_failed {
        if op == "warm" {
            emit_lifecycle_event(RolloutEvent::HealthGateFailed, env, revision);
        }
        return;
    }
    match op {
        "warm" => {
            for event in [RolloutEvent::HealthGatePassed, RolloutEvent::RevisionWarmed] {
                emit_lifecycle_event(event, env, revision);
            }
        }
        "drain" => emit_lifecycle_event(RolloutEvent::RevisionDraining, env, revision),
        "archive" => {
            let was_draining = starting_lifecycle == Some(RevisionLifecycle::Draining);
            if was_draining {
                emit_lifecycle_event(RolloutEvent::RevisionEvicted, env, revision);
            }
        }
        _ => {}
    }
}
/// Builds a stage payload from direct CLI arguments.
///
/// Returns `Ok(None)` when no argument was given at all, so the caller can fall
/// back to an answers file. Otherwise all three of env id, deployment and
/// bundle are required; the first missing one (checked in that order) yields
/// `OpError::InvalidArgument`. The remaining payload fields are filled with
/// their defaults.
pub fn payload_from_stage_args(
    args: super::dispatch::RevisionStageArgs,
) -> Result<Option<RevisionStagePayload>, OpError> {
    let super::dispatch::RevisionStageArgs {
        env_id,
        deployment,
        bundle,
    } = args;
    match (env_id, deployment, bundle) {
        (None, None, None) => Ok(None),
        (None, _, _) => Err(OpError::InvalidArgument(
            "revisions stage: missing positional `<env_id>`".to_string(),
        )),
        (Some(_), None, _) => Err(OpError::InvalidArgument(
            "revisions stage: missing `--deployment <ULID>`".to_string(),
        )),
        (Some(_), Some(_), None) => Err(OpError::InvalidArgument(
            "revisions stage: missing `--bundle <PATH>`. The direct CLI path stages a local \
             .gtbundle; use `--answers <file>` for the legacy verbatim path."
                .to_string(),
        )),
        (Some(environment_id), Some(deployment_id), Some(bundle_path)) => {
            let staged_payload = RevisionStagePayload {
                environment_id,
                deployment_id,
                bundle_path: Some(bundle_path),
                bundle_digest: default_bundle_digest(),
                pack_list: Vec::new(),
                pack_list_lock_ref: PathBuf::new(),
                config_digest: default_config_digest(),
                signature_sidecar_ref: default_signature_sidecar_ref(),
                drain_seconds: default_drain_seconds(),
            };
            Ok(Some(staged_payload))
        }
    }
}
/// Picks the payload for an operation.
///
/// A directly supplied payload wins; otherwise the payload is loaded from the
/// `flags.answers` file when one is configured. With neither available the
/// call fails with `OpError::InvalidArgument`.
fn resolve_payload<T: serde::de::DeserializeOwned>(
    flags: &OpFlags,
    payload: Option<T>,
) -> Result<T, OpError> {
    match (payload, &flags.answers) {
        (Some(direct), _) => Ok(direct),
        (None, Some(path)) => super::load_answers::<T>(path),
        (None, None) => Err(OpError::InvalidArgument(
            "no payload provided: pass --answers <path> or supply the payload directly".to_string(),
        )),
    }
}
/// Validates a raw environment id, mapping a rejection to
/// `OpError::InvalidArgument` prefixed with `environment_id:`.
fn parse_env_id(raw: &str) -> Result<EnvId, OpError> {
    match EnvId::try_from(raw) {
        Ok(id) => Ok(id),
        Err(e) => Err(OpError::InvalidArgument(format!("environment_id: {e}"))),
    }
}
fn parse_deployment_id(raw: &str) -> Result<DeploymentId, OpError> {
use std::str::FromStr;
let ulid = ulid::Ulid::from_str(raw)
.map_err(|e| OpError::InvalidArgument(format!("deployment_id: {e}")))?;
Ok(DeploymentId(ulid))
}
fn parse_revision_id(raw: &str) -> Result<RevisionId, OpError> {
use std::str::FromStr;
let ulid = ulid::Ulid::from_str(raw)
.map_err(|e| OpError::InvalidArgument(format!("revision_id: {e}")))?;
Ok(RevisionId(ulid))
}
/// Intentionally empty function that takes a `BundleId` reference and does nothing.
///
/// NOTE(review): nothing in this file calls it; it presumably exists only so the
/// `BundleId` import stays referenced — confirm, and consider removing both the
/// function and the import if so.
// `dead_code` is allowed because the function has no callers in this file.
#[allow(dead_code)]
fn discard_bundle(_id: &BundleId) {
}
/// JSON Schema (draft 2020-12) describing `RevisionStagePayload`, returned by
/// `stage` when only the schema is requested.
fn stage_schema() -> Value {
    // Per-field definitions, kept separate from the envelope for readability.
    let properties = json!({
        "environment_id": {"type": "string"},
        "deployment_id": {"type": "string", "description": "ULID"},
        "bundle_path": {"type": "string", "description": "Local .gtbundle to extract + pin; derives bundle_digest/pack_list/pack_list_lock_ref"},
        "bundle_digest": {"type": "string"},
        "pack_list": {"type": "array"},
        "pack_list_lock_ref": {"type": "string"},
        "config_digest": {"type": "string"},
        "signature_sidecar_ref": {"type": "string"},
        "drain_seconds": {"type": "integer", "minimum": 0}
    });
    json!({
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "RevisionStagePayload",
        "type": "object",
        "required": ["environment_id", "deployment_id"],
        "additionalProperties": false,
        "properties": properties
    })
}
/// JSON Schema (draft 2020-12) describing `RevisionTransitionPayload`, returned
/// by `warm`, `drain` and `archive` when only the schema is requested.
fn transition_schema() -> Value {
    // Per-field definitions, kept separate from the envelope for readability.
    let properties = json!({
        "environment_id": {"type": "string"},
        "revision_id": {"type": "string", "description": "ULID"}
    });
    json!({
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "RevisionTransitionPayload",
        "type": "object",
        "required": ["environment_id", "revision_id"],
        "additionalProperties": false,
        "properties": properties
    })
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cli::tests_common::{make_bundle_deployment, make_env};
use tempfile::tempdir;
/// Test helper: saves a `local` environment containing one `fast2flow`
/// deployment into `store` and returns that deployment's id.
fn seed_env_with_deployment(store: &LocalFsStore) -> DeploymentId {
let mut env = make_env("local");
let deployment = make_bundle_deployment("local", "fast2flow");
let did = deployment.deployment_id;
env.bundles.push(deployment);
store.save(&env).unwrap();
did
}
/// Test helper: builds a verbatim-path stage payload (no `bundle_path`) for the
/// `local` environment with a single pack entry and default digests.
fn stage_payload(deployment_id: &DeploymentId) -> RevisionStagePayload {
RevisionStagePayload {
environment_id: "local".to_string(),
deployment_id: deployment_id.to_string(),
bundle_path: None,
bundle_digest: "sha256:00".to_string(),
pack_list: vec![PackListEntryPayload {
pack_id: "greentic.test.pack".to_string(),
version: "1.0.0".to_string(),
digest: "sha256:00".to_string(),
source_uri: None,
}],
pack_list_lock_ref: PathBuf::new(),
config_digest: default_config_digest(),
signature_sidecar_ref: default_signature_sidecar_ref(),
drain_seconds: default_drain_seconds(),
}
}
/// The first `stage` call for a deployment yields a `staged` revision with
/// sequence 1.
#[test]
fn stage_creates_revision_in_staged() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let outcome = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
assert_eq!(
outcome.result.get("lifecycle").and_then(|v| v.as_str()),
Some("staged")
);
assert_eq!(
outcome.result.get("sequence").and_then(|v| v.as_u64()),
Some(1)
);
}
/// Staging with a local bundle ignores the caller-supplied pack list, records
/// a real bundle digest, and writes a pack-list lock whose entries point at
/// extracted pack files with matching digests.
#[test]
fn stage_with_local_bundle_pins_packs_into_lockfile() {
use greentic_deploy_spec::PackListLock;
use sha2::{Digest, Sha256};
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let fixture = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("testdata/bundles/perf-smoke-bundle.gtbundle");
let mut payload = stage_payload(&did);
payload.bundle_path = Some(fixture);
// This pack list must be ignored because `bundle_path` is set.
payload.pack_list = vec![PackListEntryPayload {
pack_id: "should.be.ignored".to_string(),
version: "9.9.9".to_string(),
digest: "sha256:ff".to_string(),
source_uri: None,
}];
let outcome = stage(&store, &OpFlags::default(), Some(payload)).unwrap();
assert_eq!(
outcome.result.get("lifecycle").and_then(|v| v.as_str()),
Some("staged")
);
let rid = outcome
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let env_id = EnvId::try_from("local").unwrap();
let env = store.load(&env_id).unwrap();
let revision = env
.revisions
.iter()
.find(|r| r.revision_id.to_string() == rid)
.expect("revision persisted");
assert!(
revision.bundle_digest.starts_with("sha256:") && revision.bundle_digest != "sha256:00",
"bundle_digest should be the real archive hash, got {}",
revision.bundle_digest
);
assert!(revision.pack_list.is_empty());
let env_dir = store.env_dir(&env_id).unwrap();
let lock_path = env_dir.join(&revision.pack_list_lock_ref);
assert!(lock_path.is_file(), "pack-list.lock must be a regular file");
let lock: PackListLock =
serde_json::from_slice(&std::fs::read(&lock_path).unwrap()).unwrap();
assert_eq!(lock.revision_id, revision.revision_id);
assert!(!lock.packs.is_empty(), "fixture bundle has a .gtpack");
// Every locked pack must exist on disk and hash to the recorded digest.
for pack in &lock.packs {
assert!(pack.path.is_relative(), "lock path must be env-relative");
let pack_path = env_dir.join(&pack.path);
assert!(
pack_path.is_file(),
"extracted .gtpack must exist: {}",
pack_path.display()
);
let bytes = std::fs::read(&pack_path).unwrap();
let expected = format!("sha256:{}", hex::encode(Sha256::digest(&bytes)));
assert_eq!(pack.digest, expected, "lock digest must match the file");
}
}
/// CLI args with env id and deployment but no bundle are rejected with an
/// `InvalidArgument` error that mentions `--bundle`.
#[test]
fn stage_args_without_bundle_is_rejected() {
let did = DeploymentId::new();
let args = crate::cli::dispatch::RevisionStageArgs {
env_id: Some("local".to_string()),
deployment: Some(did.to_string()),
bundle: None,
};
let err = payload_from_stage_args(args).unwrap_err();
let msg = format!("{err}");
assert!(
matches!(err, OpError::InvalidArgument(_)) && msg.contains("--bundle"),
"expected a missing --bundle error, got: {msg}"
);
}
/// With no CLI args at all, no payload is built, leaving the answers file as
/// the payload source.
#[test]
fn stage_args_empty_defers_to_answers() {
let args = crate::cli::dispatch::RevisionStageArgs {
env_id: None,
deployment: None,
bundle: None,
};
assert!(payload_from_stage_args(args).unwrap().is_none());
}
/// The recorded bundle digest matches the staged copy under the environment
/// directory, and overwriting the input file afterwards does not change that
/// staged copy.
#[test]
fn stage_bundle_digest_is_bound_to_staged_copy_not_input() {
use sha2::{Digest, Sha256};
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let fixture = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("testdata/bundles/perf-smoke-bundle.gtbundle");
// Work on a private copy of the fixture so it can be tampered with below.
let input = dir.path().join("input.gtbundle");
std::fs::copy(&fixture, &input).unwrap();
let mut payload = stage_payload(&did);
payload.bundle_path = Some(input.clone());
let outcome = stage(&store, &OpFlags::default(), Some(payload)).unwrap();
let rid = outcome
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let env_id = EnvId::try_from("local").unwrap();
let env = store.load(&env_id).unwrap();
let revision = env
.revisions
.iter()
.find(|r| r.revision_id.to_string() == rid)
.unwrap();
let env_dir = store.env_dir(&env_id).unwrap();
let staged = env_dir.join("revisions").join(&rid).join("bundle.gtbundle");
assert!(staged.is_file(), "staged bundle copy must persist");
let staged_digest = format!(
"sha256:{}",
hex::encode(Sha256::digest(std::fs::read(&staged).unwrap()))
);
assert_eq!(revision.bundle_digest, staged_digest);
// Mutate the original input, then re-hash the staged copy: it must be unaffected.
std::fs::write(&input, b"tampered-after-stage").unwrap();
let staged_digest_after = format!(
"sha256:{}",
hex::encode(Sha256::digest(std::fs::read(&staged).unwrap()))
);
assert_eq!(
revision.bundle_digest, staged_digest_after,
"input mutation must not change the staged artifact's digest"
);
}
/// `warm` on a freshly staged revision reports the `ready` lifecycle.
#[test]
fn warm_advances_to_ready() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let warmed = warm(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
}),
)
.unwrap();
assert_eq!(
warmed.result.get("lifecycle").and_then(|v| v.as_str()),
Some("ready")
);
}
/// `drain` on a warmed (ready) revision reports the `draining` lifecycle.
#[test]
fn drain_after_warm_succeeds() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
warm(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.clone(),
}),
)
.unwrap();
let drained = drain(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
}),
)
.unwrap();
assert_eq!(
drained.result.get("lifecycle").and_then(|v| v.as_str()),
Some("draining")
);
}
/// `drain` on a revision that is still `staged` fails with `OpError::Conflict`.
#[test]
fn drain_from_staged_errors() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let err = drain(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
}),
)
.unwrap_err();
assert!(matches!(err, OpError::Conflict(_)), "got {err:?}");
}
/// Archiving a ready revision that is listed in `current_revisions` but has no
/// traffic split succeeds and removes it from `current_revisions`.
#[test]
fn archive_prunes_current_revisions_when_no_live_traffic() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let mut env = make_env("local");
let mut deployment = make_bundle_deployment("local", "fast2flow");
let did = deployment.deployment_id;
let revision = crate::cli::tests_common::make_revision(
"local",
"fast2flow",
&did,
1,
RevisionLifecycle::Ready,
);
let rid = revision.revision_id;
deployment.current_revisions.push(rid);
env.bundles.push(deployment);
env.revisions.push(revision);
store.save(&env).unwrap();
let outcome = archive(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.to_string(),
}),
)
.unwrap();
assert_eq!(
outcome.result.get("lifecycle").and_then(|v| v.as_str()),
Some("archived")
);
let env = store.load(&EnvId::try_from("local").unwrap()).unwrap();
assert!(
env.bundles[0].current_revisions.is_empty(),
"current_revisions should be pruned"
);
}
/// Archiving a revision referenced by a traffic split fails with an actionable
/// `Conflict` and leaves the revision, the split, and `current_revisions`
/// untouched.
#[test]
fn archive_refuses_when_revision_is_in_live_traffic_split() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let mut env = make_env("local");
let mut deployment = make_bundle_deployment("local", "fast2flow");
let did = deployment.deployment_id;
let revision = crate::cli::tests_common::make_revision(
"local",
"fast2flow",
&did,
1,
RevisionLifecycle::Ready,
);
let rid = revision.revision_id;
deployment.current_revisions.push(rid);
let split = crate::cli::tests_common::make_traffic_split(
"local",
"fast2flow",
&did,
&rid,
"test-key",
);
env.bundles.push(deployment);
env.revisions.push(revision);
env.traffic_splits.push(split);
store.save(&env).unwrap();
let err = archive(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.to_string(),
}),
)
.unwrap_err();
match err {
OpError::Conflict(msg) => {
assert!(
msg.contains("live traffic split")
&& msg.contains("rebalance via `gtc op traffic set`"),
"expected actionable conflict message, got: {msg}"
);
}
other => panic!("expected Conflict, got `{other:?}`"),
}
// The refused archive must not have modified persisted state.
let env = store.load(&EnvId::try_from("local").unwrap()).unwrap();
assert_eq!(env.revisions[0].lifecycle, RevisionLifecycle::Ready);
assert_eq!(env.traffic_splits.len(), 1);
assert!(env.bundles[0].current_revisions.contains(&rid));
}
/// Archiving a revision seeded in the `Inactive` lifecycle reports `archived`.
#[test]
fn archive_completes_a_drained_revision_through_inactive() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let mut env = make_env("local");
let deployment = make_bundle_deployment("local", "fast2flow");
let did = deployment.deployment_id;
let revision = crate::cli::tests_common::make_revision(
"local",
"fast2flow",
&did,
1,
RevisionLifecycle::Inactive,
);
let rid = revision.revision_id;
env.bundles.push(deployment);
env.revisions.push(revision);
store.save(&env).unwrap();
let outcome = archive(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.to_string(),
}),
)
.unwrap();
assert_eq!(
outcome.result.get("lifecycle").and_then(|v| v.as_str()),
Some("archived")
);
}
/// Two `stage` calls for the same deployment show up in `list` as two
/// revisions with sequences 1 and 2, in that order.
#[test]
fn list_reflects_stage_calls() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let listed = list(&store, &OpFlags::default(), "local").unwrap();
let revs = listed
.result
.get("revisions")
.and_then(|v| v.as_array())
.unwrap();
assert_eq!(revs.len(), 2);
let seqs: Vec<u64> = revs
.iter()
.filter_map(|r| r.get("sequence").and_then(|v| v.as_u64()))
.collect();
assert_eq!(seqs, vec![1, 2]);
}
/// `warm_with_health_gate` with an explicitly passing gate reports `ready`.
#[test]
fn warm_with_passing_gate_lands_ready() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let warmed = warm_with_health_gate(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
}),
|_env, _revision| Ok(()),
)
.unwrap();
assert_eq!(
warmed.result.get("lifecycle").and_then(|v| v.as_str()),
Some("ready")
);
}
/// A failing health gate surfaces as a `Conflict` naming the failed check, and
/// the revision is persisted in the `Failed` lifecycle.
#[test]
fn warm_with_failing_gate_persists_failed_and_returns_conflict() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid_str = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let err = warm_with_health_gate(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid_str.clone(),
}),
|_env, _revision| {
Err(crate::environment::HealthGateFailure {
failed_checks: vec![crate::environment::HealthCheckId::RuntimeConfig],
message: "runtime-config.json missing".to_string(),
})
},
)
.unwrap_err();
assert!(matches!(err, OpError::Conflict(_)), "got {err:?}");
let msg = format!("{err}");
assert!(msg.contains("warm/ready health gate"), "msg: {msg}");
assert!(msg.contains("RuntimeConfig"), "msg: {msg}");
let env_id = EnvId::try_from("local").unwrap();
let env = store.load(&env_id).unwrap();
assert_eq!(env.revisions.len(), 1);
assert_eq!(env.revisions[0].lifecycle, RevisionLifecycle::Failed);
}
/// When the gate fails (state committed as `Failed`) and the audit log cannot
/// be written, the audit error is returned instead of the gate error.
#[test]
fn warm_failing_gate_with_audit_failure_returns_audit_error() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid_str = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let env_id = EnvId::try_from("local").unwrap();
let env_dir = store.env_dir(&env_id).unwrap();
// Replace the audit events file with a directory so audit writes fail.
let events_path = env_dir.join("audit").join("events.jsonl");
let _ = std::fs::remove_file(&events_path);
std::fs::create_dir(&events_path).unwrap();
let err = warm_with_health_gate(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid_str.clone(),
}),
|_env, _revision| {
Err(crate::environment::HealthGateFailure {
failed_checks: vec![crate::environment::HealthCheckId::RuntimeConfig],
message: "runtime-config.json missing".to_string(),
})
},
)
.unwrap_err();
match &err {
OpError::Audit(_) => {}
other => panic!("expected OpError::Audit (fail-closed); got `{other:?}`"),
}
let env = store.load(&env_id).unwrap();
assert_eq!(env.revisions[0].lifecycle, RevisionLifecycle::Failed);
}
/// When the operation fails without committing anything (unknown revision) and
/// the audit directory is unusable, the original `NotFound` error is returned.
#[test]
fn warm_uncommitted_error_with_audit_failure_returns_original_error() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let _did = seed_env_with_deployment(&store);
let env_id = EnvId::try_from("local").unwrap();
let env_dir = store.env_dir(&env_id).unwrap();
// Put a plain file where the audit directory is expected so audit writes fail.
std::fs::write(env_dir.join("audit"), b"audit-blocker").unwrap();
let phantom_rid = ulid::Ulid::new().to_string();
let err = warm(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: phantom_rid,
}),
)
.unwrap_err();
match &err {
OpError::NotFound(_) => {}
other => panic!("expected OpError::NotFound (audit demoted); got `{other:?}`"),
}
}
/// When the transition commits (`Ready` persisted) but both the runtime-config
/// path and the audit log are blocked, the audit error is returned.
#[test]
fn warm_ok_with_refresh_failure_and_audit_failure_returns_audit_error() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid_str = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let env_id = EnvId::try_from("local").unwrap();
let env_dir = store.env_dir(&env_id).unwrap();
// Occupy the runtime-config path with a directory to block the refresh.
std::fs::create_dir(env_dir.join("runtime-config.json")).unwrap();
// Replace the audit events file with a directory so audit writes fail too.
let events_path = env_dir.join("audit").join("events.jsonl");
let _ = std::fs::remove_file(&events_path);
std::fs::create_dir(&events_path).unwrap();
let err = warm_with_health_gate(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid_str,
}),
|_env, _revision| Ok(()),
)
.unwrap_err();
match &err {
OpError::Audit(_) => {}
other => panic!("expected OpError::Audit (fail-closed); got `{other:?}`"),
}
let env = store.load(&env_id).unwrap();
assert_eq!(env.revisions[0].lifecycle, RevisionLifecycle::Ready);
}
use crate::rollout_telemetry::test_capture::capture_events;
use std::collections::BTreeSet;
/// Test helper: collapses captured event names into an ordered set for
/// membership assertions.
fn observed(events: &[String]) -> BTreeSet<String> {
events.iter().cloned().collect()
}
/// A successful `warm` emits `rollout.health_gate.passed` and
/// `rollout.revision.warmed`, and does not emit `rollout.health_gate.failed`.
#[test]
fn warm_emits_health_gate_passed_and_revision_warmed() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let (result, events) = capture_events(|| {
warm(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
}),
)
});
result.unwrap();
let observed = observed(&events);
assert!(
observed.contains("rollout.health_gate.passed"),
"observed events: {observed:?}"
);
assert!(
observed.contains("rollout.revision.warmed"),
"observed events: {observed:?}"
);
assert!(!observed.contains("rollout.health_gate.failed"));
}
/// A `warm` whose gate fails emits `rollout.health_gate.failed` and neither of
/// the success events.
#[test]
fn warm_with_failing_gate_emits_health_gate_failed() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let (result, events) = capture_events(|| {
warm_with_health_gate(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
}),
|_env, _revision| {
Err(crate::environment::HealthGateFailure {
failed_checks: vec![crate::environment::HealthCheckId::RuntimeConfig],
message: "synthetic gate failure".to_string(),
})
},
)
});
result.unwrap_err();
let observed = observed(&events);
assert!(
observed.contains("rollout.health_gate.failed"),
"observed events: {observed:?}"
);
assert!(!observed.contains("rollout.health_gate.passed"));
assert!(!observed.contains("rollout.revision.warmed"));
}
/// A successful `drain` emits `rollout.revision.draining`.
#[test]
fn drain_emits_revision_draining() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
warm(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.clone(),
}),
)
.unwrap();
// Only the drain call itself runs under event capture.
let (result, events) = capture_events(|| {
drain(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
}),
)
});
result.unwrap();
let observed = observed(&events);
assert!(
observed.contains("rollout.revision.draining"),
"observed events: {observed:?}"
);
}
/// Archiving a revision that is currently draining emits
/// `rollout.revision.evicted`.
#[test]
fn archive_emits_revision_evicted_on_draining_to_inactive() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
// Walk the revision staged -> ready -> draining before archiving it.
warm(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.clone(),
}),
)
.unwrap();
drain(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.clone(),
}),
)
.unwrap();
let (result, events) = capture_events(|| {
archive(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
}),
)
});
result.unwrap();
let observed = observed(&events);
assert!(
observed.contains("rollout.revision.evicted"),
"observed events: {observed:?}"
);
}
}