use std::path::PathBuf;
use greentic_deploy_spec::{
BundleId, DeploymentId, EnvId, PackId, PackListEntry, Revision, RevisionId, RevisionLifecycle,
SemVer, is_valid_transition,
};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use crate::environment::{
EnvironmentStore, LocalFsStore, StageRevisionPayload, WarmRevisionPayload,
};
use crate::rollout_telemetry::emit_lifecycle_event;
use greentic_deploy_spec::Environment;
use greentic_telemetry::RolloutEvent;
use super::{
AuditCtx, OpError, OpFlags, OpOutcome, audit_and_record, map_store_err_preserving_noun,
mint_idempotency_key,
};
const NOUN: &str = "revisions";
/// Input payload for the `revisions stage` operation.
///
/// Only `environment_id` and `deployment_id` are mandatory when deserializing;
/// every other field carries a serde default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RevisionStagePayload {
/// Environment identifier; `stage` validates it via `parse_env_id`.
pub environment_id: String,
/// Deployment identifier; `stage` parses it as a ULID.
pub deployment_id: String,
/// Optional local bundle to stage. When present, `stage` ignores the
/// payload's `pack_list`, `bundle_digest` and `pack_list_lock_ref` and uses
/// the values produced by staging the bundle instead.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub bundle_path: Option<PathBuf>,
/// Bundle digest used verbatim when no `bundle_path` is given.
/// Defaults to the placeholder returned by `default_bundle_digest`.
#[serde(default = "default_bundle_digest")]
pub bundle_digest: String,
/// Pack entries used verbatim when no `bundle_path` is given; empty by default.
#[serde(default)]
pub pack_list: Vec<PackListEntryPayload>,
/// Pack-list lock reference used verbatim when no `bundle_path` is given;
/// defaults to an empty path.
#[serde(default)]
pub pack_list_lock_ref: PathBuf,
/// Config digest forwarded to the store; defaults to `default_config_digest`.
#[serde(default = "default_config_digest")]
pub config_digest: String,
/// Signature sidecar reference forwarded to the store; defaults to
/// `default_signature_sidecar_ref`.
#[serde(default = "default_signature_sidecar_ref")]
pub signature_sidecar_ref: PathBuf,
/// Drain window in seconds forwarded to the store; defaults to
/// `default_drain_seconds`.
#[serde(default = "default_drain_seconds")]
pub drain_seconds: u32,
}
/// Serde default for `RevisionStagePayload::bundle_digest`: a fixed
/// placeholder digest string.
pub(super) fn default_bundle_digest() -> String {
    String::from("sha256:00")
}
/// Serde default for `RevisionStagePayload::config_digest`: a fixed
/// placeholder digest string.
pub(super) fn default_config_digest() -> String {
    let placeholder: &str = "sha256:00";
    placeholder.to_owned()
}
/// Serde default for `RevisionStagePayload::signature_sidecar_ref`: the
/// relative path `rev.sig`.
pub(super) fn default_signature_sidecar_ref() -> PathBuf {
    "rev.sig".into()
}
/// Serde default for `RevisionStagePayload::drain_seconds`.
pub(super) fn default_drain_seconds() -> u32 {
    const DEFAULT_DRAIN_SECONDS: u32 = 30;
    DEFAULT_DRAIN_SECONDS
}
/// Wire form of one pack entry inside `RevisionStagePayload::pack_list`.
///
/// `stage` converts each entry into a `PackListEntry`, parsing `version` as
/// a `SemVer`; the other fields are passed through unchanged.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PackListEntryPayload {
/// Pack identifier, wrapped with `PackId::new` by `stage`.
pub pack_id: String,
/// Pack version string; must parse as `SemVer` or `stage` rejects the payload.
pub version: String,
/// Pack digest, forwarded as-is.
pub digest: String,
/// Optional source URI, forwarded as-is and omitted from output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source_uri: Option<String>,
}
/// Input payload shared by the `warm`, `drain` and `archive` operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RevisionTransitionPayload {
/// Environment identifier; validated via `parse_env_id`.
pub environment_id: String,
/// Revision identifier; parsed as a ULID via `parse_revision_id`.
pub revision_id: String,
/// Optional caller-supplied idempotency key, resolved through
/// `super::resolve_idempotency_key`; omitted from output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub idempotency_key: Option<String>,
}
/// Compact, serializable view of a `Revision` returned as the result of the
/// revision operations and by `list`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RevisionSummary {
/// String form of the revision's id.
pub revision_id: String,
/// String form of the owning deployment's id.
pub deployment_id: String,
/// String form of the revision's bundle id.
pub bundle_id: String,
/// Sequence number copied from the revision.
pub sequence: u64,
/// Lifecycle state copied from the revision.
pub lifecycle: RevisionLifecycle,
}
impl From<&Revision> for RevisionSummary {
    /// Builds the summary by stringifying the revision's identifiers and
    /// copying its sequence number and lifecycle.
    fn from(r: &Revision) -> Self {
        let revision_id = r.revision_id.to_string();
        let deployment_id = r.deployment_id.to_string();
        let bundle_id = r.bundle_id.as_str().to_owned();
        Self {
            revision_id,
            deployment_id,
            bundle_id,
            sequence: r.sequence,
            lifecycle: r.lifecycle,
        }
    }
}
/// Stages a new revision for a deployment and returns its `RevisionSummary`
/// as the operation result.
///
/// With `flags.schema_only` set, only the input JSON schema is returned and
/// the store is not touched. Otherwise the payload is taken from `payload`
/// or, failing that, loaded from `flags.answers`.
///
/// Two input modes exist:
/// * `bundle_path` present — the bundle is staged via
///   `bundle_stage::stage_local_bundle`; the digest, pack list and lock
///   reference come from the staged bundle and the payload's own values for
///   those fields are ignored.
/// * `bundle_path` absent — the payload's `bundle_digest`, `pack_list` and
///   `pack_list_lock_ref` are forwarded verbatim.
///
/// # Errors
/// * `OpError::InvalidArgument` for an unparsable environment id, deployment
///   id or pack version, or when no payload is available.
/// * `OpError::Conflict` when the spec does not allow inactive → staged.
/// * `OpError::NotFound` when the environment has no bundle for the deployment.
/// * Any error surfaced by the store, the staging helpers or `audit_and_record`.
pub fn stage(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionStagePayload>,
) -> Result<OpOutcome, OpError> {
if flags.schema_only {
return Ok(OpOutcome::new(NOUN, "stage", stage_schema()));
}
let payload = resolve_payload::<RevisionStagePayload>(flags, payload)?;
let env_id = parse_env_id(&payload.environment_id)?;
let deployment_id = parse_deployment_id(&payload.deployment_id)?;
// A payload-supplied pack list is only honoured on the no-bundle path;
// with a bundle the list is rebuilt from the staged lock further below.
let pack_list = if payload.bundle_path.is_some() {
Vec::new()
} else {
payload
.pack_list
.into_iter()
.map(|e| {
Ok::<_, OpError>(PackListEntry {
pack_id: PackId::new(e.pack_id),
version: e
.version
.parse::<SemVer>()
.map_err(|err| OpError::InvalidArgument(format!("pack version: {err}")))?,
digest: e.digest,
source_uri: e.source_uri,
})
})
.collect::<Result<Vec<_>, _>>()?
};
// Guard against a spec that does not permit the inactive → staged edge.
if !is_valid_transition(RevisionLifecycle::Inactive, RevisionLifecycle::Staged) {
return Err(OpError::Conflict(
"spec rejects inactive → staged".to_string(),
));
}
let ctx = AuditCtx {
env_id: env_id.clone(),
noun: NOUN,
verb: "stage",
target: json!({
"deployment_id": deployment_id.to_string(),
"lifecycle_to": "staged",
}),
idempotency_key: None,
};
// Split the payload into owned parts so the closure below can move them.
let RevisionStagePayload {
bundle_path,
bundle_digest: payload_bundle_digest,
pack_list_lock_ref: payload_pack_list_lock_ref,
config_digest,
signature_sidecar_ref,
drain_seconds,
..
} = payload;
audit_and_record(store, ctx, |_committed| {
let env = store.load(&env_id).map_err(map_store_err_preserving_noun)?;
// The bundle id is looked up from the environment's bundle whose
// deployment id matches the request.
let bundle_id = env
.bundles
.iter()
.find(|b| b.deployment_id == deployment_id)
.map(|b| b.bundle_id.clone())
.ok_or_else(|| {
OpError::NotFound(format!(
"deployment `{deployment_id}` not found in env `{env_id}`"
))
})?;
let revision_id = crate::environment::mint_revision_id();
let env_dir = store.env_dir(&env_id)?;
// Best-effort cleanup of `<env_dir>/revisions/<revision_id>`; removal
// errors are deliberately ignored.
let drop_rev_dir = || {
let rev_dir = env_dir.join("revisions").join(revision_id.to_string());
let _ = std::fs::remove_dir_all(&rev_dir);
};
let has_bundle = bundle_path.is_some();
let (bundle_digest, revision_pack_list, pack_list_lock_ref, pack_config_refs) =
match bundle_path {
Some(bundle_path) => {
let staged = super::bundle_stage::stage_local_bundle(
&env_dir,
revision_id,
&bundle_path,
)?;
// Derive the revision's pack list and the set of pinned pack
// ids from the packs recorded in the staged lock.
let mut lock_derived_pack_list: Vec<PackListEntry> =
Vec::with_capacity(staged.lock.packs.len());
let mut pinned_pack_ids: std::collections::HashSet<String> =
std::collections::HashSet::with_capacity(staged.lock.packs.len());
for lp in &staged.lock.packs {
let pack_id = lp.pack_id.clone();
pinned_pack_ids.insert(pack_id.as_str().to_string());
lock_derived_pack_list.push(PackListEntry::from_lock_primitives(
pack_id,
lp.digest.clone(),
));
}
let rev_dir = env_dir.join("revisions").join(revision_id.to_string());
// If materializing pack configs fails, remove the revision
// directory before propagating the error.
let pack_config_refs = super::pack_config_stage::materialize_pack_configs(
&env_dir,
&rev_dir,
revision_id,
&env_id,
&bundle_id,
&pinned_pack_ids,
)
.inspect_err(|_| drop_rev_dir())?;
(
staged.bundle_digest,
lock_derived_pack_list,
staged.pack_list_lock_ref,
pack_config_refs,
)
}
// No bundle: forward the payload values unchanged, with no pack
// config references.
None => (
payload_bundle_digest,
pack_list,
payload_pack_list_lock_ref,
Vec::new(),
),
};
let store_payload = StageRevisionPayload {
revision_id,
deployment_id,
bundle_digest,
pack_list: revision_pack_list,
pack_list_lock_ref,
pack_config_refs,
config_digest,
signature_sidecar_ref,
drain_seconds,
};
// A fresh idempotency key is minted for every stage call. On store
// failure the revision directory is only removed on the bundle path.
let revision = store
.stage_revision(&env_id, store_payload, mint_idempotency_key())
.inspect_err(|_| {
if has_bundle {
drop_rev_dir();
}
})
.map_err(map_store_err_preserving_noun)?;
let outcome = OpOutcome::new(
NOUN,
"stage",
serde_json::to_value(RevisionSummary::from(&revision))
.expect("RevisionSummary is json-safe"),
);
Ok((outcome, super::AuditGens::NONE))
})
}
pub fn warm(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionTransitionPayload>,
) -> Result<OpOutcome, OpError> {
warm_with_health_gate(store, flags, payload, |_env, _revision| Ok(()))
}
/// Warms a revision towards `Ready`, deciding success with a caller-supplied
/// health gate.
///
/// With `flags.schema_only` set, only the transition JSON schema is returned.
/// Otherwise the revision is looked up in the loaded environment, the gate is
/// evaluated against a copy of it whose lifecycle is set to `Ready`, and the
/// gate's result is handed to `store.warm_revision` together with the
/// lifecycle observed before the gate ran.
///
/// `health_gate` receives the loaded environment and the synthesized `Ready`
/// revision; it runs exactly once, before the audited section starts.
///
/// # Errors
/// * `OpError::InvalidArgument` for unparsable ids or a missing payload.
/// * `OpError::NotFound` when the revision is not in the environment.
/// * Any error from the store (mapped via `map_store_err_preserving_noun`),
///   including a failed health gate, or from `audit_and_record`.
pub fn warm_with_health_gate<G>(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionTransitionPayload>,
health_gate: G,
) -> Result<OpOutcome, OpError>
where
G: FnOnce(
&greentic_deploy_spec::Environment,
&Revision,
) -> Result<(), crate::environment::HealthGateFailure>,
{
if flags.schema_only {
return Ok(OpOutcome::new(NOUN, "warm", transition_schema()));
}
let payload = resolve_payload::<RevisionTransitionPayload>(flags, payload)?;
let env_id = parse_env_id(&payload.environment_id)?;
let revision_id = parse_revision_id(&payload.revision_id)?;
let idempotency_key = super::resolve_idempotency_key(payload.idempotency_key)?;
let env = store.load(&env_id)?;
let current_revision = env
.revisions
.iter()
.find(|r| r.revision_id == revision_id)
.ok_or_else(|| {
OpError::NotFound(format!(
"revision `{revision_id}` not found in env `{env_id}`"
))
})?;
// Remember the lifecycle seen now; it is passed to the store as the
// expected starting state.
let current_lifecycle = current_revision.lifecycle;
// The gate is evaluated against a copy that already looks `Ready`.
let mut synthesized = current_revision.clone();
synthesized.lifecycle = RevisionLifecycle::Ready;
let gate_result = health_gate(&env, &synthesized);
let op = "warm";
let ctx = AuditCtx {
env_id: env_id.clone(),
noun: NOUN,
verb: op,
target: json!({
"revision_id": revision_id.to_string(),
"lifecycle_to": RevisionLifecycle::Ready,
}),
idempotency_key: Some(idempotency_key.as_str().to_string()),
};
audit_and_record(store, ctx, |committed| {
let store_result = store
.warm_revision(
&env_id,
WarmRevisionPayload {
revision_id,
health_gate: gate_result,
expected_lifecycle: current_lifecycle,
},
idempotency_key,
)
.inspect_err(|err| {
// Errors flagged as committed-after-save are marked committed.
if err.is_committed_after_save() {
committed.mark_committed();
}
});
match &store_result {
// A failed health gate is also marked committed before the error is
// returned.
Err(crate::environment::StoreError::Lifecycle(inner))
if matches!(
inner.as_ref(),
crate::environment::LifecycleError::HealthGateFailed { .. }
) =>
{
committed.mark_committed();
// Best effort: emit the gate-failed event only if the environment
// reloads and still contains the revision.
if let Ok(env_for_emit) = store.load(&env_id)
&& let Some(rev_for_emit) = env_for_emit
.revisions
.iter()
.find(|r| r.revision_id == revision_id)
{
emit_for_op(op, true, None, &env_for_emit, rev_for_emit);
}
return Err(map_store_err_preserving_noun(store_result.unwrap_err()));
}
_ => {}
}
let outcome = store_result.map_err(map_store_err_preserving_noun)?;
committed.mark_committed();
emit_for_op(
op,
false,
Some(outcome.starting_lifecycle),
&outcome.environment,
&outcome.revision,
);
let summary = RevisionSummary::from(&outcome.revision);
let op_outcome = OpOutcome::new(
NOUN,
op,
serde_json::to_value(summary).expect("RevisionSummary is json-safe"),
);
Ok((op_outcome, super::AuditGens::NONE))
})
}
/// Moves a revision to `Draining`, or returns the transition schema when
/// `flags.schema_only` is set.
///
/// The actual work is delegated to `typed_transition` with
/// `store.drain_revision` as the store verb.
pub fn drain(
    store: &LocalFsStore,
    flags: &OpFlags,
    payload: Option<RevisionTransitionPayload>,
) -> Result<OpOutcome, OpError> {
    const VERB: &str = "drain";
    if flags.schema_only {
        Ok(OpOutcome::new(NOUN, VERB, transition_schema()))
    } else {
        typed_transition(
            store,
            flags,
            payload,
            VERB,
            RevisionLifecycle::Draining,
            |env_id, revision_id, key| store.drain_revision(env_id, revision_id, key),
        )
    }
}
/// Moves a revision to `Archived`, or returns the transition schema when
/// `flags.schema_only` is set.
///
/// The actual work is delegated to `typed_transition` with
/// `store.archive_revision` as the store verb.
pub fn archive(
    store: &LocalFsStore,
    flags: &OpFlags,
    payload: Option<RevisionTransitionPayload>,
) -> Result<OpOutcome, OpError> {
    const VERB: &str = "archive";
    if flags.schema_only {
        Ok(OpOutcome::new(NOUN, VERB, transition_schema()))
    } else {
        typed_transition(
            store,
            flags,
            payload,
            VERB,
            RevisionLifecycle::Archived,
            |env_id, revision_id, key| store.archive_revision(env_id, revision_id, key),
        )
    }
}
/// Lists the revisions of an environment as `RevisionSummary` values.
///
/// With `flags.schema_only` set, a short description of the input is
/// returned instead. Fails with `OpError::InvalidArgument` for an invalid
/// environment id and `OpError::NotFound` when the environment does not exist.
pub fn list(store: &LocalFsStore, flags: &OpFlags, env_id: &str) -> Result<OpOutcome, OpError> {
    let result = if flags.schema_only {
        json!({"input_schema": "env_id positional"})
    } else {
        let env_id = parse_env_id(env_id)?;
        let known = store.exists(&env_id)?;
        if !known {
            return Err(OpError::NotFound(format!("environment `{env_id}`")));
        }
        let env = store.load(&env_id)?;
        let mut revisions: Vec<RevisionSummary> = Vec::new();
        for revision in env.revisions.iter() {
            revisions.push(RevisionSummary::from(revision));
        }
        json!({"environment_id": env_id.as_str(), "revisions": revisions})
    };
    Ok(OpOutcome::new(NOUN, "list", result))
}
fn typed_transition<F>(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionTransitionPayload>,
op: &'static str,
lifecycle_to: RevisionLifecycle,
call_verb: F,
) -> Result<OpOutcome, OpError>
where
F: FnOnce(
&greentic_deploy_spec::EnvId,
greentic_deploy_spec::RevisionId,
greentic_deploy_spec::IdempotencyKey,
) -> Result<
crate::environment::RevisionTransitionOutcome,
crate::environment::StoreError,
>,
{
let payload = resolve_payload::<RevisionTransitionPayload>(flags, payload)?;
let env_id = parse_env_id(&payload.environment_id)?;
let revision_id = parse_revision_id(&payload.revision_id)?;
let idempotency_key = super::resolve_idempotency_key(payload.idempotency_key)?;
let ctx = AuditCtx {
env_id: env_id.clone(),
noun: NOUN,
verb: op,
target: json!({
"revision_id": revision_id.to_string(),
"lifecycle_to": lifecycle_to,
}),
idempotency_key: Some(idempotency_key.as_str().to_string()),
};
audit_and_record(store, ctx, |committed| {
let outcome = call_verb(&env_id, revision_id, idempotency_key)
.inspect_err(|err| {
if err.is_committed_after_save() {
committed.mark_committed();
}
})
.map_err(map_store_err_preserving_noun)?;
committed.mark_committed();
emit_for_op(
op,
false,
Some(outcome.starting_lifecycle),
&outcome.environment,
&outcome.revision,
);
let summary = RevisionSummary::from(&outcome.revision);
let op_outcome = OpOutcome::new(
NOUN,
op,
serde_json::to_value(summary).expect("RevisionSummary is json-safe"),
);
Ok((op_outcome, super::AuditGens::NONE))
})
}
pub(crate) fn emit_for_op(
op: &'static str,
gate_failed: bool,
starting_lifecycle: Option<RevisionLifecycle>,
env: &Environment,
revision: &Revision,
) {
match (op, gate_failed) {
("warm", false) => {
emit_lifecycle_event(RolloutEvent::HealthGatePassed, env, revision);
emit_lifecycle_event(RolloutEvent::RevisionWarmed, env, revision);
}
("warm", true) => {
emit_lifecycle_event(RolloutEvent::HealthGateFailed, env, revision);
}
("drain", false) => {
emit_lifecycle_event(RolloutEvent::RevisionDraining, env, revision);
}
("archive", false) if starting_lifecycle == Some(RevisionLifecycle::Draining) => {
emit_lifecycle_event(RolloutEvent::RevisionEvicted, env, revision);
}
_ => {}
}
}
/// Converts direct CLI arguments for `revisions stage` into a payload.
///
/// Returns `Ok(None)` when no argument was supplied at all, so the caller can
/// fall back to `--answers`. When at least one argument is present, all
/// three are required; the first missing one (environment id, then
/// deployment, then bundle) yields `OpError::InvalidArgument`. Fields that
/// have no CLI counterpart are filled with their defaults.
pub fn payload_from_stage_args(
    args: super::dispatch::RevisionStageArgs,
) -> Result<Option<RevisionStagePayload>, OpError> {
    let (environment_id, deployment_id, bundle_path) =
        match (args.env_id, args.deployment, args.bundle) {
            (None, None, None) => return Ok(None),
            (None, _, _) => {
                return Err(OpError::InvalidArgument(
                    "revisions stage: missing positional `<env_id>`".to_string(),
                ));
            }
            (Some(_), None, _) => {
                return Err(OpError::InvalidArgument(
                    "revisions stage: missing `--deployment <ULID>`".to_string(),
                ));
            }
            (Some(_), Some(_), None) => {
                return Err(OpError::InvalidArgument(
                    "revisions stage: missing `--bundle <PATH>`. The direct CLI path stages a local \
                     .gtbundle; use `--answers <file>` for the legacy verbatim path."
                        .to_string(),
                ));
            }
            (Some(env), Some(deployment), Some(bundle)) => (env, deployment, bundle),
        };

    let staged = RevisionStagePayload {
        environment_id,
        deployment_id,
        bundle_path: Some(bundle_path),
        bundle_digest: default_bundle_digest(),
        pack_list: Vec::new(),
        pack_list_lock_ref: PathBuf::new(),
        config_digest: default_config_digest(),
        signature_sidecar_ref: default_signature_sidecar_ref(),
        drain_seconds: default_drain_seconds(),
    };
    Ok(Some(staged))
}
/// Picks the payload for an operation.
///
/// An explicitly supplied `payload` wins; otherwise the payload is loaded
/// from `flags.answers` if that is set. With neither available the call
/// fails with `OpError::InvalidArgument`.
fn resolve_payload<T: serde::de::DeserializeOwned>(
    flags: &OpFlags,
    payload: Option<T>,
) -> Result<T, OpError> {
    match (payload, flags.answers.as_ref()) {
        (Some(direct), _) => Ok(direct),
        (None, Some(path)) => super::load_answers::<T>(path),
        (None, None) => Err(OpError::InvalidArgument(
            "no payload provided: pass --answers <path> or supply the payload directly"
                .to_string(),
        )),
    }
}
/// Parses an environment id, reporting failures as
/// `OpError::InvalidArgument` prefixed with `environment_id:`.
fn parse_env_id(raw: &str) -> Result<EnvId, OpError> {
    match EnvId::try_from(raw) {
        Ok(env_id) => Ok(env_id),
        Err(e) => Err(OpError::InvalidArgument(format!("environment_id: {e}"))),
    }
}
fn parse_deployment_id(raw: &str) -> Result<DeploymentId, OpError> {
use std::str::FromStr;
let ulid = ulid::Ulid::from_str(raw)
.map_err(|e| OpError::InvalidArgument(format!("deployment_id: {e}")))?;
Ok(DeploymentId(ulid))
}
fn parse_revision_id(raw: &str) -> Result<RevisionId, OpError> {
use std::str::FromStr;
let ulid = ulid::Ulid::from_str(raw)
.map_err(|e| OpError::InvalidArgument(format!("revision_id: {e}")))?;
Ok(RevisionId(ulid))
}
/// No-op that accepts a `BundleId` reference and does nothing with it.
// NOTE(review): nothing in the visible code calls this; it looks like it exists
// only to keep the `BundleId` import referenced — confirm before removing.
// `dead_code` is allowed because the function is intentionally never called here.
#[allow(dead_code)]
fn discard_bundle(_id: &BundleId) {
}
/// JSON Schema (draft 2020-12) describing `RevisionStagePayload`, returned
/// by `stage` in schema-only mode.
fn stage_schema() -> Value {
    let properties = json!({
        "environment_id": {"type": "string"},
        "deployment_id": {"type": "string", "description": "ULID"},
        "bundle_path": {"type": "string", "description": "Local .gtbundle to extract + pin; derives bundle_digest/pack_list/pack_list_lock_ref"},
        "bundle_digest": {"type": "string"},
        "pack_list": {"type": "array"},
        "pack_list_lock_ref": {"type": "string"},
        "config_digest": {"type": "string"},
        "signature_sidecar_ref": {"type": "string"},
        "drain_seconds": {"type": "integer", "minimum": 0}
    });
    json!({
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "RevisionStagePayload",
        "type": "object",
        "required": ["environment_id", "deployment_id"],
        "additionalProperties": false,
        "properties": properties
    })
}
/// JSON Schema (draft 2020-12) describing `RevisionTransitionPayload`,
/// returned by `warm`, `drain` and `archive` in schema-only mode.
fn transition_schema() -> Value {
    let idempotency_key = json!({
        "type": "string",
        "description": "Optional A8 §2 caller-supplied key for safe retry replay; minted per-invocation when omitted."
    });
    let properties = json!({
        "environment_id": {"type": "string"},
        "revision_id": {"type": "string", "description": "ULID"},
        "idempotency_key": idempotency_key
    });
    json!({
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "RevisionTransitionPayload",
        "type": "object",
        "required": ["environment_id", "revision_id"],
        "additionalProperties": false,
        "properties": properties
    })
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cli::tests_common::{make_bundle_deployment, make_env};
use tempfile::tempdir;
/// The transition schema must expose an `idempotency_key` property.
#[test]
fn transition_schema_lists_idempotency_key() {
let schema = transition_schema();
assert!(
schema.pointer("/properties/idempotency_key").is_some(),
"transition_schema must list `idempotency_key` so --schema-driven \
callers can supply the A8 retry key (schema: {schema:#})"
);
}
/// Test helper: saves a `local` environment holding one `fast2flow` bundle
/// deployment and returns that deployment's id.
fn seed_env_with_deployment(store: &LocalFsStore) -> DeploymentId {
let mut env = make_env("local");
let deployment = make_bundle_deployment("local", "fast2flow");
let did = deployment.deployment_id;
env.bundles.push(deployment);
store.save(&env).unwrap();
did
}
/// Test helper: builds a stage payload for the `local` environment without a
/// bundle path, carrying a single pack entry and default values elsewhere.
fn stage_payload(deployment_id: &DeploymentId) -> RevisionStagePayload {
RevisionStagePayload {
environment_id: "local".to_string(),
deployment_id: deployment_id.to_string(),
bundle_path: None,
bundle_digest: "sha256:00".to_string(),
pack_list: vec![PackListEntryPayload {
pack_id: "greentic.test.pack".to_string(),
version: "1.0.0".to_string(),
digest: "sha256:00".to_string(),
source_uri: None,
}],
pack_list_lock_ref: PathBuf::new(),
config_digest: default_config_digest(),
signature_sidecar_ref: default_signature_sidecar_ref(),
drain_seconds: default_drain_seconds(),
}
}
/// Staging into a freshly seeded environment yields a `staged` revision with
/// sequence 1.
#[test]
fn stage_creates_revision_in_staged() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let outcome = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
assert_eq!(
outcome.result.get("lifecycle").and_then(|v| v.as_str()),
Some("staged")
);
assert_eq!(
outcome.result.get("sequence").and_then(|v| v.as_u64()),
Some(1)
);
}
/// Staging a bundle into the `prod` environment must fail with
/// `OpError::Unauthorized` and leave no entries under `prod/revisions`.
#[test]
fn stage_with_bundle_on_non_local_env_rejects_before_writing_files() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let mut env = make_env("prod");
let deployment = make_bundle_deployment("prod", "fast2flow");
let did = deployment.deployment_id;
env.bundles.push(deployment);
store.save(&env).unwrap();
let fixture = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("testdata/bundles/perf-smoke-bundle.gtbundle");
let mut payload = stage_payload(&did);
payload.environment_id = "prod".to_string();
payload.bundle_path = Some(fixture);
let err = stage(&store, &OpFlags::default(), Some(payload)).unwrap_err();
assert!(
matches!(err, OpError::Unauthorized { .. }),
"non-local env stage must be denied, got: {err:?}"
);
// The revisions directory may be absent, empty, or unreadable — but it
// must not contain any entries.
let rev_root = dir.path().join("prod").join("revisions");
assert!(
!rev_root.exists()
|| std::fs::read_dir(&rev_root)
.map(|d| d.count() == 0)
.unwrap_or(true),
"denied stage must not write under `{}`",
rev_root.display()
);
}
/// Staging with a local bundle must record a real bundle digest, derive the
/// pack list from the lock (ignoring the payload's own `pack_list`), and write
/// a lock file whose entries point at extracted packs with matching digests.
#[test]
fn stage_with_local_bundle_pins_packs_into_lockfile() {
use greentic_deploy_spec::PackListLock;
use sha2::{Digest, Sha256};
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let fixture = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("testdata/bundles/perf-smoke-bundle.gtbundle");
let mut payload = stage_payload(&did);
payload.bundle_path = Some(fixture);
// This entry has an implausible version/digest on purpose: the bundle path
// is expected to override it.
payload.pack_list = vec![PackListEntryPayload {
pack_id: "should.be.ignored".to_string(),
version: "9.9.9".to_string(),
digest: "sha256:ff".to_string(),
source_uri: None,
}];
let outcome = stage(&store, &OpFlags::default(), Some(payload)).unwrap();
assert_eq!(
outcome.result.get("lifecycle").and_then(|v| v.as_str()),
Some("staged")
);
let rid = outcome
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let env_id = EnvId::try_from("local").unwrap();
let env = store.load(&env_id).unwrap();
let revision = env
.revisions
.iter()
.find(|r| r.revision_id.to_string() == rid)
.expect("revision persisted");
assert!(
revision.bundle_digest.starts_with("sha256:") && revision.bundle_digest != "sha256:00",
"bundle_digest should be the real archive hash, got {}",
revision.bundle_digest
);
assert!(
!revision.pack_list.is_empty(),
"pack_list should be populated from the lock"
);
let env_dir = store.env_dir(&env_id).unwrap();
let lock_path = env_dir.join(&revision.pack_list_lock_ref);
assert!(lock_path.is_file(), "pack-list.lock must be a regular file");
let lock: PackListLock =
serde_json::from_slice(&std::fs::read(&lock_path).unwrap()).unwrap();
assert_eq!(lock.revision_id, revision.revision_id);
assert!(!lock.packs.is_empty(), "fixture bundle has a .gtpack");
// Every locked pack must exist on disk and hash to the digest in the lock.
for pack in &lock.packs {
assert!(pack.path.is_relative(), "lock path must be env-relative");
let pack_path = env_dir.join(&pack.path);
assert!(
pack_path.is_file(),
"extracted .gtpack must exist: {}",
pack_path.display()
);
let bytes = std::fs::read(&pack_path).unwrap();
let expected = format!("sha256:{}", hex::encode(Sha256::digest(&bytes)));
assert_eq!(pack.digest, expected, "lock digest must match the file");
}
}
/// CLI args that name an environment and deployment but no bundle must be
/// rejected with an `InvalidArgument` mentioning `--bundle`.
#[test]
fn stage_args_without_bundle_is_rejected() {
let did = DeploymentId::new();
let args = crate::cli::dispatch::RevisionStageArgs {
env_id: Some("local".to_string()),
deployment: Some(did.to_string()),
bundle: None,
};
let err = payload_from_stage_args(args).unwrap_err();
let msg = format!("{err}");
assert!(
matches!(err, OpError::InvalidArgument(_)) && msg.contains("--bundle"),
"expected a missing --bundle error, got: {msg}"
);
}
/// With no CLI args at all, `payload_from_stage_args` returns `None` so the
/// payload can come from elsewhere.
#[test]
fn stage_args_empty_defers_to_answers() {
let args = crate::cli::dispatch::RevisionStageArgs {
env_id: None,
deployment: None,
bundle: None,
};
assert!(payload_from_stage_args(args).unwrap().is_none());
}
/// The recorded bundle digest must equal the hash of the staged copy under
/// the revision directory, and stay valid after the input file is overwritten.
#[test]
fn stage_bundle_digest_is_bound_to_staged_copy_not_input() {
use sha2::{Digest, Sha256};
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let fixture = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("testdata/bundles/perf-smoke-bundle.gtbundle");
// Stage from a private copy of the fixture so it can be tampered with later.
let input = dir.path().join("input.gtbundle");
std::fs::copy(&fixture, &input).unwrap();
let mut payload = stage_payload(&did);
payload.bundle_path = Some(input.clone());
let outcome = stage(&store, &OpFlags::default(), Some(payload)).unwrap();
let rid = outcome
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let env_id = EnvId::try_from("local").unwrap();
let env = store.load(&env_id).unwrap();
let revision = env
.revisions
.iter()
.find(|r| r.revision_id.to_string() == rid)
.unwrap();
let env_dir = store.env_dir(&env_id).unwrap();
let staged = env_dir.join("revisions").join(&rid).join("bundle.gtbundle");
assert!(staged.is_file(), "staged bundle copy must persist");
let staged_digest = format!(
"sha256:{}",
hex::encode(Sha256::digest(std::fs::read(&staged).unwrap()))
);
assert_eq!(revision.bundle_digest, staged_digest);
// Overwrite the input and re-hash the staged copy: it must be unchanged.
std::fs::write(&input, b"tampered-after-stage").unwrap();
let staged_digest_after = format!(
"sha256:{}",
hex::encode(Sha256::digest(std::fs::read(&staged).unwrap()))
);
assert_eq!(
revision.bundle_digest, staged_digest_after,
"input mutation must not change the staged artifact's digest"
);
}
/// Warming a freshly staged revision reports lifecycle `ready`.
#[test]
fn warm_advances_to_ready() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let warmed = warm(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
idempotency_key: None,
}),
)
.unwrap();
assert_eq!(
warmed.result.get("lifecycle").and_then(|v| v.as_str()),
Some("ready")
);
}
/// Stage → warm → drain succeeds and reports lifecycle `draining`.
#[test]
fn drain_after_warm_succeeds() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
warm(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.clone(),
idempotency_key: None,
}),
)
.unwrap();
let drained = drain(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
idempotency_key: None,
}),
)
.unwrap();
assert_eq!(
drained.result.get("lifecycle").and_then(|v| v.as_str()),
Some("draining")
);
}
/// Draining a revision that was only staged (never warmed) fails with
/// `OpError::Conflict`.
#[test]
fn drain_from_staged_errors() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let err = drain(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
idempotency_key: None,
}),
)
.unwrap_err();
assert!(matches!(err, OpError::Conflict(_)), "got {err:?}");
}
/// Archiving a `Ready` revision that is listed in `current_revisions` but has
/// no traffic split succeeds and removes it from `current_revisions`.
#[test]
fn archive_prunes_current_revisions_when_no_live_traffic() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let mut env = make_env("local");
let mut deployment = make_bundle_deployment("local", "fast2flow");
let did = deployment.deployment_id;
let revision = crate::cli::tests_common::make_revision(
"local",
"fast2flow",
&did,
1,
RevisionLifecycle::Ready,
);
let rid = revision.revision_id;
deployment.current_revisions.push(rid);
env.bundles.push(deployment);
env.revisions.push(revision);
store.save(&env).unwrap();
let outcome = archive(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.to_string(),
idempotency_key: None,
}),
)
.unwrap();
assert_eq!(
outcome.result.get("lifecycle").and_then(|v| v.as_str()),
Some("archived")
);
let env = store.load(&EnvId::try_from("local").unwrap()).unwrap();
assert!(
env.bundles[0].current_revisions.is_empty(),
"current_revisions should be pruned"
);
}
/// Archiving a revision referenced by a traffic split must fail with an
/// actionable `OpError::Conflict` and leave the stored environment unchanged.
#[test]
fn archive_refuses_when_revision_is_in_live_traffic_split() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let mut env = make_env("local");
let mut deployment = make_bundle_deployment("local", "fast2flow");
let did = deployment.deployment_id;
let revision = crate::cli::tests_common::make_revision(
"local",
"fast2flow",
&did,
1,
RevisionLifecycle::Ready,
);
let rid = revision.revision_id;
deployment.current_revisions.push(rid);
let split = crate::cli::tests_common::make_traffic_split(
"local",
"fast2flow",
&did,
&rid,
"test-key",
);
env.bundles.push(deployment);
env.revisions.push(revision);
env.traffic_splits.push(split);
store.save(&env).unwrap();
let err = archive(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.to_string(),
idempotency_key: None,
}),
)
.unwrap_err();
match err {
OpError::Conflict(msg) => {
assert!(
msg.contains("live traffic split")
&& msg.contains("rebalance via `gtc op traffic set`"),
"expected actionable conflict message, got: {msg}"
);
}
other => panic!("expected Conflict, got `{other:?}`"),
}
// Nothing may have been persisted by the refused archive.
let env = store.load(&EnvId::try_from("local").unwrap()).unwrap();
assert_eq!(env.revisions[0].lifecycle, RevisionLifecycle::Ready);
assert_eq!(env.traffic_splits.len(), 1);
assert!(env.bundles[0].current_revisions.contains(&rid));
}
/// Archiving a revision that is already `Inactive` succeeds and reports
/// lifecycle `archived`.
#[test]
fn archive_completes_a_drained_revision_through_inactive() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let mut env = make_env("local");
let deployment = make_bundle_deployment("local", "fast2flow");
let did = deployment.deployment_id;
let revision = crate::cli::tests_common::make_revision(
"local",
"fast2flow",
&did,
1,
RevisionLifecycle::Inactive,
);
let rid = revision.revision_id;
env.bundles.push(deployment);
env.revisions.push(revision);
store.save(&env).unwrap();
let outcome = archive(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.to_string(),
idempotency_key: None,
}),
)
.unwrap();
assert_eq!(
outcome.result.get("lifecycle").and_then(|v| v.as_str()),
Some("archived")
);
}
/// After two stage calls, `list` returns two revisions with sequences 1 and 2
/// in that order.
#[test]
fn list_reflects_stage_calls() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let listed = list(&store, &OpFlags::default(), "local").unwrap();
let revs = listed
.result
.get("revisions")
.and_then(|v| v.as_array())
.unwrap();
assert_eq!(revs.len(), 2);
let seqs: Vec<u64> = revs
.iter()
.filter_map(|r| r.get("sequence").and_then(|v| v.as_u64()))
.collect();
assert_eq!(seqs, vec![1, 2]);
}
/// `warm_with_health_gate` with an explicitly passing gate reports lifecycle
/// `ready`.
#[test]
fn warm_with_passing_gate_lands_ready() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let warmed = warm_with_health_gate(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
idempotency_key: None,
}),
|_env, _revision| Ok(()),
)
.unwrap();
assert_eq!(
warmed.result.get("lifecycle").and_then(|v| v.as_str()),
Some("ready")
);
}
/// A failing health gate must surface as `OpError::Conflict` naming the
/// failed check, and the revision must be persisted as `Failed`.
#[test]
fn warm_with_failing_gate_persists_failed_and_returns_conflict() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid_str = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let err = warm_with_health_gate(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid_str.clone(),
idempotency_key: None,
}),
|_env, _revision| {
Err(crate::environment::HealthGateFailure {
failed_checks: vec![crate::environment::HealthCheckId::RuntimeConfig],
message: "runtime-config.json missing".to_string(),
})
},
)
.unwrap_err();
assert!(matches!(err, OpError::Conflict(_)), "got {err:?}");
let msg = format!("{err}");
assert!(msg.contains("warm/ready health gate"), "msg: {msg}");
assert!(msg.contains("RuntimeConfig"), "msg: {msg}");
let env_id = EnvId::try_from("local").unwrap();
let env = store.load(&env_id).unwrap();
assert_eq!(env.revisions.len(), 1);
assert_eq!(env.revisions[0].lifecycle, RevisionLifecycle::Failed);
}
/// When the gate fails and the audit log is obstructed, the caller must see
/// `OpError::Audit` while the revision is still persisted as `Failed`.
#[test]
fn warm_failing_gate_with_audit_failure_returns_audit_error() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid_str = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let env_id = EnvId::try_from("local").unwrap();
let env_dir = store.env_dir(&env_id).unwrap();
// Replace `audit/events.jsonl` with a directory — presumably so that
// recording the audit event fails.
let events_path = env_dir.join("audit").join("events.jsonl");
let _ = std::fs::remove_file(&events_path);
std::fs::create_dir(&events_path).unwrap();
let err = warm_with_health_gate(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid_str.clone(),
idempotency_key: None,
}),
|_env, _revision| {
Err(crate::environment::HealthGateFailure {
failed_checks: vec![crate::environment::HealthCheckId::RuntimeConfig],
message: "runtime-config.json missing".to_string(),
})
},
)
.unwrap_err();
match &err {
OpError::Audit(_) => {}
other => panic!("expected OpError::Audit (fail-closed); got `{other:?}`"),
}
let env = store.load(&env_id).unwrap();
assert_eq!(env.revisions[0].lifecycle, RevisionLifecycle::Failed);
}
/// Warming an unknown revision while the audit location is obstructed must
/// still return the original `OpError::NotFound`.
#[test]
fn warm_uncommitted_error_with_audit_failure_returns_original_error() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let _did = seed_env_with_deployment(&store);
let env_id = EnvId::try_from("local").unwrap();
let env_dir = store.env_dir(&env_id).unwrap();
// Put a regular file at `<env_dir>/audit` to block the audit location.
std::fs::write(env_dir.join("audit"), b"audit-blocker").unwrap();
// A freshly generated ULID that was never staged.
let phantom_rid = ulid::Ulid::new().to_string();
let err = warm(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: phantom_rid,
idempotency_key: None,
}),
)
.unwrap_err();
match &err {
OpError::NotFound(_) => {}
other => panic!("expected OpError::NotFound (audit demoted); got `{other:?}`"),
}
}
/// A passing health gate with both `runtime-config.json` and the audit log
/// blocked must come back as `OpError::Audit`, while the revision is still
/// persisted as `Ready`.
#[test]
fn warm_ok_with_refresh_failure_and_audit_failure_returns_audit_error() {
    let dir = tempdir().unwrap();
    let store = LocalFsStore::new(dir.path());
    let flags = OpFlags::default();
    let did = seed_env_with_deployment(&store);
    let staged = stage(&store, &flags, Some(stage_payload(&did))).unwrap();
    let revision_id = staged
        .result
        .get("revision_id")
        .and_then(|value| value.as_str())
        .map(str::to_owned)
        .unwrap();

    let env_id = EnvId::try_from("local").unwrap();
    let env_dir = store.env_dir(&env_id).unwrap();

    // A directory at the runtime-config path is what the test name calls the
    // "refresh failure".
    let runtime_config = env_dir.join("runtime-config.json");
    std::fs::create_dir(&runtime_config).unwrap();

    // Likewise swap the audit log file for a directory to provoke the audit
    // failure.
    let events_path = env_dir.join("audit").join("events.jsonl");
    let _ = std::fs::remove_file(&events_path);
    std::fs::create_dir(&events_path).unwrap();

    let request = RevisionTransitionPayload {
        environment_id: "local".to_string(),
        revision_id,
        idempotency_key: None,
    };
    let other = warm_with_health_gate(&store, &flags, Some(request), |_env, _revision| Ok(()))
        .unwrap_err();
    assert!(
        matches!(other, OpError::Audit(_)),
        "expected OpError::Audit (fail-closed); got `{other:?}`"
    );

    // The warm transition itself must have been committed.
    let reloaded = store.load(&env_id).unwrap();
    assert_eq!(reloaded.revisions[0].lifecycle, RevisionLifecycle::Ready);
}
/// Draining a `Ready` revision with both `runtime-config.json` and the audit
/// log blocked must come back as `OpError::Audit`, while the revision is still
/// persisted as `Draining`.
#[test]
fn drain_ok_with_refresh_failure_and_audit_failure_returns_audit_error() {
    let dir = tempdir().unwrap();
    let store = LocalFsStore::new(dir.path());
    let flags = OpFlags::default();
    let did = seed_env_with_deployment(&store);
    let staged = stage(&store, &flags, Some(stage_payload(&did))).unwrap();
    let revision_id = staged
        .result
        .get("revision_id")
        .and_then(|value| value.as_str())
        .map(str::to_owned)
        .unwrap();

    // Force the staged revision straight to `Ready` by editing the stored
    // environment instead of going through `warm`.
    let env_id = EnvId::try_from("local").unwrap();
    {
        let mut seeded = store.load(&env_id).unwrap();
        seeded.revisions[0].lifecycle = RevisionLifecycle::Ready;
        store.save(&seeded).unwrap();
    }

    let env_dir = store.env_dir(&env_id).unwrap();

    // Replace any runtime-config file with a directory (the "refresh failure"
    // of the test name).
    let runtime_config = env_dir.join("runtime-config.json");
    let _ = std::fs::remove_file(&runtime_config);
    std::fs::create_dir(&runtime_config).unwrap();

    // Replace the audit log file with a directory to provoke the audit failure.
    let events_path = env_dir.join("audit").join("events.jsonl");
    let _ = std::fs::remove_file(&events_path);
    std::fs::create_dir(&events_path).unwrap();

    let request = RevisionTransitionPayload {
        environment_id: "local".to_string(),
        revision_id,
        idempotency_key: None,
    };
    let other = drain(&store, &flags, Some(request)).unwrap_err();
    assert!(
        matches!(other, OpError::Audit(_)),
        "expected OpError::Audit (fail-closed); got `{other:?}`"
    );

    // The drain transition itself must have been committed.
    let reloaded = store.load(&env_id).unwrap();
    assert_eq!(reloaded.revisions[0].lifecycle, RevisionLifecycle::Draining);
}
use crate::rollout_telemetry::test_capture::capture_events;
use std::collections::BTreeSet;
/// Gathers captured event names into an ordered, de-duplicated set so tests
/// can check membership without caring about emission order or repeats.
fn observed(events: &[String]) -> BTreeSet<String> {
    let mut names = BTreeSet::new();
    names.extend(events.iter().cloned());
    names
}
/// A successful `warm` must emit `rollout.health_gate.passed` and
/// `rollout.revision.warmed`, and must not emit `rollout.health_gate.failed`.
#[test]
fn warm_emits_health_gate_passed_and_revision_warmed() {
    let dir = tempdir().unwrap();
    let store = LocalFsStore::new(dir.path());
    let did = seed_env_with_deployment(&store);
    let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
    let revision_id = staged
        .result
        .get("revision_id")
        .and_then(|value| value.as_str())
        .map(str::to_owned)
        .unwrap();

    let request = RevisionTransitionPayload {
        environment_id: "local".to_string(),
        revision_id,
        idempotency_key: None,
    };
    // Only the warm call runs inside the capture closure.
    let (result, events) = capture_events(|| warm(&store, &OpFlags::default(), Some(request)));
    result.unwrap();

    let observed = observed(&events);
    for expected in ["rollout.health_gate.passed", "rollout.revision.warmed"] {
        assert!(observed.contains(expected), "observed events: {observed:?}");
    }
    assert!(!observed.contains("rollout.health_gate.failed"));
}
/// A `warm` whose health gate fails must emit `rollout.health_gate.failed` and
/// neither `rollout.health_gate.passed` nor `rollout.revision.warmed`.
#[test]
fn warm_with_failing_gate_emits_health_gate_failed() {
    let dir = tempdir().unwrap();
    let store = LocalFsStore::new(dir.path());
    let did = seed_env_with_deployment(&store);
    let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
    let revision_id = staged
        .result
        .get("revision_id")
        .and_then(|value| value.as_str())
        .map(str::to_owned)
        .unwrap();

    let request = RevisionTransitionPayload {
        environment_id: "local".to_string(),
        revision_id,
        idempotency_key: None,
    };
    // The gate closure stays inline so its reference parameters are inferred
    // from the callee's signature.
    let (result, events) = capture_events(|| {
        warm_with_health_gate(
            &store,
            &OpFlags::default(),
            Some(request),
            |_env, _revision| {
                Err(crate::environment::HealthGateFailure {
                    failed_checks: vec![crate::environment::HealthCheckId::RuntimeConfig],
                    message: "synthetic gate failure".to_string(),
                })
            },
        )
    });
    result.unwrap_err();

    let observed = observed(&events);
    assert!(
        observed.contains("rollout.health_gate.failed"),
        "observed events: {observed:?}"
    );
    for unexpected in ["rollout.health_gate.passed", "rollout.revision.warmed"] {
        assert!(!observed.contains(unexpected));
    }
}
/// Draining a warmed revision must emit `rollout.revision.draining`.
#[test]
fn drain_emits_revision_draining() {
    let dir = tempdir().unwrap();
    let store = LocalFsStore::new(dir.path());
    let flags = OpFlags::default();
    let did = seed_env_with_deployment(&store);
    let staged = stage(&store, &flags, Some(stage_payload(&did))).unwrap();
    let revision_id = staged
        .result
        .get("revision_id")
        .and_then(|value| value.as_str())
        .map(str::to_owned)
        .unwrap();

    // The same transition request is sent to every lifecycle step.
    let transition = RevisionTransitionPayload {
        environment_id: "local".to_string(),
        revision_id,
        idempotency_key: None,
    };

    // Warm outside the capture so only drain's events are recorded.
    warm(&store, &flags, Some(transition.clone())).unwrap();

    let (result, events) = capture_events(|| drain(&store, &flags, Some(transition)));
    result.unwrap();

    let observed = observed(&events);
    assert!(
        observed.contains("rollout.revision.draining"),
        "observed events: {observed:?}"
    );
}
/// Archiving a revision that has been warmed and then drained must emit
/// `rollout.revision.evicted`.
#[test]
fn archive_emits_revision_evicted_on_draining_to_inactive() {
    let dir = tempdir().unwrap();
    let store = LocalFsStore::new(dir.path());
    let flags = OpFlags::default();
    let did = seed_env_with_deployment(&store);
    let staged = stage(&store, &flags, Some(stage_payload(&did))).unwrap();
    let revision_id = staged
        .result
        .get("revision_id")
        .and_then(|value| value.as_str())
        .map(str::to_owned)
        .unwrap();

    // The same transition request is sent to every lifecycle step.
    let transition = RevisionTransitionPayload {
        environment_id: "local".to_string(),
        revision_id,
        idempotency_key: None,
    };

    // Walk the revision through warm and drain outside the capture so only
    // archive's events are recorded.
    warm(&store, &flags, Some(transition.clone())).unwrap();
    drain(&store, &flags, Some(transition.clone())).unwrap();

    let (result, events) = capture_events(|| archive(&store, &flags, Some(transition)));
    result.unwrap();

    let observed = observed(&events);
    assert!(
        observed.contains("rollout.revision.evicted"),
        "observed events: {observed:?}"
    );
}
}