use std::path::PathBuf;
use chrono::Utc;
use greentic_deploy_spec::{
BundleId, DeploymentId, EnvId, PackId, PackListEntry, Revision, RevisionId, RevisionLifecycle,
SchemaVersion, SemVer, is_valid_transition,
};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use crate::environment::{EnvironmentStore, LocalFsStore};
use super::{AuditCtx, OpError, OpFlags, OpOutcome, audit_and_record};
const NOUN: &str = "revisions";
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RevisionStagePayload {
pub environment_id: String,
pub deployment_id: String,
pub bundle_digest: String,
#[serde(default)]
pub pack_list: Vec<PackListEntryPayload>,
#[serde(default = "default_pack_list_lock_ref")]
pub pack_list_lock_ref: PathBuf,
#[serde(default = "default_config_digest")]
pub config_digest: String,
#[serde(default = "default_signature_sidecar_ref")]
pub signature_sidecar_ref: PathBuf,
#[serde(default = "default_drain_seconds")]
pub drain_seconds: u32,
}
fn default_pack_list_lock_ref() -> PathBuf {
PathBuf::from("pack-list.lock")
}
fn default_config_digest() -> String {
"sha256:00".to_string()
}
fn default_signature_sidecar_ref() -> PathBuf {
PathBuf::from("rev.sig")
}
fn default_drain_seconds() -> u32 {
30
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PackListEntryPayload {
pub pack_id: String,
pub version: String,
pub digest: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source_uri: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RevisionTransitionPayload {
pub environment_id: String,
pub revision_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RevisionSummary {
pub revision_id: String,
pub deployment_id: String,
pub bundle_id: String,
pub sequence: u64,
pub lifecycle: RevisionLifecycle,
}
impl From<&Revision> for RevisionSummary {
fn from(r: &Revision) -> Self {
Self {
revision_id: r.revision_id.to_string(),
deployment_id: r.deployment_id.to_string(),
bundle_id: r.bundle_id.as_str().to_string(),
sequence: r.sequence,
lifecycle: r.lifecycle,
}
}
}
pub fn stage(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionStagePayload>,
) -> Result<OpOutcome, OpError> {
if flags.schema_only {
return Ok(OpOutcome::new(NOUN, "stage", stage_schema()));
}
let payload = resolve_payload::<RevisionStagePayload>(flags, payload)?;
let env_id = parse_env_id(&payload.environment_id)?;
let deployment_id = parse_deployment_id(&payload.deployment_id)?;
let pack_list = payload
.pack_list
.into_iter()
.map(|e| {
Ok::<_, OpError>(PackListEntry {
pack_id: PackId::new(e.pack_id),
version: e
.version
.parse::<SemVer>()
.map_err(|err| OpError::InvalidArgument(format!("pack version: {err}")))?,
digest: e.digest,
source_uri: e.source_uri,
})
})
.collect::<Result<Vec<_>, _>>()?;
if !is_valid_transition(RevisionLifecycle::Inactive, RevisionLifecycle::Staged) {
return Err(OpError::Conflict(
"spec rejects inactive → staged".to_string(),
));
}
let ctx = AuditCtx {
env_id: env_id.clone(),
noun: NOUN,
verb: "stage",
target: json!({
"deployment_id": deployment_id.to_string(),
"lifecycle_to": "staged",
}),
idempotency_key: None,
};
audit_and_record(store, ctx, || {
let summary = store.transact(&env_id, |locked| -> Result<RevisionSummary, OpError> {
let mut env = locked.load()?;
let deployment = env
.bundles
.iter()
.find(|b| b.deployment_id == deployment_id)
.ok_or_else(|| {
OpError::NotFound(format!(
"deployment `{deployment_id}` not found in env `{env_id}`"
))
})?
.clone();
let bundle_id = deployment.bundle_id.clone();
let next_sequence = env
.revisions
.iter()
.filter(|r| r.deployment_id == deployment_id)
.map(|r| r.sequence)
.max()
.unwrap_or(0)
+ 1;
let now = Utc::now();
let staged = Revision {
schema: SchemaVersion::new(SchemaVersion::REVISION_V1),
revision_id: crate::environment::mint_revision_id(),
env_id: env_id.clone(),
bundle_id,
deployment_id,
sequence: next_sequence,
created_at: now,
bundle_digest: payload.bundle_digest.clone(),
pack_list: pack_list.clone(),
pack_list_lock_ref: payload.pack_list_lock_ref.clone(),
config_digest: payload.config_digest.clone(),
signature_sidecar_ref: payload.signature_sidecar_ref.clone(),
lifecycle: RevisionLifecycle::Staged,
staged_at: Some(now),
warmed_at: None,
drain_seconds: payload.drain_seconds,
abort_metrics: Vec::new(),
};
let revision_id = staged.revision_id;
env.revisions.push(staged);
locked.save(&env)?;
Ok(RevisionSummary::from(
env.revisions
.iter()
.find(|r| r.revision_id == revision_id)
.expect("just pushed"),
))
})?;
let outcome = OpOutcome::new(
NOUN,
"stage",
serde_json::to_value(summary).expect("RevisionSummary is json-safe"),
);
Ok((outcome, super::AuditGens::NONE))
})
}
pub fn warm(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionTransitionPayload>,
) -> Result<OpOutcome, OpError> {
if flags.schema_only {
return Ok(OpOutcome::new(NOUN, "warm", transition_schema()));
}
transition(
store,
flags,
payload,
"warm",
&[
(RevisionLifecycle::Staged, RevisionLifecycle::Warming),
(RevisionLifecycle::Warming, RevisionLifecycle::Ready),
],
|r| {
r.warmed_at = Some(Utc::now());
},
false,
)
}
pub fn drain(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionTransitionPayload>,
) -> Result<OpOutcome, OpError> {
if flags.schema_only {
return Ok(OpOutcome::new(NOUN, "drain", transition_schema()));
}
transition(
store,
flags,
payload,
"drain",
&[(RevisionLifecycle::Ready, RevisionLifecycle::Draining)],
|_| {},
false,
)
}
pub fn archive(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionTransitionPayload>,
) -> Result<OpOutcome, OpError> {
if flags.schema_only {
return Ok(OpOutcome::new(NOUN, "archive", transition_schema()));
}
transition(
store,
flags,
payload,
"archive",
&[
(RevisionLifecycle::Staged, RevisionLifecycle::Archived),
(RevisionLifecycle::Warming, RevisionLifecycle::Archived),
(RevisionLifecycle::Ready, RevisionLifecycle::Archived),
(RevisionLifecycle::Failed, RevisionLifecycle::Archived),
(RevisionLifecycle::Draining, RevisionLifecycle::Inactive),
(RevisionLifecycle::Inactive, RevisionLifecycle::Archived),
],
|_| {},
true,
)
}
pub fn list(store: &LocalFsStore, flags: &OpFlags, env_id: &str) -> Result<OpOutcome, OpError> {
if flags.schema_only {
return Ok(OpOutcome::new(
NOUN,
"list",
json!({"input_schema": "env_id positional"}),
));
}
let env_id = parse_env_id(env_id)?;
if !store.exists(&env_id)? {
return Err(OpError::NotFound(format!("environment `{env_id}`")));
}
let env = store.load(&env_id)?;
let revisions: Vec<RevisionSummary> = env.revisions.iter().map(RevisionSummary::from).collect();
Ok(OpOutcome::new(
NOUN,
"list",
json!({"environment_id": env_id.as_str(), "revisions": revisions}),
))
}
fn transition<F: FnOnce(&mut Revision)>(
store: &LocalFsStore,
flags: &OpFlags,
payload: Option<RevisionTransitionPayload>,
op: &'static str,
accepted_chain: &[(RevisionLifecycle, RevisionLifecycle)],
on_final: F,
prune_from_splits: bool,
) -> Result<OpOutcome, OpError> {
let payload = resolve_payload::<RevisionTransitionPayload>(flags, payload)?;
let env_id = parse_env_id(&payload.environment_id)?;
let revision_id = parse_revision_id(&payload.revision_id)?;
let lifecycle_to = accepted_chain.last().map(|(_, to)| *to);
let ctx = AuditCtx {
env_id: env_id.clone(),
noun: NOUN,
verb: op,
target: json!({
"revision_id": revision_id.to_string(),
"lifecycle_to": lifecycle_to,
}),
idempotency_key: None,
};
audit_and_record(store, ctx, || {
let revision = store.transact(&env_id, |locked| -> Result<Revision, OpError> {
crate::environment::apply_revision_transition(
locked,
revision_id,
accepted_chain,
on_final,
prune_from_splits,
)
.map_err(OpError::from)
})?;
let summary = RevisionSummary::from(&revision);
let outcome = OpOutcome::new(
NOUN,
op,
serde_json::to_value(summary).expect("RevisionSummary is json-safe"),
);
Ok((outcome, super::AuditGens::NONE))
})
}
fn resolve_payload<T: serde::de::DeserializeOwned>(
flags: &OpFlags,
payload: Option<T>,
) -> Result<T, OpError> {
if let Some(p) = payload {
return Ok(p);
}
if let Some(path) = &flags.answers {
return super::load_answers::<T>(path);
}
Err(OpError::InvalidArgument(
"no payload provided: pass --answers <path> or supply the payload directly".to_string(),
))
}
fn parse_env_id(raw: &str) -> Result<EnvId, OpError> {
EnvId::try_from(raw).map_err(|e| OpError::InvalidArgument(format!("environment_id: {e}")))
}
fn parse_deployment_id(raw: &str) -> Result<DeploymentId, OpError> {
use std::str::FromStr;
let ulid = ulid::Ulid::from_str(raw)
.map_err(|e| OpError::InvalidArgument(format!("deployment_id: {e}")))?;
Ok(DeploymentId(ulid))
}
fn parse_revision_id(raw: &str) -> Result<RevisionId, OpError> {
use std::str::FromStr;
let ulid = ulid::Ulid::from_str(raw)
.map_err(|e| OpError::InvalidArgument(format!("revision_id: {e}")))?;
Ok(RevisionId(ulid))
}
#[allow(dead_code)]
fn discard_bundle(_id: &BundleId) {
}
fn stage_schema() -> Value {
json!({
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "RevisionStagePayload",
"type": "object",
"required": ["environment_id", "deployment_id", "bundle_digest"],
"additionalProperties": false,
"properties": {
"environment_id": {"type": "string"},
"deployment_id": {"type": "string", "description": "ULID"},
"bundle_digest": {"type": "string"},
"pack_list": {"type": "array"},
"pack_list_lock_ref": {"type": "string"},
"config_digest": {"type": "string"},
"signature_sidecar_ref": {"type": "string"},
"drain_seconds": {"type": "integer", "minimum": 0}
}
})
}
fn transition_schema() -> Value {
json!({
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "RevisionTransitionPayload",
"type": "object",
"required": ["environment_id", "revision_id"],
"additionalProperties": false,
"properties": {
"environment_id": {"type": "string"},
"revision_id": {"type": "string", "description": "ULID"}
}
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cli::tests_common::{make_bundle_deployment, make_env};
use tempfile::tempdir;
fn seed_env_with_deployment(store: &LocalFsStore) -> DeploymentId {
let mut env = make_env("local");
let deployment = make_bundle_deployment("local", "fast2flow");
let did = deployment.deployment_id;
env.bundles.push(deployment);
store.save(&env).unwrap();
did
}
fn stage_payload(deployment_id: &DeploymentId) -> RevisionStagePayload {
RevisionStagePayload {
environment_id: "local".to_string(),
deployment_id: deployment_id.to_string(),
bundle_digest: "sha256:00".to_string(),
pack_list: vec![PackListEntryPayload {
pack_id: "greentic.test.pack".to_string(),
version: "1.0.0".to_string(),
digest: "sha256:00".to_string(),
source_uri: None,
}],
pack_list_lock_ref: default_pack_list_lock_ref(),
config_digest: default_config_digest(),
signature_sidecar_ref: default_signature_sidecar_ref(),
drain_seconds: default_drain_seconds(),
}
}
#[test]
fn stage_creates_revision_in_staged() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let outcome = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
assert_eq!(
outcome.result.get("lifecycle").and_then(|v| v.as_str()),
Some("staged")
);
assert_eq!(
outcome.result.get("sequence").and_then(|v| v.as_u64()),
Some(1)
);
}
#[test]
fn warm_advances_to_ready() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let warmed = warm(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
}),
)
.unwrap();
assert_eq!(
warmed.result.get("lifecycle").and_then(|v| v.as_str()),
Some("ready")
);
}
#[test]
fn drain_after_warm_succeeds() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
warm(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.clone(),
}),
)
.unwrap();
let drained = drain(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
}),
)
.unwrap();
assert_eq!(
drained.result.get("lifecycle").and_then(|v| v.as_str()),
Some("draining")
);
}
#[test]
fn drain_from_staged_errors() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
let staged = stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let rid = staged
.result
.get("revision_id")
.and_then(|v| v.as_str())
.unwrap()
.to_string();
let err = drain(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid,
}),
)
.unwrap_err();
assert!(matches!(err, OpError::Conflict(_)), "got {err:?}");
}
#[test]
fn archive_prunes_current_revisions_when_no_live_traffic() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let mut env = make_env("local");
let mut deployment = make_bundle_deployment("local", "fast2flow");
let did = deployment.deployment_id;
let revision = crate::cli::tests_common::make_revision(
"local",
"fast2flow",
&did,
1,
RevisionLifecycle::Ready,
);
let rid = revision.revision_id;
deployment.current_revisions.push(rid);
env.bundles.push(deployment);
env.revisions.push(revision);
store.save(&env).unwrap();
let outcome = archive(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.to_string(),
}),
)
.unwrap();
assert_eq!(
outcome.result.get("lifecycle").and_then(|v| v.as_str()),
Some("archived")
);
let env = store.load(&EnvId::try_from("local").unwrap()).unwrap();
assert!(
env.bundles[0].current_revisions.is_empty(),
"current_revisions should be pruned"
);
}
#[test]
fn archive_refuses_when_revision_is_in_live_traffic_split() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let mut env = make_env("local");
let mut deployment = make_bundle_deployment("local", "fast2flow");
let did = deployment.deployment_id;
let revision = crate::cli::tests_common::make_revision(
"local",
"fast2flow",
&did,
1,
RevisionLifecycle::Ready,
);
let rid = revision.revision_id;
deployment.current_revisions.push(rid);
let split = crate::cli::tests_common::make_traffic_split(
"local",
"fast2flow",
&did,
&rid,
"test-key",
);
env.bundles.push(deployment);
env.revisions.push(revision);
env.traffic_splits.push(split);
store.save(&env).unwrap();
let err = archive(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.to_string(),
}),
)
.unwrap_err();
match err {
OpError::Conflict(msg) => {
assert!(
msg.contains("live traffic split")
&& msg.contains("rebalance via `gtc op traffic set`"),
"expected actionable conflict message, got: {msg}"
);
}
other => panic!("expected Conflict, got `{other:?}`"),
}
let env = store.load(&EnvId::try_from("local").unwrap()).unwrap();
assert_eq!(env.revisions[0].lifecycle, RevisionLifecycle::Ready);
assert_eq!(env.traffic_splits.len(), 1);
assert!(env.bundles[0].current_revisions.contains(&rid));
}
#[test]
fn archive_completes_a_drained_revision_through_inactive() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let mut env = make_env("local");
let deployment = make_bundle_deployment("local", "fast2flow");
let did = deployment.deployment_id;
let revision = crate::cli::tests_common::make_revision(
"local",
"fast2flow",
&did,
1,
RevisionLifecycle::Inactive,
);
let rid = revision.revision_id;
env.bundles.push(deployment);
env.revisions.push(revision);
store.save(&env).unwrap();
let outcome = archive(
&store,
&OpFlags::default(),
Some(RevisionTransitionPayload {
environment_id: "local".to_string(),
revision_id: rid.to_string(),
}),
)
.unwrap();
assert_eq!(
outcome.result.get("lifecycle").and_then(|v| v.as_str()),
Some("archived")
);
}
#[test]
fn list_reflects_stage_calls() {
let dir = tempdir().unwrap();
let store = LocalFsStore::new(dir.path());
let did = seed_env_with_deployment(&store);
stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
stage(&store, &OpFlags::default(), Some(stage_payload(&did))).unwrap();
let listed = list(&store, &OpFlags::default(), "local").unwrap();
let revs = listed
.result
.get("revisions")
.and_then(|v| v.as_array())
.unwrap();
assert_eq!(revs.len(), 2);
let seqs: Vec<u64> = revs
.iter()
.filter_map(|r| r.get("sequence").and_then(|v| v.as_u64()))
.collect();
assert_eq!(seqs, vec![1, 2]);
}
}