use greentic_deploy_spec::{BundleId, DeploymentId, Environment, Revision};
use greentic_telemetry::{RolloutEvent, TelemetryCtx, emit_rollout_event};
const LOCAL_TENANT_FALLBACK: &str = "local";
pub(crate) fn build_lifecycle_ctx(env: &Environment, revision: &Revision) -> TelemetryCtx {
let tenant = env
.host_config
.tenant_org_id
.as_deref()
.unwrap_or(LOCAL_TENANT_FALLBACK);
TelemetryCtx::new(tenant)
.with_env(env.environment_id.as_str())
.with_deployment_id(revision.deployment_id.to_string())
.with_bundle_id(revision.bundle_id.to_string())
.with_revision_id(revision.revision_id.to_string())
}
pub(crate) fn build_traffic_split_ctx(
env: &Environment,
deployment_id: DeploymentId,
bundle_id: &BundleId,
new_generation: u64,
) -> TelemetryCtx {
let tenant = env
.host_config
.tenant_org_id
.as_deref()
.unwrap_or(LOCAL_TENANT_FALLBACK);
TelemetryCtx::new(tenant)
.with_env(env.environment_id.as_str())
.with_deployment_id(deployment_id.to_string())
.with_bundle_id(bundle_id.to_string())
.with_generation(new_generation)
}
pub(crate) fn emit_lifecycle_event(event: RolloutEvent, env: &Environment, revision: &Revision) {
let ctx = build_lifecycle_ctx(env, revision);
emit_rollout_event(event, &ctx);
}
pub(crate) fn emit_traffic_split_applied(
env: &Environment,
deployment_id: DeploymentId,
bundle_id: &BundleId,
new_generation: u64,
) {
let ctx = build_traffic_split_ctx(env, deployment_id, bundle_id, new_generation);
emit_rollout_event(RolloutEvent::TrafficSplitApplied, &ctx);
}
#[cfg(test)]
pub(crate) mod test_capture {
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Once;
use tracing::Subscriber;
use tracing::field::{Field, Visit};
use tracing::span::{Attributes, Id};
use tracing_subscriber::layer::{Context, Layer};
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::LookupSpan;
thread_local! {
static EVENTS: RefCell<Vec<String>> = const { RefCell::new(Vec::new()) };
}
#[derive(Default)]
struct GrabEvent(HashMap<String, String>);
impl Visit for GrabEvent {
fn record_str(&mut self, field: &Field, value: &str) {
self.0.insert(field.name().to_string(), value.to_string());
}
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
self.0
.entry(field.name().to_string())
.or_insert_with(|| format!("{value:?}"));
}
}
struct GlobalRolloutCapture;
impl<S> Layer<S> for GlobalRolloutCapture
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_new_span(&self, attrs: &Attributes<'_>, _id: &Id, _ctx: Context<'_, S>) {
let mut g = GrabEvent::default();
attrs.record(&mut g);
if let Some(ev) = g.0.remove("rollout.event") {
EVENTS.with(|e| e.borrow_mut().push(ev));
}
}
}
static INSTALL: Once = Once::new();
fn install_once() {
INSTALL.call_once(|| {
let _ = tracing_subscriber::registry()
.with(GlobalRolloutCapture)
.try_init();
});
}
pub(crate) fn capture_events<R>(f: impl FnOnce() -> R) -> (R, Vec<String>) {
install_once();
EVENTS.with(|e| e.borrow_mut().clear());
let r = f();
let events = EVENTS.with(|e| std::mem::take(&mut *e.borrow_mut()));
(r, events)
}
pub(crate) fn count(events: &[String], discriminant: &str) -> usize {
events.iter().filter(|e| e.as_str() == discriminant).count()
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{TimeZone, Utc};
use greentic_deploy_spec::{
BundleId, DeploymentId, EnvId, Environment, EnvironmentHostConfig, PackId, PackListEntry,
Revision, RevisionId, RevisionLifecycle, SchemaVersion, SemVer,
};
use std::path::PathBuf;
fn env_with_owner(owner: Option<&str>) -> Environment {
Environment {
schema: SchemaVersion::new(SchemaVersion::ENVIRONMENT_V1),
environment_id: EnvId::try_from("prod-eu").unwrap(),
name: "prod-eu".into(),
host_config: EnvironmentHostConfig {
env_id: EnvId::try_from("prod-eu").unwrap(),
region: None,
tenant_org_id: owner.map(str::to_string),
listen_addr: None,
},
packs: Vec::new(),
credentials_ref: None,
bundles: Vec::new(),
revisions: Vec::new(),
traffic_splits: Vec::new(),
messaging_endpoints: Vec::new(),
extensions: Vec::new(),
revocation: Default::default(),
retention: Default::default(),
health: Default::default(),
}
}
fn sample_revision() -> Revision {
Revision {
schema: SchemaVersion::new(SchemaVersion::REVISION_V1),
revision_id: RevisionId::new(),
env_id: EnvId::try_from("prod-eu").unwrap(),
bundle_id: BundleId::new("customer.support"),
deployment_id: DeploymentId::new(),
sequence: 1,
created_at: Utc.timestamp_opt(0, 0).unwrap(),
bundle_digest: "sha256:00".into(),
pack_list: vec![PackListEntry {
pack_id: PackId::new("greentic.support.pack"),
version: SemVer::new(1, 0, 0),
digest: "sha256:00".into(),
source_uri: None,
}],
pack_list_lock_ref: PathBuf::from("pack-list.lock"),
config_digest: "sha256:00".into(),
signature_sidecar_ref: PathBuf::from("rev.sig"),
lifecycle: RevisionLifecycle::Ready,
staged_at: None,
warmed_at: None,
drain_seconds: 30,
abort_metrics: Vec::new(),
}
}
fn get<'a>(kv: &'a [(&'static str, Option<&str>)], key: &str) -> Option<&'a str> {
kv.iter().find(|(k, _)| *k == key).and_then(|(_, v)| *v)
}
#[test]
fn lifecycle_ctx_uses_env_tenant_org_id_when_set() {
let env = env_with_owner(Some("acme"));
let rev = sample_revision();
let ctx = build_lifecycle_ctx(&env, &rev);
let kv = ctx.kv();
assert_eq!(get(&kv, "gt.tenant"), Some("acme"));
assert_eq!(get(&kv, "gt.env"), Some("prod-eu"));
assert_eq!(get(&kv, "gt.bundle_id"), Some("customer.support"));
assert_eq!(
get(&kv, "gt.deployment_id"),
Some(rev.deployment_id.to_string().as_str())
);
assert_eq!(
get(&kv, "gt.revision_id"),
Some(rev.revision_id.to_string().as_str())
);
}
#[test]
fn lifecycle_ctx_falls_back_to_local_tenant_when_unowned() {
let env = env_with_owner(None);
let rev = sample_revision();
let ctx = build_lifecycle_ctx(&env, &rev);
assert_eq!(get(&ctx.kv(), "gt.tenant"), Some(LOCAL_TENANT_FALLBACK));
}
#[test]
fn traffic_split_ctx_stamps_deployment_bundle_and_generation() {
let env = env_with_owner(Some("acme"));
let deployment_id = DeploymentId::new();
let bundle_id = BundleId::new("customer.support");
let ctx = build_traffic_split_ctx(&env, deployment_id, &bundle_id, 7);
let kv = ctx.kv();
assert_eq!(get(&kv, "gt.tenant"), Some("acme"));
assert_eq!(get(&kv, "gt.env"), Some("prod-eu"));
assert_eq!(
get(&kv, "gt.deployment_id"),
Some(deployment_id.to_string().as_str())
);
assert_eq!(get(&kv, "gt.bundle_id"), Some("customer.support"));
assert_eq!(get(&kv, "gt.generation"), Some("7"));
assert!(get(&kv, "gt.revision_id").is_none());
}
#[test]
fn emit_helpers_do_not_panic_without_subscriber() {
let env = env_with_owner(Some("acme"));
let rev = sample_revision();
emit_lifecycle_event(RolloutEvent::HealthGatePassed, &env, &rev);
emit_lifecycle_event(RolloutEvent::HealthGateFailed, &env, &rev);
emit_lifecycle_event(RolloutEvent::RevisionWarmed, &env, &rev);
emit_lifecycle_event(RolloutEvent::RevisionDraining, &env, &rev);
emit_lifecycle_event(RolloutEvent::RevisionEvicted, &env, &rev);
emit_traffic_split_applied(&env, rev.deployment_id, &rev.bundle_id, 3);
}
}