use greentic_deploy_spec::{BundleId, DeploymentId, Environment, Revision, RevisionId};
use greentic_telemetry::{RolloutEvent, TelemetryCtx, emit_rollout_event};
/// Tenant label used in telemetry contexts when an environment's
/// `host_config.tenant_org_id` is `None`.
const LOCAL_TENANT_FALLBACK: &str = "local";
/// Assembles the [`TelemetryCtx`] used for drain transitions.
///
/// The context is created for `tenant` and then tagged with the environment,
/// deployment, bundle and revision identifiers. The typed ids are rendered
/// with `to_string()` before being attached.
pub(crate) fn build_drain_ctx(
    tenant: &str,
    env_id: &str,
    deployment_id: DeploymentId,
    bundle_id: &BundleId,
    revision_id: RevisionId,
) -> TelemetryCtx {
    // Render the typed identifiers up front so the builder chain stays flat.
    let deployment_label = deployment_id.to_string();
    let bundle_label = bundle_id.to_string();
    let revision_label = revision_id.to_string();

    let base = TelemetryCtx::new(tenant).with_env(env_id);
    base.with_deployment_id(deployment_label)
        .with_bundle_id(bundle_label)
        .with_revision_id(revision_label)
}
/// Assembles the [`TelemetryCtx`] used for health-gate transitions.
///
/// The tenant comes from `env.host_config.tenant_org_id`; when that is `None`
/// the context falls back to `LOCAL_TENANT_FALLBACK`. The environment id is
/// read from `env`, while the deployment, bundle and revision ids are read
/// from `revision`.
pub(crate) fn build_health_gate_ctx(env: &Environment, revision: &Revision) -> TelemetryCtx {
    let tenant = match env.host_config.tenant_org_id.as_deref() {
        Some(org_id) => org_id,
        None => LOCAL_TENANT_FALLBACK,
    };
    let env_label = env.environment_id.as_str();
    let deployment_label = revision.deployment_id.to_string();
    let bundle_label = revision.bundle_id.to_string();
    let revision_label = revision.revision_id.to_string();

    TelemetryCtx::new(tenant)
        .with_env(env_label)
        .with_deployment_id(deployment_label)
        .with_bundle_id(bundle_label)
        .with_revision_id(revision_label)
}
/// Emits `event` through [`emit_rollout_event`] with a context produced by
/// [`build_drain_ctx`] from the given tenant, environment, deployment, bundle
/// and revision identifiers.
pub(crate) fn emit_drain_transition(
    event: RolloutEvent,
    tenant: &str,
    env_id: &str,
    deployment_id: DeploymentId,
    bundle_id: &BundleId,
    revision_id: RevisionId,
) {
    emit_rollout_event(
        event,
        &build_drain_ctx(tenant, env_id, deployment_id, bundle_id, revision_id),
    );
}
/// Emits `event` through [`emit_rollout_event`] with a context produced by
/// [`build_health_gate_ctx`] from `env` and `revision`.
pub(crate) fn emit_health_gate_transition(
    event: RolloutEvent,
    env: &Environment,
    revision: &Revision,
) {
    emit_rollout_event(event, &build_health_gate_ctx(env, revision));
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};
    use greentic_deploy_spec::{
        BundleId, DeploymentId, EnvId, Environment, EnvironmentHostConfig, PackId, PackListEntry,
        Revision, RevisionId, RevisionLifecycle, SchemaVersion, SemVer,
    };
    use std::path::PathBuf;

    /// Builds a `prod-eu` environment fixture whose `tenant_org_id` is `owner`.
    fn env_with_owner(owner: Option<&str>) -> Environment {
        let host_config = EnvironmentHostConfig {
            env_id: EnvId::try_from("prod-eu").unwrap(),
            region: None,
            tenant_org_id: owner.map(String::from),
            listen_addr: None,
            public_base_url: None,
        };

        Environment {
            schema: SchemaVersion::new(SchemaVersion::ENVIRONMENT_V1),
            environment_id: EnvId::try_from("prod-eu").unwrap(),
            name: "prod-eu".into(),
            host_config,
            packs: Vec::new(),
            messaging_endpoints: Vec::new(),
            extensions: Vec::new(),
            credentials_ref: None,
            bundles: Vec::new(),
            revisions: Vec::new(),
            traffic_splits: Vec::new(),
            revocation: Default::default(),
            retention: Default::default(),
            health: Default::default(),
        }
    }

    /// Builds a warming revision fixture for the `customer.support` bundle
    /// with freshly generated revision and deployment ids.
    fn sample_revision() -> Revision {
        let support_pack = PackListEntry {
            pack_id: PackId::new("greentic.support.pack"),
            version: SemVer::new(1, 0, 0),
            digest: "sha256:00".into(),
            source_uri: None,
        };

        Revision {
            schema: SchemaVersion::new(SchemaVersion::REVISION_V1),
            revision_id: RevisionId::new(),
            env_id: EnvId::try_from("prod-eu").unwrap(),
            bundle_id: BundleId::new("customer.support"),
            deployment_id: DeploymentId::new(),
            sequence: 1,
            created_at: Utc.timestamp_opt(0, 0).unwrap(),
            bundle_digest: "sha256:00".into(),
            pack_list: vec![support_pack],
            pack_list_lock_ref: PathBuf::from("pack-list.lock"),
            config_digest: "sha256:00".into(),
            signature_sidecar_ref: PathBuf::from("rev.sig"),
            lifecycle: RevisionLifecycle::Warming,
            staged_at: None,
            warmed_at: None,
            drain_seconds: 30,
            abort_metrics: Vec::new(),
        }
    }

    /// Returns the value stored under the first entry named `key`, or `None`
    /// when the key is absent or its value is unset.
    fn get<'a>(kv: &'a [(&'static str, Option<&str>)], key: &str) -> Option<&'a str> {
        for (name, value) in kv {
            if *name == key {
                return *value;
            }
        }
        None
    }

    #[test]
    fn drain_ctx_populates_tenant_env_deployment_bundle_revision() {
        let deployment_id = DeploymentId::new();
        let bundle_id = BundleId::new("customer.support");
        let revision_id = RevisionId::new();
        let expected_deployment = deployment_id.to_string();
        let expected_revision = revision_id.to_string();

        let ctx = build_drain_ctx("acme", "prod-eu", deployment_id, &bundle_id, revision_id);
        let kv = ctx.kv();

        assert_eq!(get(&kv, "gt.tenant"), Some("acme"));
        assert_eq!(get(&kv, "gt.env"), Some("prod-eu"));
        assert_eq!(
            get(&kv, "gt.deployment_id"),
            Some(expected_deployment.as_str())
        );
        assert_eq!(get(&kv, "gt.bundle_id"), Some("customer.support"));
        assert_eq!(get(&kv, "gt.revision_id"), Some(expected_revision.as_str()));

        // Fields the drain context never sets must stay absent.
        assert!(get(&kv, "gt.pack_id").is_none());
        assert!(get(&kv, "gt.env_pack_kind").is_none());
        assert!(get(&kv, "gt.generation").is_none());
    }

    #[test]
    fn health_gate_ctx_uses_env_tenant_org_id_when_set() {
        let env = env_with_owner(Some("acme"));
        let rev = sample_revision();
        let expected_deployment = rev.deployment_id.to_string();
        let expected_revision = rev.revision_id.to_string();

        let ctx = build_health_gate_ctx(&env, &rev);
        let kv = ctx.kv();

        assert_eq!(get(&kv, "gt.tenant"), Some("acme"));
        assert_eq!(get(&kv, "gt.env"), Some("prod-eu"));
        assert_eq!(get(&kv, "gt.bundle_id"), Some("customer.support"));
        assert_eq!(
            get(&kv, "gt.deployment_id"),
            Some(expected_deployment.as_str())
        );
        assert_eq!(get(&kv, "gt.revision_id"), Some(expected_revision.as_str()));
    }

    #[test]
    fn health_gate_ctx_falls_back_to_local_tenant_when_unowned() {
        let env = env_with_owner(None);
        let rev = sample_revision();

        let ctx = build_health_gate_ctx(&env, &rev);
        let kv = ctx.kv();

        assert_eq!(get(&kv, "gt.tenant"), Some(LOCAL_TENANT_FALLBACK));
    }

    #[test]
    fn emit_helpers_do_not_panic_without_subscriber() {
        let deployment_id = DeploymentId::new();
        let bundle_id = BundleId::new("customer.support");
        let revision_id = RevisionId::new();

        for event in [RolloutEvent::RevisionDraining, RolloutEvent::RevisionEvicted] {
            emit_drain_transition(
                event,
                "acme",
                "prod-eu",
                deployment_id,
                &bundle_id,
                revision_id,
            );
        }

        let env = env_with_owner(Some("acme"));
        let rev = sample_revision();
        for event in [RolloutEvent::HealthGatePassed, RolloutEvent::HealthGateFailed] {
            emit_health_gate_transition(event, &env, &rev);
        }
    }
}