use std::fmt;
use serde::{Serialize, Serializer};
use serde_json::{json, Map, Value};
use crate::policy::PolicyViolation;
use crate::{
CloudEventV1, DnsAuthorityDnssecFailed, DnsAuthorityDrift, DnsAuthorityRebindRejected,
DnsAuthorityRebindThreshold, DnsQueryEvent, ExecutionCellSpec, ExportReceipt,
NetworkFlowDecision, WorkloadIdentity,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum LifecycleDestroyOutcome {
Succeeded,
Failed,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum LifecycleTerminalState {
Clean,
Forced,
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum LifecycleReason {
Oom,
TtlExceeded,
VmmCrashed,
BootFailed,
SignalKilled,
InitCrashed,
KernelCannotMountRoot,
Other(String),
}
impl LifecycleReason {
pub fn as_wire_str(&self) -> &str {
match self {
LifecycleReason::Oom => "oom",
LifecycleReason::TtlExceeded => "ttl_exceeded",
LifecycleReason::VmmCrashed => "vmm_crashed",
LifecycleReason::BootFailed => "boot_failed",
LifecycleReason::SignalKilled => "signal_killed",
LifecycleReason::InitCrashed => "init_crashed",
LifecycleReason::KernelCannotMountRoot => "kernel_cannot_mount_root",
LifecycleReason::Other(s) => s.as_str(),
}
}
}
impl fmt::Display for LifecycleReason {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_wire_str())
}
}
impl Serialize for LifecycleReason {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_str(self.as_wire_str())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum IdentityFailureOperation {
Materialize,
Revoke,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Provenance {
pub parent: String,
pub parent_type: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
#[serde(transparent)]
pub struct SubjectUrn(String);
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubjectUrnError {
MissingUrnScheme,
TooFewSegments,
EmptySegment,
InvalidToolOrKindCharset,
ControlOrWhitespace,
}
impl std::fmt::Display for SubjectUrnError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SubjectUrnError::MissingUrnScheme => f.write_str("subject URN must start with `urn:`"),
SubjectUrnError::TooFewSegments => {
f.write_str("subject URN must have shape `urn:<tool>:<kind>:<id>`")
}
SubjectUrnError::EmptySegment => {
f.write_str("subject URN tool / kind / id segments must each be non-empty")
}
SubjectUrnError::InvalidToolOrKindCharset => {
f.write_str("subject URN tool and kind must match charset [a-z0-9-]")
}
SubjectUrnError::ControlOrWhitespace => {
f.write_str("subject URN must not contain ASCII control characters or whitespace")
}
}
}
}
impl std::error::Error for SubjectUrnError {}
impl SubjectUrn {
pub fn parse(s: impl Into<String>) -> Result<Self, SubjectUrnError> {
let s = s.into();
if s.bytes()
.any(|b| b.is_ascii_control() || (b as char).is_whitespace())
{
return Err(SubjectUrnError::ControlOrWhitespace);
}
let rest = match s.strip_prefix("urn:") {
Some(r) => r,
None => return Err(SubjectUrnError::MissingUrnScheme),
};
let mut parts = rest.splitn(3, ':');
let tool = parts.next().ok_or(SubjectUrnError::TooFewSegments)?;
let kind = parts.next().ok_or(SubjectUrnError::TooFewSegments)?;
let id = parts.next().ok_or(SubjectUrnError::TooFewSegments)?;
if tool.is_empty() || kind.is_empty() || id.is_empty() {
return Err(SubjectUrnError::EmptySegment);
}
let ok_segment = |seg: &str| {
seg.bytes()
.all(|b| matches!(b, b'a'..=b'z' | b'0'..=b'9' | b'-'))
};
if !ok_segment(tool) || !ok_segment(kind) {
return Err(SubjectUrnError::InvalidToolOrKindCharset);
}
Ok(SubjectUrn(s))
}
pub fn as_str(&self) -> &str {
&self.0
}
pub fn into_inner(self) -> String {
self.0
}
}
impl AsRef<str> for SubjectUrn {
fn as_ref(&self) -> &str {
&self.0
}
}
impl std::fmt::Display for SubjectUrn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
pub fn cell_subject_urn(cell_id: &str) -> Result<SubjectUrn, SubjectUrnError> {
SubjectUrn::parse(format!("urn:cellos:cell:{cell_id}"))
}
#[allow(clippy::too_many_arguments)]
pub fn lifecycle_started_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
derivation_verified: Option<bool>,
role_root: Option<&str>,
parent_run_id: Option<&str>,
spec_hash: Option<&str>,
kernel_digest_sha256: Option<&str>,
rootfs_digest_sha256: Option<&str>,
firecracker_digest_sha256: Option<&str>,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("ttlSeconds".to_string(), json!(spec.lifetime.ttl_seconds));
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(verified) = derivation_verified {
m.insert("derivationVerified".to_string(), json!(verified));
}
if let Some(role) = role_root {
m.insert("roleRoot".to_string(), json!(role));
}
if let Some(parent) = parent_run_id {
m.insert("parentRunId".to_string(), json!(parent));
}
if let Some(hash) = spec_hash {
m.insert("specHash".to_string(), json!(hash));
}
if let Some(d) = kernel_digest_sha256 {
m.insert("kernelDigestSha256".to_string(), json!(d));
}
if let Some(d) = rootfs_digest_sha256 {
m.insert("rootfsDigestSha256".to_string(), json!(d));
}
if let Some(d) = firecracker_digest_sha256 {
m.insert("firecrackerDigestSha256".to_string(), json!(d));
}
if let Some(placement) = &spec.placement {
let mut placement_map = Map::new();
if let Some(pool_id) = &placement.pool_id {
placement_map.insert("poolId".to_string(), json!(pool_id));
}
if let Some(namespace) = &placement.kubernetes_namespace {
placement_map.insert("kubernetesNamespace".to_string(), json!(namespace));
}
if let Some(queue_name) = &placement.queue_name {
placement_map.insert("queueName".to_string(), json!(queue_name));
}
if !placement_map.is_empty() {
m.insert("placement".to_string(), Value::Object(placement_map));
}
}
if let Some(c) = &spec.correlation {
if let Some(tid) = &c.tenant_id {
m.insert("tenantId".to_string(), json!(tid));
}
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
#[allow(clippy::too_many_arguments)]
pub fn lifecycle_destroyed_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
outcome: LifecycleDestroyOutcome,
reason: Option<&str>,
terminal_state: Option<LifecycleTerminalState>,
evidence_bundle_ref: Option<&SubjectUrn>,
residue_class: Option<ResidueClass>,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("ttlSeconds".to_string(), json!(spec.lifetime.ttl_seconds));
m.insert("outcome".to_string(), serde_json::to_value(outcome)?);
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
if let Some(tid) = &c.tenant_id {
m.insert("tenantId".to_string(), json!(tid));
}
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
if let Some(s) = reason {
m.insert("reason".to_string(), json!(s));
}
if let Some(ts) = terminal_state {
m.insert("terminalState".to_string(), serde_json::to_value(ts)?);
}
if let Some(urn) = evidence_bundle_ref {
m.insert("evidenceBundleRef".to_string(), json!(urn));
}
if let Some(rc) = residue_class {
m.insert("residueClass".to_string(), serde_json::to_value(rc)?);
}
Ok(Value::Object(m))
}
pub const LIFECYCLE_MANIFEST_FAILED_TYPE: &str =
"dev.cellos.events.cell.lifecycle.v1.manifest-failed";
pub fn manifest_failed_data_v1(
role: &str,
expected_sha256: &str,
actual_sha256: &str,
manifest_path: &str,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("role".to_string(), json!(role));
m.insert("expectedSha256".to_string(), json!(expected_sha256));
m.insert("actualSha256".to_string(), json!(actual_sha256));
m.insert("manifestPath".to_string(), json!(manifest_path));
Ok(Value::Object(m))
}
#[allow(clippy::too_many_arguments)]
pub fn lifecycle_destroyed_data_v1_typed(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
outcome: LifecycleDestroyOutcome,
reason: Option<LifecycleReason>,
terminal_state: Option<LifecycleTerminalState>,
evidence_bundle_ref: Option<&SubjectUrn>,
residue_class: Option<ResidueClass>,
) -> Result<Value, serde_json::Error> {
let reason_str = reason.as_ref().map(|r| r.as_wire_str());
lifecycle_destroyed_data_v1(
spec,
cell_id,
run_id,
outcome,
reason_str,
terminal_state,
evidence_bundle_ref,
residue_class,
)
}
pub fn identity_materialized_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
identity: &WorkloadIdentity,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("identity".to_string(), serde_json::to_value(identity)?);
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
pub fn identity_revoked_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
identity: &WorkloadIdentity,
reason: Option<&str>,
provenance: Option<&Provenance>,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("identity".to_string(), serde_json::to_value(identity)?);
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
if let Some(s) = reason {
m.insert("reason".to_string(), json!(s));
}
if let Some(p) = provenance {
m.insert("provenance".to_string(), serde_json::to_value(p)?);
}
Ok(Value::Object(m))
}
pub fn identity_failed_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
identity: &WorkloadIdentity,
operation: IdentityFailureOperation,
reason: &str,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("identity".to_string(), serde_json::to_value(identity)?);
m.insert("operation".to_string(), serde_json::to_value(operation)?);
m.insert("reason".to_string(), json!(reason));
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
pub fn command_completed_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
argv: &[String],
exit_code: i32,
duration_ms: u64,
spawn_error: Option<&str>,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("exitCode".to_string(), json!(exit_code));
m.insert("durationMs".to_string(), json!(duration_ms));
m.insert("argv".to_string(), json!(argv));
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
if let Some(s) = spawn_error {
m.insert("spawnError".to_string(), json!(s));
}
Ok(Value::Object(m))
}
pub fn observability_network_scope_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
egress_rule_count: usize,
has_opaque_network_authority: bool,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("egressRuleCount".to_string(), json!(egress_rule_count));
m.insert(
"hasOpaqueNetworkAuthority".to_string(),
json!(has_opaque_network_authority),
);
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
pub fn observability_process_spawned_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
program: &str,
argc: usize,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("program".to_string(), json!(program));
m.insert("argc".to_string(), json!(argc));
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
pub fn observability_fs_touch_export_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
source_path: &str,
artifact_name: &str,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("purpose".to_string(), json!("export"));
m.insert("sourcePath".to_string(), json!(source_path));
m.insert("artifactName".to_string(), json!(artifact_name));
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
pub fn export_completed_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
artifact_name: &str,
bytes_written: u64,
destination_relative: &str,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("artifactName".to_string(), json!(artifact_name));
m.insert("bytesWritten".to_string(), json!(bytes_written));
m.insert(
"destinationRelative".to_string(),
json!(destination_relative),
);
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
pub fn export_completed_data_v2(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
artifact_name: &str,
receipt: &ExportReceipt,
provenance: Option<&Provenance>,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("artifactName".to_string(), json!(artifact_name));
m.insert("receipt".to_string(), serde_json::to_value(receipt)?);
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
if let Some(p) = provenance {
m.insert("provenance".to_string(), serde_json::to_value(p)?);
}
Ok(Value::Object(m))
}
#[allow(clippy::too_many_arguments)] pub fn export_failed_data_v2(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
artifact_name: &str,
target_kind: crate::ExportReceiptTargetKind,
target_name: Option<&str>,
destination: Option<&str>,
reason: &str,
provenance: Option<&Provenance>,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("artifactName".to_string(), json!(artifact_name));
m.insert("targetKind".to_string(), serde_json::to_value(target_kind)?);
m.insert("reason".to_string(), json!(reason));
if let Some(name) = target_name {
m.insert("targetName".to_string(), json!(name));
}
if let Some(dest) = destination {
m.insert("destination".to_string(), json!(dest));
}
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
if let Some(p) = provenance {
m.insert("provenance".to_string(), serde_json::to_value(p)?);
}
Ok(Value::Object(m))
}
pub fn observability_network_policy_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
isolation_mode: &str,
egress_rules: &[crate::EgressRule],
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("isolationMode".to_string(), json!(isolation_mode));
m.insert("declaredEgressCount".to_string(), json!(egress_rules.len()));
m.insert(
"declaredEgress".to_string(),
serde_json::to_value(egress_rules)?,
);
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
#[allow(clippy::too_many_arguments)]
pub fn observability_network_enforcement_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
nft_rules_applied: bool,
declared_egress_rule_count: usize,
command_exit_code: i32,
spawn_error: Option<&str>,
) -> Result<Value, serde_json::Error> {
let supplementary = nft_rules_applied && declared_egress_rule_count > 0;
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("isolationMode".to_string(), json!("clone_newnet"));
m.insert("nftRulesApplied".to_string(), json!(nft_rules_applied));
m.insert(
"declaredEgressRuleCount".to_string(),
json!(declared_egress_rule_count),
);
m.insert(
"supplementaryEgressFilterActive".to_string(),
json!(supplementary),
);
m.insert("commandExitCode".to_string(), json!(command_exit_code));
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(s) = spawn_error {
m.insert("spawnError".to_string(), json!(s));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
pub const TRUST_PLANE_BUILTIN_KEYSET_ID: &str = "cellos:builtin-v0";
pub const TRUST_PLANE_BUILTIN_RESOLVER_KID: &str = "cellos-local-resolve-v0";
pub const TRUST_PLANE_BUILTIN_L7_KID: &str = "cellos-local-l7-v0";
pub const TRUST_PLANE_AGGREGATE_EGRESS_FQDN: &str = "declared-egress.trust.cellos.internal";
#[allow(clippy::too_many_arguments)]
pub fn observability_dns_resolution_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
fqdn: &str,
resolved_at: &str,
targets: &[(&str, &str, Option<u16>)],
ttl_seconds: i64,
policy_digest: &str,
keyset_id: &str,
issuer_kid: &str,
receipt_id: Option<&str>,
) -> Result<Value, serde_json::Error> {
let mut rows = Vec::with_capacity(targets.len());
for (addr, family, port) in targets {
let mut row = Map::new();
row.insert("address".to_string(), json!(addr));
row.insert("family".to_string(), json!(family));
if let Some(p) = port {
row.insert("port".to_string(), json!(p));
}
rows.push(Value::Object(row));
}
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(rid) = receipt_id {
m.insert("receiptId".to_string(), json!(rid));
}
m.insert("fqdn".to_string(), json!(fqdn));
m.insert("resolvedAt".to_string(), json!(resolved_at));
m.insert("targets".to_string(), Value::Array(rows));
m.insert("ttlSeconds".to_string(), json!(ttl_seconds));
m.insert("policyDigest".to_string(), json!(policy_digest));
m.insert("keysetId".to_string(), json!(keyset_id));
m.insert("issuerKid".to_string(), json!(issuer_kid));
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
#[allow(clippy::too_many_arguments)]
pub fn observability_dns_target_set_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
fqdn: &str,
previous_digest: &str,
current_digest: &str,
reason: &str,
updated_at: &str,
keyset_id: &str,
issuer_kid: &str,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
m.insert("fqdn".to_string(), json!(fqdn));
m.insert("previousDigest".to_string(), json!(previous_digest));
m.insert("currentDigest".to_string(), json!(current_digest));
m.insert("reason".to_string(), json!(reason));
m.insert("updatedAt".to_string(), json!(updated_at));
m.insert("keysetId".to_string(), json!(keyset_id));
m.insert("issuerKid".to_string(), json!(issuer_kid));
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
#[allow(clippy::too_many_arguments)]
pub fn dns_authority_drift_data_v1(drift: &DnsAuthorityDrift) -> Result<Value, serde_json::Error> {
serde_json::to_value(drift)
}
pub fn cloud_event_v1_dns_authority_drift(
source: &str,
time: &str,
drift: &DnsAuthorityDrift,
) -> Result<CloudEventV1, serde_json::Error> {
Ok(CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: "dev.cellos.events.cell.observability.v1.dns_authority_drift".into(),
datacontenttype: Some("application/json".into()),
data: Some(dns_authority_drift_data_v1(drift)?),
time: Some(time.to_string()),
traceparent: None,
})
}
pub fn dns_authority_rebind_threshold_data_v1(
payload: &DnsAuthorityRebindThreshold,
) -> Result<Value, serde_json::Error> {
serde_json::to_value(payload)
}
pub fn cloud_event_v1_dns_authority_rebind_threshold(
source: &str,
time: &str,
payload: &DnsAuthorityRebindThreshold,
) -> Result<CloudEventV1, serde_json::Error> {
Ok(CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: "dev.cellos.events.cell.observability.v1.dns_authority_rebind_threshold".into(),
datacontenttype: Some("application/json".into()),
data: Some(dns_authority_rebind_threshold_data_v1(payload)?),
time: Some(time.to_string()),
traceparent: None,
})
}
pub fn dns_authority_rebind_rejected_data_v1(
payload: &DnsAuthorityRebindRejected,
) -> Result<Value, serde_json::Error> {
serde_json::to_value(payload)
}
pub fn cloud_event_v1_dns_authority_rebind_rejected(
source: &str,
time: &str,
payload: &DnsAuthorityRebindRejected,
) -> Result<CloudEventV1, serde_json::Error> {
Ok(CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: "dev.cellos.events.cell.observability.v1.dns_authority_rebind_rejected".into(),
datacontenttype: Some("application/json".into()),
data: Some(dns_authority_rebind_rejected_data_v1(payload)?),
time: Some(time.to_string()),
traceparent: None,
})
}
pub fn dns_authority_dnssec_failed_data_v1(
payload: &DnsAuthorityDnssecFailed,
) -> Result<Value, serde_json::Error> {
serde_json::to_value(payload)
}
pub fn cloud_event_v1_dns_authority_dnssec_failed(
source: &str,
time: &str,
payload: &DnsAuthorityDnssecFailed,
) -> Result<CloudEventV1, serde_json::Error> {
Ok(CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: "dev.cellos.events.cell.observability.v1.dns_authority_dnssec_failed".into(),
datacontenttype: Some("application/json".into()),
data: Some(dns_authority_dnssec_failed_data_v1(payload)?),
time: Some(time.to_string()),
traceparent: None,
})
}
pub fn dns_query_data_v1(event: &DnsQueryEvent) -> Result<Value, serde_json::Error> {
serde_json::to_value(event)
}
pub fn cloud_event_v1_dns_query(
source: &str,
time: &str,
event: &DnsQueryEvent,
) -> Result<CloudEventV1, serde_json::Error> {
Ok(CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: "dev.cellos.events.cell.observability.v1.dns_query".into(),
datacontenttype: Some("application/json".into()),
data: Some(dns_query_data_v1(event)?),
time: Some(time.to_string()),
traceparent: None,
})
}
#[must_use]
pub fn dns_query_permitted_data_v1(
qname: &str,
qtype: &str,
cell_id: &str,
resolver: &str,
) -> Value {
json!({
"schemaVersion": "1.0.0",
"queryName": qname,
"queryType": qtype,
"cellId": cell_id,
"resolver": resolver,
})
}
pub fn cloud_event_v1_dns_query_permitted(
source: &str,
time: &str,
qname: &str,
qtype: &str,
cell_id: &str,
resolver: &str,
) -> CloudEventV1 {
CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: "dev.cellos.events.cell.dns.v1.query_permitted".into(),
datacontenttype: Some("application/json".into()),
data: Some(dns_query_permitted_data_v1(qname, qtype, cell_id, resolver)),
time: Some(time.to_string()),
traceparent: None,
}
}
#[must_use]
pub fn dns_query_refused_data_v1(qname: &str, qtype: &str, cell_id: &str, reason: &str) -> Value {
json!({
"schemaVersion": "1.0.0",
"queryName": qname,
"queryType": qtype,
"cellId": cell_id,
"reason": reason,
})
}
pub fn cloud_event_v1_dns_query_refused(
source: &str,
time: &str,
qname: &str,
qtype: &str,
cell_id: &str,
reason: &str,
) -> CloudEventV1 {
CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: "dev.cellos.events.cell.dns.v1.query_refused".into(),
datacontenttype: Some("application/json".into()),
data: Some(dns_query_refused_data_v1(qname, qtype, cell_id, reason)),
time: Some(time.to_string()),
traceparent: None,
}
}
pub fn keyset_verified_data_v1(
keyset_id: &str,
payload_digest: &str,
verified_signer_kid: &str,
verified_at: &str,
correlation_id: Option<&str>,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("schemaVersion".to_string(), json!("1.0.0"));
m.insert("keysetId".to_string(), json!(keyset_id));
m.insert("payloadDigest".to_string(), json!(payload_digest));
m.insert("verifiedSignerKid".to_string(), json!(verified_signer_kid));
m.insert("verifiedAt".to_string(), json!(verified_at));
if let Some(cid) = correlation_id {
m.insert("correlationId".to_string(), json!(cid));
}
Ok(Value::Object(m))
}
pub fn cloud_event_v1_keyset_verified(
source: &str,
time: &str,
keyset_id: &str,
payload_digest: &str,
verified_signer_kid: &str,
verified_at: &str,
correlation_id: Option<&str>,
) -> Result<CloudEventV1, serde_json::Error> {
Ok(CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: "dev.cellos.events.cell.trust.v1.keyset_verified".into(),
datacontenttype: Some("application/json".into()),
data: Some(keyset_verified_data_v1(
keyset_id,
payload_digest,
verified_signer_kid,
verified_at,
correlation_id,
)?),
time: Some(time.to_string()),
traceparent: None,
})
}
pub fn keyset_verification_failed_data_v1(
attempted_keyset_path: &str,
reason: &str,
failed_at: &str,
correlation_id: Option<&str>,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("schemaVersion".to_string(), json!("1.0.0"));
m.insert(
"attemptedKeysetPath".to_string(),
json!(attempted_keyset_path),
);
m.insert("reason".to_string(), json!(reason));
m.insert("failedAt".to_string(), json!(failed_at));
if let Some(cid) = correlation_id {
m.insert("correlationId".to_string(), json!(cid));
}
Ok(Value::Object(m))
}
pub fn cloud_event_v1_keyset_verification_failed(
source: &str,
time: &str,
attempted_keyset_path: &str,
reason: &str,
failed_at: &str,
correlation_id: Option<&str>,
) -> Result<CloudEventV1, serde_json::Error> {
Ok(CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: "dev.cellos.events.cell.trust.v1.keyset_verification_failed".into(),
datacontenttype: Some("application/json".into()),
data: Some(keyset_verification_failed_data_v1(
attempted_keyset_path,
reason,
failed_at,
correlation_id,
)?),
time: Some(time.to_string()),
traceparent: None,
})
}
pub fn network_flow_decision_data_v1(
decision: &NetworkFlowDecision,
) -> Result<Value, serde_json::Error> {
serde_json::to_value(decision)
}
pub fn cloud_event_v1_network_flow_decision(
source: &str,
time: &str,
decision: &NetworkFlowDecision,
) -> Result<CloudEventV1, serde_json::Error> {
Ok(CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: "dev.cellos.events.cell.observability.v1.network_flow_decision".into(),
datacontenttype: Some("application/json".into()),
data: Some(network_flow_decision_data_v1(decision)?),
time: Some(time.to_string()),
traceparent: None,
})
}
#[allow(clippy::too_many_arguments)]
pub fn observability_l7_egress_decision_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
decision_id: &str,
action: &str,
sni_host: &str,
policy_digest: &str,
keyset_id: &str,
issuer_kid: &str,
reason_code: &str,
rule_ref: Option<&str>,
stream_id: Option<u32>,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
m.insert("decisionId".to_string(), json!(decision_id));
m.insert("action".to_string(), json!(action));
m.insert("sniHost".to_string(), json!(sni_host));
m.insert("policyDigest".to_string(), json!(policy_digest));
m.insert("keysetId".to_string(), json!(keyset_id));
m.insert("issuerKid".to_string(), json!(issuer_kid));
m.insert("reasonCode".to_string(), json!(reason_code));
if let Some(rr) = rule_ref {
m.insert("ruleRef".to_string(), json!(rr));
}
if let Some(sid) = stream_id {
m.insert("streamId".to_string(), json!(sid));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
pub fn observability_container_security_data_v1(
cell_id: &str,
run_id: Option<&str>,
cap_eff: Option<&str>,
cap_prm: Option<&str>,
cap_bnd: Option<&str>,
cap_amb: Option<&str>,
cap_inh: Option<&str>,
) -> Value {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let (Some(eff), Some(prm), Some(bnd), Some(amb), Some(inh)) =
(cap_eff, cap_prm, cap_bnd, cap_amb, cap_inh)
{
m.insert("capEff".to_string(), json!(eff));
m.insert("capPrm".to_string(), json!(prm));
m.insert("capBnd".to_string(), json!(bnd));
m.insert("capAmb".to_string(), json!(amb));
m.insert("capInh".to_string(), json!(inh));
let privileged = eff == "0000001fffffffff";
m.insert("privileged".to_string(), json!(privileged));
}
Value::Object(m)
}
pub fn compliance_summary_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
command_exit_code: Option<i32>,
) -> Result<Value, serde_json::Error> {
compliance_summary_data_v1_with_subjects(spec, cell_id, run_id, command_exit_code, &[])
}
pub fn compliance_summary_data_v1_with_subjects(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
command_exit_code: Option<i32>,
subject_urns: &[SubjectUrn],
) -> Result<Value, serde_json::Error> {
let egress_rule_count = spec
.authority
.egress_rules
.as_ref()
.map(|v| v.len())
.unwrap_or(0);
let export_target_count = spec
.export
.as_ref()
.and_then(|e| e.targets.as_ref())
.map(|t| t.len())
.unwrap_or(0);
let resource_limits_present = spec.run.as_ref().and_then(|r| r.limits.as_ref()).is_some();
let secret_delivery_mode = spec
.run
.as_ref()
.map(|r| serde_json::to_value(&r.secret_delivery))
.transpose()?
.unwrap_or(serde_json::Value::String("env".into()));
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert(
"lifetimeTtlSeconds".to_string(),
json!(spec.lifetime.ttl_seconds),
);
m.insert("egressRuleCount".to_string(), json!(egress_rule_count));
m.insert(
"resourceLimitsPresent".to_string(),
json!(resource_limits_present),
);
m.insert("secretDeliveryMode".to_string(), secret_delivery_mode);
m.insert("exportTargetCount".to_string(), json!(export_target_count));
if let Some(policy) = &spec.policy {
if let Some(id) = &policy.pack_id {
m.insert("policyPackId".to_string(), json!(id));
}
if let Some(ver) = &policy.pack_version {
m.insert("policyPackVersion".to_string(), json!(ver));
}
if let Some(digest) = &policy.bundle_digest {
m.insert("policyBundleDigest".to_string(), json!(digest));
}
}
if let Some(placement) = &spec.placement {
let mut placement_map = Map::new();
if let Some(pool_id) = &placement.pool_id {
placement_map.insert("poolId".to_string(), json!(pool_id));
}
if let Some(namespace) = &placement.kubernetes_namespace {
placement_map.insert("kubernetesNamespace".to_string(), json!(namespace));
}
if let Some(queue_name) = &placement.queue_name {
placement_map.insert("queueName".to_string(), json!(queue_name));
}
if !placement_map.is_empty() {
m.insert("placement".to_string(), Value::Object(placement_map));
}
}
if let Some(code) = command_exit_code {
m.insert("commandExitCode".to_string(), json!(code));
}
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
if !subject_urns.is_empty() {
let urns: Vec<Value> = subject_urns.iter().map(|u| json!(u)).collect();
m.insert("subjectUrns".to_string(), Value::Array(urns));
}
Ok(Value::Object(m))
}
pub fn policy_rejected_data_v1(
spec: &ExecutionCellSpec,
violations: &[PolicyViolation],
) -> Result<Value, serde_json::Error> {
let violation_values: Vec<Value> = violations
.iter()
.map(|v| {
json!({
"rule": v.rule,
"message": v.message,
})
})
.collect();
let mut m = Map::new();
m.insert("specId".to_string(), json!(&spec.id));
m.insert("violationCount".to_string(), json!(violations.len()));
m.insert("violations".to_string(), Value::Array(violation_values));
if let Some(policy) = &spec.policy {
if let Some(id) = &policy.pack_id {
m.insert("policyPackId".to_string(), json!(id));
}
if let Some(ver) = &policy.pack_version {
m.insert("policyPackVersion".to_string(), json!(ver));
}
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
pub fn authz_rejected_data_v1(
spec: &ExecutionCellSpec,
reason: &str,
message: &str,
denied_pool_id: Option<&str>,
denied_policy_pack_id: Option<&str>,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("specId".to_string(), json!(&spec.id));
m.insert("reason".to_string(), json!(reason));
m.insert("message".to_string(), json!(message));
let tenant_id = spec
.correlation
.as_ref()
.and_then(|c| c.tenant_id.as_deref());
if let Some(t) = tenant_id {
m.insert("tenantId".to_string(), json!(t));
m.insert("subject".to_string(), json!(t));
}
if let Some(p) = denied_pool_id {
m.insert("deniedPoolId".to_string(), json!(p));
}
if let Some(p) = denied_policy_pack_id {
m.insert("deniedPolicyPackId".to_string(), json!(p));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
pub fn homeostasis_signal_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
signal: &crate::HomeostasisSignal,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("specHash".to_string(), json!(&signal.spec_hash));
m.insert(
"declaredEgressRules".to_string(),
json!(signal.declared_egress_rules),
);
m.insert(
"exercisedEgressConnections".to_string(),
match signal.exercised_egress_connections {
Some(n) => json!(n),
None => Value::Null,
},
);
if let Some(reason) = &signal.exercised_egress_reason {
m.insert("exercisedEgressReason".to_string(), json!(reason));
}
m.insert(
"declaredMountPaths".to_string(),
json!(signal.declared_mount_paths),
);
m.insert(
"accessedMountPaths".to_string(),
json!(signal.accessed_mount_paths),
);
m.insert(
"declaredSecretCount".to_string(),
json!(signal.declared_secret_count),
);
m.insert(
"authorityEfficiency".to_string(),
json!(signal.authority_efficiency),
);
m.insert(
"recommendedRemovals".to_string(),
serde_json::to_value(&signal.recommended_removals)?,
);
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
pub fn homeostasis_violation_data_v1(
cell_id: &str,
declared_egress: u64,
exercised_egress: u64,
spec_hash: &str,
) -> Value {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert(
"declaredEgressRuleCount".to_string(),
json!(declared_egress),
);
m.insert(
"exercisedEgressConnections".to_string(),
json!(exercised_egress),
);
m.insert(
"overage".to_string(),
json!(exercised_egress.saturating_sub(declared_egress)),
);
m.insert("specHash".to_string(), json!(spec_hash));
m.insert("severity".to_string(), json!("critical"));
Value::Object(m)
}
#[allow(clippy::too_many_arguments)]
pub fn observability_host_fc_metrics_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
spec_signature_hash: &str,
sampled_at_unix_ms: u64,
fc_socket_path: &str,
vcpu_exits_total: Option<u64>,
vsock_tx_bytes: Option<u64>,
vsock_rx_bytes: Option<u64>,
block_read_ops: Option<u64>,
block_write_ops: Option<u64>,
sample_error: Option<&str>,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("specSignatureHash".to_string(), json!(spec_signature_hash));
m.insert("sampledAtUnixMs".to_string(), json!(sampled_at_unix_ms));
m.insert("fcSocketPath".to_string(), json!(fc_socket_path));
if let Some(v) = vcpu_exits_total {
m.insert("vcpuExitsTotal".to_string(), json!(v));
}
if let Some(v) = vsock_tx_bytes {
m.insert("vsockTxBytes".to_string(), json!(v));
}
if let Some(v) = vsock_rx_bytes {
m.insert("vsockRxBytes".to_string(), json!(v));
}
if let Some(v) = block_read_ops {
m.insert("blockReadOps".to_string(), json!(v));
}
if let Some(v) = block_write_ops {
m.insert("blockWriteOps".to_string(), json!(v));
}
if let Some(e) = sample_error {
m.insert("sampleError".to_string(), json!(e));
}
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
#[derive(Debug, Default, Clone)]
pub struct CgroupSample<'a> {
pub memory_events: Option<&'a [(&'a str, u64)]>,
pub cpu_stat: Option<&'a [(&'a str, u64)]>,
pub pids_events: Option<&'a [(&'a str, u64)]>,
}
fn cgroup_section(keys: &[&str], pairs: Option<&[(&str, u64)]>) -> Option<Value> {
let pairs = pairs?;
let mut section = Map::new();
for (k, v) in pairs {
if keys.contains(k) {
section.insert((*k).to_string(), json!(v));
}
}
if section.is_empty() {
None
} else {
Some(Value::Object(section))
}
}
#[allow(clippy::too_many_arguments)]
pub fn observability_host_cgroup_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
spec_signature_hash: &str,
sampled_at_unix_ms: u64,
cgroup_path: &str,
sample: &CgroupSample<'_>,
sample_error: Option<&str>,
) -> Result<Value, serde_json::Error> {
const MEM_KEYS: &[&str] = &["low", "high", "max", "oom", "oomKill"];
const CPU_KEYS: &[&str] = &[
"usageUsec",
"userUsec",
"systemUsec",
"nrPeriods",
"nrThrottled",
"throttledUsec",
];
const PIDS_KEYS: &[&str] = &["max"];
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("specSignatureHash".to_string(), json!(spec_signature_hash));
m.insert("sampledAtUnixMs".to_string(), json!(sampled_at_unix_ms));
m.insert("cgroupPath".to_string(), json!(cgroup_path));
if let Some(v) = cgroup_section(MEM_KEYS, sample.memory_events) {
m.insert("memoryEvents".to_string(), v);
}
if let Some(v) = cgroup_section(CPU_KEYS, sample.cpu_stat) {
m.insert("cpuStat".to_string(), v);
}
if let Some(v) = cgroup_section(PIDS_KEYS, sample.pids_events) {
m.insert("pidsEvents".to_string(), v);
}
if let Some(e) = sample_error {
m.insert("sampleError".to_string(), json!(e));
}
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
#[derive(Debug, Clone)]
pub struct NftRuleCounter<'a> {
pub rule_handle: &'a str,
pub verdict: Option<&'a str>,
pub packets: u64,
pub bytes: u64,
pub r#match: Option<&'a str>,
}
#[allow(clippy::too_many_arguments)]
pub fn observability_host_nftables_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
spec_signature_hash: &str,
sampled_at_unix_ms: u64,
table_name: &str,
rule_counters: &[NftRuleCounter<'_>],
sample_error: Option<&str>,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("specSignatureHash".to_string(), json!(spec_signature_hash));
m.insert("sampledAtUnixMs".to_string(), json!(sampled_at_unix_ms));
m.insert("tableName".to_string(), json!(table_name));
let counters: Vec<Value> = rule_counters
.iter()
.map(|c| {
let mut row = Map::new();
row.insert("ruleHandle".to_string(), json!(c.rule_handle));
if let Some(v) = c.verdict {
row.insert("verdict".to_string(), json!(v));
}
row.insert("packets".to_string(), json!(c.packets));
row.insert("bytes".to_string(), json!(c.bytes));
if let Some(mt) = c.r#match {
row.insert("match".to_string(), json!(mt));
}
Value::Object(row)
})
.collect();
m.insert("ruleCounters".to_string(), Value::Array(counters));
if let Some(e) = sample_error {
m.insert("sampleError".to_string(), json!(e));
}
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
#[derive(Debug, Default, Clone, Copy)]
pub struct TapStats {
pub rx_packets: Option<u64>,
pub tx_packets: Option<u64>,
pub rx_bytes: Option<u64>,
pub tx_bytes: Option<u64>,
pub rx_errors: Option<u64>,
pub tx_errors: Option<u64>,
pub rx_dropped: Option<u64>,
pub tx_dropped: Option<u64>,
}
#[allow(clippy::too_many_arguments)]
pub fn observability_host_tap_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
spec_signature_hash: &str,
sampled_at_unix_ms: u64,
tap_name: &str,
link_state: &str,
stats: &TapStats,
sample_error: Option<&str>,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("specSignatureHash".to_string(), json!(spec_signature_hash));
m.insert("sampledAtUnixMs".to_string(), json!(sampled_at_unix_ms));
m.insert("tapName".to_string(), json!(tap_name));
m.insert("linkState".to_string(), json!(link_state));
if let Some(v) = stats.rx_packets {
m.insert("rxPackets".to_string(), json!(v));
}
if let Some(v) = stats.tx_packets {
m.insert("txPackets".to_string(), json!(v));
}
if let Some(v) = stats.rx_bytes {
m.insert("rxBytes".to_string(), json!(v));
}
if let Some(v) = stats.tx_bytes {
m.insert("txBytes".to_string(), json!(v));
}
if let Some(v) = stats.rx_errors {
m.insert("rxErrors".to_string(), json!(v));
}
if let Some(v) = stats.tx_errors {
m.insert("txErrors".to_string(), json!(v));
}
if let Some(v) = stats.rx_dropped {
m.insert("rxDropped".to_string(), json!(v));
}
if let Some(v) = stats.tx_dropped {
m.insert("txDropped".to_string(), json!(v));
}
if let Some(e) = sample_error {
m.insert("sampleError".to_string(), json!(e));
}
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ResidueClass {
None,
DocumentedException,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum LifecycleResidueClass {
None,
MetadataOnly,
DocumentedException,
Unknown,
}
#[derive(Debug, Default, Clone)]
pub struct EvidenceBundleRefs<'a> {
pub started_event_ref: &'a str,
pub cell_destroyed_event_ref: &'a str,
pub command_completed_event_ref: Option<&'a str>,
pub spawned_event_refs: &'a [&'a str],
pub fc_metrics_event_refs: &'a [&'a str],
pub cgroup_event_refs: &'a [&'a str],
pub nftables_event_refs: &'a [&'a str],
pub tap_event_refs: &'a [&'a str],
pub homeostasis_event_ref: Option<&'a str>,
pub compliance_summary_event_ref: Option<&'a str>,
pub guest_event_refs: &'a [(&'a str, &'a str, &'a str)],
pub residue_exception: Option<&'a str>,
}
pub fn evidence_bundle_emitted_data_v1(
spec: &ExecutionCellSpec,
cell_id: &str,
run_id: Option<&str>,
spec_signature_hash: &str,
emitted_at_unix_ms: u64,
residue_class: ResidueClass,
refs: &EvidenceBundleRefs<'_>,
) -> Result<Value, serde_json::Error> {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("specId".to_string(), json!(&spec.id));
m.insert("specSignatureHash".to_string(), json!(spec_signature_hash));
m.insert("emittedAtUnixMs".to_string(), json!(emitted_at_unix_ms));
let mut lifecycle = Map::new();
lifecycle.insert("started".to_string(), json!(refs.started_event_ref));
lifecycle.insert(
"destroyed".to_string(),
json!(refs.cell_destroyed_event_ref),
);
if let Some(cc) = refs.command_completed_event_ref {
lifecycle.insert("commandCompleted".to_string(), json!(cc));
}
if !refs.spawned_event_refs.is_empty() {
lifecycle.insert("spawned".to_string(), json!(refs.spawned_event_refs));
}
m.insert("lifecycleEventRefs".to_string(), Value::Object(lifecycle));
m.insert(
"cellDestroyedEventRef".to_string(),
json!(refs.cell_destroyed_event_ref),
);
m.insert(
"residueClass".to_string(),
serde_json::to_value(residue_class)?,
);
if let Some(rx) = refs.residue_exception {
m.insert("residueException".to_string(), json!(rx));
}
let any_host = !refs.fc_metrics_event_refs.is_empty()
|| !refs.cgroup_event_refs.is_empty()
|| !refs.nftables_event_refs.is_empty()
|| !refs.tap_event_refs.is_empty();
if any_host {
let mut host = Map::new();
if !refs.fc_metrics_event_refs.is_empty() {
host.insert("fcMetrics".to_string(), json!(refs.fc_metrics_event_refs));
}
if !refs.cgroup_event_refs.is_empty() {
host.insert("cgroup".to_string(), json!(refs.cgroup_event_refs));
}
if !refs.nftables_event_refs.is_empty() {
host.insert("nftables".to_string(), json!(refs.nftables_event_refs));
}
if !refs.tap_event_refs.is_empty() {
host.insert("tap".to_string(), json!(refs.tap_event_refs));
}
m.insert("hostProbeEventRefs".to_string(), Value::Object(host));
}
if !refs.guest_event_refs.is_empty() {
let rows: Vec<Value> = refs
.guest_event_refs
.iter()
.map(|(id, ty, rc)| {
let mut row = Map::new();
row.insert("eventId".to_string(), json!(id));
row.insert("eventType".to_string(), json!(ty));
row.insert("ruleClass".to_string(), json!(rc));
Value::Object(row)
})
.collect();
m.insert("guestEventRefs".to_string(), Value::Array(rows));
}
if let Some(h) = refs.homeostasis_event_ref {
m.insert("homeostasisEventRef".to_string(), json!(h));
}
if let Some(c) = refs.compliance_summary_event_ref {
m.insert("complianceSummaryEventRef".to_string(), json!(c));
}
if let Some(r) = run_id {
m.insert("runId".to_string(), json!(r));
}
if let Some(c) = &spec.correlation {
m.insert("correlation".to_string(), serde_json::to_value(c)?);
}
Ok(Value::Object(m))
}
pub fn cortex_dispatched_data_v1(pack_id: &str, cell_id: &str, doctrine_refs: &[String]) -> Value {
let mut m = Map::new();
m.insert("packId".to_string(), json!(pack_id));
m.insert("cellId".to_string(), json!(cell_id));
m.insert("doctrineRefs".to_string(), json!(doctrine_refs));
m.insert("bridgeVersion".to_string(), json!("1.0"));
Value::Object(m)
}
pub fn firecracker_pool_event_data_v1(cell_id: &str, pool_hit: bool, slot_count: usize) -> Value {
let mut m = Map::new();
m.insert("cellId".to_string(), json!(cell_id));
m.insert("poolHit".to_string(), json!(pool_hit));
m.insert("slotCount".to_string(), json!(slot_count));
Value::Object(m)
}
pub fn cloud_event_v1_cortex_dispatched(
source: &str,
time: &str,
pack_id: &str,
cell_id: &str,
doctrine_refs: &[String],
) -> CloudEventV1 {
CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: "dev.cellos.events.cell.cortex.v1.dispatched".into(),
datacontenttype: Some("application/json".into()),
data: Some(cortex_dispatched_data_v1(pack_id, cell_id, doctrine_refs)),
time: Some(time.to_string()),
traceparent: None,
}
}
pub fn cloud_event_v1_firecracker_pool_checkout(
source: &str,
time: &str,
cell_id: &str,
pool_hit: bool,
slot_count: usize,
) -> CloudEventV1 {
CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: "dev.cellos.events.cell.firecracker.v1.pool_checkout".into(),
datacontenttype: Some("application/json".into()),
data: Some(firecracker_pool_event_data_v1(
cell_id, pool_hit, slot_count,
)),
time: Some(time.to_string()),
traceparent: None,
}
}
pub const FORMATION_CREATED_TYPE: &str = "dev.cellos.events.cell.formation.v1.created";
pub const FORMATION_LAUNCHING_TYPE: &str = "dev.cellos.events.cell.formation.v1.launching";
pub const FORMATION_RUNNING_TYPE: &str = "dev.cellos.events.cell.formation.v1.running";
pub const FORMATION_DEGRADED_TYPE: &str = "dev.cellos.events.cell.formation.v1.degraded";
pub const FORMATION_COMPLETED_TYPE: &str = "dev.cellos.events.cell.formation.v1.completed";
pub const FORMATION_FAILED_TYPE: &str = "dev.cellos.events.cell.formation.v1.failed";
fn formation_data_v1(
formation_id: &str,
formation_name: &str,
cell_count: u32,
failed_cell_ids: &[String],
reason: Option<&str>,
) -> Value {
let mut m = Map::new();
m.insert("formationId".to_string(), json!(formation_id));
m.insert("formationName".to_string(), json!(formation_name));
m.insert("cellCount".to_string(), json!(cell_count));
m.insert("failedCellIds".to_string(), json!(failed_cell_ids));
if let Some(r) = reason {
m.insert("reason".to_string(), json!(r));
}
Value::Object(m)
}
pub fn cloud_event_v1_formation_created(
source: &str,
time: &str,
formation_id: &str,
formation_name: &str,
cell_count: u32,
failed_cell_ids: &[String],
reason: Option<&str>,
) -> CloudEventV1 {
CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: FORMATION_CREATED_TYPE.to_string(),
datacontenttype: Some("application/json".into()),
data: Some(formation_data_v1(
formation_id,
formation_name,
cell_count,
failed_cell_ids,
reason,
)),
time: Some(time.to_string()),
traceparent: None,
}
}
pub fn cloud_event_v1_formation_launching(
source: &str,
time: &str,
formation_id: &str,
formation_name: &str,
cell_count: u32,
failed_cell_ids: &[String],
reason: Option<&str>,
) -> CloudEventV1 {
CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: FORMATION_LAUNCHING_TYPE.to_string(),
datacontenttype: Some("application/json".into()),
data: Some(formation_data_v1(
formation_id,
formation_name,
cell_count,
failed_cell_ids,
reason,
)),
time: Some(time.to_string()),
traceparent: None,
}
}
pub fn cloud_event_v1_formation_running(
source: &str,
time: &str,
formation_id: &str,
formation_name: &str,
cell_count: u32,
failed_cell_ids: &[String],
reason: Option<&str>,
) -> CloudEventV1 {
CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: FORMATION_RUNNING_TYPE.to_string(),
datacontenttype: Some("application/json".into()),
data: Some(formation_data_v1(
formation_id,
formation_name,
cell_count,
failed_cell_ids,
reason,
)),
time: Some(time.to_string()),
traceparent: None,
}
}
pub fn cloud_event_v1_formation_degraded(
source: &str,
time: &str,
formation_id: &str,
formation_name: &str,
cell_count: u32,
failed_cell_ids: &[String],
reason: Option<&str>,
) -> CloudEventV1 {
CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: FORMATION_DEGRADED_TYPE.to_string(),
datacontenttype: Some("application/json".into()),
data: Some(formation_data_v1(
formation_id,
formation_name,
cell_count,
failed_cell_ids,
reason,
)),
time: Some(time.to_string()),
traceparent: None,
}
}
pub fn cloud_event_v1_formation_completed(
source: &str,
time: &str,
formation_id: &str,
formation_name: &str,
cell_count: u32,
failed_cell_ids: &[String],
reason: Option<&str>,
) -> CloudEventV1 {
CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: FORMATION_COMPLETED_TYPE.to_string(),
datacontenttype: Some("application/json".into()),
data: Some(formation_data_v1(
formation_id,
formation_name,
cell_count,
failed_cell_ids,
reason,
)),
time: Some(time.to_string()),
traceparent: None,
}
}
pub fn cloud_event_v1_formation_failed(
source: &str,
time: &str,
formation_id: &str,
formation_name: &str,
cell_count: u32,
failed_cell_ids: &[String],
reason: Option<&str>,
) -> CloudEventV1 {
CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: source.to_string(),
ty: FORMATION_FAILED_TYPE.to_string(),
datacontenttype: Some("application/json".into()),
data: Some(formation_data_v1(
formation_id,
formation_name,
cell_count,
failed_cell_ids,
reason,
)),
time: Some(time.to_string()),
traceparent: None,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
Correlation, ExecutionCellDocument, ExportReceipt, ExportReceiptTargetKind, Lifetime,
WorkloadIdentity, WorkloadIdentityKind,
};
#[test]
fn lifecycle_started_matches_example_shape() {
let raw =
include_str!("../../../contracts/examples/execution-cell-ci-correlation.valid.json");
let doc: ExecutionCellDocument = serde_json::from_str(raw).unwrap();
let expected: Value = serde_json::from_str(include_str!(
"../../../contracts/examples/cell-lifecycle-started-data.valid.json"
))
.unwrap();
let data = lifecycle_started_data_v1(
&doc.spec,
"host-cell-abc123",
Some("run-2026-04-06-001"),
None,
None,
None,
None,
None,
None,
None,
)
.unwrap();
assert_eq!(data, expected);
}
#[test]
fn lifecycle_started_without_correlation() {
let spec = ExecutionCellSpec {
id: "s1".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: None,
telemetry: None,
};
let data =
lifecycle_started_data_v1(&spec, "c1", None, None, None, None, None, None, None, None)
.unwrap();
assert!(!data.as_object().unwrap().contains_key("correlation"));
assert!(!data.as_object().unwrap().contains_key("runId"));
assert!(!data.as_object().unwrap().contains_key("derivationVerified"));
assert!(!data.as_object().unwrap().contains_key("roleRoot"));
assert!(!data.as_object().unwrap().contains_key("parentRunId"));
assert!(!data.as_object().unwrap().contains_key("kernelDigestSha256"));
assert!(!data.as_object().unwrap().contains_key("rootfsDigestSha256"));
assert!(!data
.as_object()
.unwrap()
.contains_key("firecrackerDigestSha256"));
}
#[test]
fn lifecycle_started_partial_correlation_serializes() {
let spec = ExecutionCellSpec {
id: "s2".into(),
correlation: Some(Correlation {
platform: Some("custom".into()),
external_run_id: None,
external_job_id: None,
tenant_id: None,
labels: None,
correlation_id: None,
}),
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 1 },
export: None,
telemetry: None,
};
let data =
lifecycle_started_data_v1(&spec, "c2", None, None, None, None, None, None, None, None)
.unwrap();
assert_eq!(data["correlation"]["platform"], "custom");
}
#[test]
fn lifecycle_started_with_derivation_fields_emits_them() {
let spec = ExecutionCellSpec {
id: "deriv-1".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: None,
telemetry: None,
};
let data = lifecycle_started_data_v1(
&spec,
"cell-deriv",
Some("run-deriv-1"),
Some(false),
Some("role-prod-ci"),
Some("run-parent-001"),
Some("abc123def456"),
None,
None,
None,
)
.unwrap();
assert_eq!(data["derivationVerified"], false);
assert_eq!(data["roleRoot"], "role-prod-ci");
assert_eq!(data["parentRunId"], "run-parent-001");
}
#[test]
fn lifecycle_destroyed_succeeded_shape() {
let raw =
include_str!("../../../contracts/examples/execution-cell-ci-correlation.valid.json");
let doc: ExecutionCellDocument = serde_json::from_str(raw).unwrap();
let data = lifecycle_destroyed_data_v1(
&doc.spec,
"host-xyz",
Some("run-test"),
LifecycleDestroyOutcome::Succeeded,
None,
None,
None,
None,
)
.unwrap();
assert_eq!(data["outcome"], "succeeded");
assert!(!data.as_object().unwrap().contains_key("reason"));
assert!(
!data.as_object().unwrap().contains_key("terminalState"),
"terminalState must be omitted when None for backward-compat"
);
assert!(
!data.as_object().unwrap().contains_key("evidenceBundleRef"),
"evidenceBundleRef must be omitted when None for backward-compat"
);
assert!(
!data.as_object().unwrap().contains_key("residueClass"),
"residueClass must be omitted when None for backward-compat"
);
assert_eq!(data["ttlSeconds"], 3600);
}
#[test]
fn lifecycle_destroyed_failed_includes_reason() {
let spec = ExecutionCellSpec {
id: "s1".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: None,
telemetry: None,
};
let data = lifecycle_destroyed_data_v1(
&spec,
"c1",
None,
LifecycleDestroyOutcome::Failed,
Some("secret resolve: denied"),
None,
None,
None,
)
.unwrap();
assert_eq!(data["outcome"], "failed");
assert_eq!(data["reason"], "secret resolve: denied");
}
#[test]
fn lifecycle_destroyed_terminal_state_clean_serializes() {
let spec = ExecutionCellSpec {
id: "term-clean".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: None,
telemetry: None,
};
let data = lifecycle_destroyed_data_v1(
&spec,
"c-clean",
None,
LifecycleDestroyOutcome::Succeeded,
None,
Some(LifecycleTerminalState::Clean),
None,
None,
)
.unwrap();
assert_eq!(data["terminalState"], "clean");
}
#[test]
fn lifecycle_destroyed_terminal_state_forced_serializes() {
let spec = ExecutionCellSpec {
id: "term-forced".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: None,
telemetry: None,
};
let data = lifecycle_destroyed_data_v1(
&spec,
"c-forced",
None,
LifecycleDestroyOutcome::Failed,
Some("in-VM exit bridge: vsock closed"),
Some(LifecycleTerminalState::Forced),
None,
None,
)
.unwrap();
assert_eq!(data["terminalState"], "forced");
assert_eq!(data["outcome"], "failed");
}
#[test]
fn lifecycle_destroyed_evidence_bundle_and_residue_class_serialize_when_populated() {
let spec = ExecutionCellSpec {
id: "f5-populated".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: None,
telemetry: None,
};
let bundle = SubjectUrn::parse("urn:cellos:evidence-bundle:run-1").unwrap();
let data = lifecycle_destroyed_data_v1(
&spec,
"c-f5",
Some("run-1"),
LifecycleDestroyOutcome::Succeeded,
None,
None,
Some(&bundle),
Some(ResidueClass::DocumentedException),
)
.unwrap();
assert_eq!(
data["evidenceBundleRef"],
"urn:cellos:evidence-bundle:run-1"
);
assert_eq!(data["residueClass"], "documented_exception");
}
#[test]
fn lifecycle_destroyed_evidence_bundle_and_residue_class_omitted_when_none() {
let spec = ExecutionCellSpec {
id: "f5-omitted".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: None,
telemetry: None,
};
let data = lifecycle_destroyed_data_v1(
&spec,
"c-f5-omit",
None,
LifecycleDestroyOutcome::Succeeded,
None,
None,
None,
None,
)
.unwrap();
let obj = data.as_object().unwrap();
assert!(
!obj.contains_key("evidenceBundleRef"),
"evidenceBundleRef must be omitted when None"
);
assert!(
!obj.contains_key("residueClass"),
"residueClass must be omitted when None"
);
}
#[test]
fn identity_materialized_matches_example_shape() {
let raw =
include_str!("../../../contracts/examples/execution-cell-github-oidc-s3.valid.json");
let doc: ExecutionCellDocument = serde_json::from_str(raw).unwrap();
let identity = doc.spec.identity.as_ref().expect("identity");
let data = identity_materialized_data_v1(&doc.spec, "host-xyz", Some("run-test"), identity)
.unwrap();
assert_eq!(data["identity"]["kind"], "federatedOidc");
assert_eq!(data["identity"]["provider"], "github-actions");
assert_eq!(data["identity"]["secretRef"], "AWS_WEB_IDENTITY");
assert_eq!(data["runId"], "run-test");
}
#[test]
fn identity_revoked_includes_reason() {
let spec = ExecutionCellSpec {
id: "s3".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: Some(WorkloadIdentity {
kind: WorkloadIdentityKind::FederatedOidc,
provider: "github-actions".into(),
audience: "sts.amazonaws.com".into(),
subject: None,
ttl_seconds: Some(900),
secret_ref: "AWS_WEB_IDENTITY".into(),
}),
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 3600 },
export: None,
telemetry: None,
};
let identity = spec.identity.as_ref().unwrap();
let data =
identity_revoked_data_v1(&spec, "c3", None, identity, Some("teardown"), None).unwrap();
assert_eq!(data["identity"]["audience"], "sts.amazonaws.com");
assert_eq!(data["reason"], "teardown");
}
#[test]
fn identity_failed_matches_example_shape() {
let raw =
include_str!("../../../contracts/examples/execution-cell-github-oidc-s3.valid.json");
let doc: ExecutionCellDocument = serde_json::from_str(raw).unwrap();
let expected: Value = serde_json::from_str(include_str!(
"../../../contracts/examples/cell-identity-failed-data.valid.json"
))
.unwrap();
let identity = doc.spec.identity.as_ref().expect("identity");
let data = identity_failed_data_v1(
&doc.spec,
"host-cell-demo",
Some("run-001"),
identity,
IdentityFailureOperation::Materialize,
"oidc exchange denied by upstream federation policy",
)
.unwrap();
assert_eq!(data, expected);
}
#[test]
fn export_completed_v2_matches_example_shape() {
let raw =
include_str!("../../../contracts/examples/execution-cell-github-oidc-s3.valid.json");
let doc: ExecutionCellDocument = serde_json::from_str(raw).unwrap();
let receipt = ExportReceipt {
target_kind: ExportReceiptTargetKind::S3,
target_name: Some("artifact-bucket".into()),
destination: "s3://acme-cellos-artifacts/github/acme/widget/123456789/test-results"
.into(),
bytes_written: 1024,
};
let data = export_completed_data_v2(
&doc.spec,
"host-xyz",
Some("run-test"),
"test-results",
&receipt,
None,
)
.unwrap();
assert_eq!(data["receipt"]["targetKind"], "s3");
assert_eq!(data["receipt"]["targetName"], "artifact-bucket");
assert_eq!(data["receipt"]["bytesWritten"], 1024);
}
#[test]
fn export_completed_v2_http_matches_example_shape() {
let raw = include_str!(
"../../../contracts/examples/execution-cell-github-oidc-multi-export.valid.json"
);
let doc: ExecutionCellDocument = serde_json::from_str(raw).unwrap();
let expected: Value = serde_json::from_str(include_str!(
"../../../contracts/examples/cell-export-v2-completed-data-http.valid.json"
))
.unwrap();
let receipt = ExportReceipt {
target_kind: ExportReceiptTargetKind::Http,
target_name: Some("artifact-api".into()),
destination: "https://artifacts.acme.internal/upload/host-cell-demo/coverage-summary"
.into(),
bytes_written: 512,
};
let data = export_completed_data_v2(
&doc.spec,
"host-cell-demo",
Some("run-002"),
"coverage-summary",
&receipt,
None,
)
.unwrap();
assert_eq!(data, expected);
}
#[test]
fn export_failed_v2_http_matches_example_shape() {
let raw = include_str!(
"../../../contracts/examples/execution-cell-github-oidc-multi-export.valid.json"
);
let doc: ExecutionCellDocument = serde_json::from_str(raw).unwrap();
let expected: Value = serde_json::from_str(include_str!(
"../../../contracts/examples/cell-export-v2-failed-data.valid.json"
))
.unwrap();
let data = export_failed_data_v2(
&doc.spec,
"host-cell-demo",
Some("run-002"),
"coverage-summary",
ExportReceiptTargetKind::Http,
Some("artifact-api"),
Some("https://artifacts.acme.internal/upload/host-cell-demo/coverage-summary"),
"http put returned 403 Forbidden",
None,
)
.unwrap();
assert_eq!(data, expected);
}
#[test]
fn compliance_summary_matches_example_shape() {
let raw =
include_str!("../../../contracts/examples/execution-cell-ci-correlation.valid.json");
let doc: ExecutionCellDocument = serde_json::from_str(raw).unwrap();
let expected: Value = serde_json::from_str(include_str!(
"../../../contracts/examples/cell-compliance-summary-data.valid.json"
))
.unwrap();
let data =
compliance_summary_data_v1(&doc.spec, "host-cell-demo", Some("run-003"), Some(0))
.unwrap();
assert_eq!(data, expected);
}
#[test]
fn compliance_summary_omits_placement_when_absent() {
let spec = ExecutionCellSpec {
id: "compliance-no-placement".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: None,
telemetry: None,
};
let data = compliance_summary_data_v1(&spec, "cell-001", None, None).unwrap();
assert!(!data.as_object().unwrap().contains_key("placement"));
}
#[test]
fn compliance_summary_with_empty_subjects_omits_field() {
let raw =
include_str!("../../../contracts/examples/execution-cell-ci-correlation.valid.json");
let doc: ExecutionCellDocument = serde_json::from_str(raw).unwrap();
let legacy =
compliance_summary_data_v1(&doc.spec, "host-cell-demo", Some("run-003"), Some(0))
.unwrap();
let with_empty = compliance_summary_data_v1_with_subjects(
&doc.spec,
"host-cell-demo",
Some("run-003"),
Some(0),
&[],
)
.unwrap();
assert_eq!(legacy, with_empty);
assert!(!with_empty.as_object().unwrap().contains_key("subjectUrns"));
}
#[test]
fn compliance_summary_with_subjects_matches_example_shape() {
let raw =
include_str!("../../../contracts/examples/execution-cell-ci-correlation.valid.json");
let doc: ExecutionCellDocument = serde_json::from_str(raw).unwrap();
let expected: Value = serde_json::from_str(include_str!(
"../../../contracts/examples/cell-compliance-summary-data-with-subjects.valid.json"
))
.unwrap();
let subjects: Vec<SubjectUrn> = vec![
SubjectUrn::parse("urn:cellos:cell:host-cell-demo").unwrap(),
SubjectUrn::parse("urn:tsafe:lease:lease-42").unwrap(),
SubjectUrn::parse("urn:cellos:export:run-003%2Fartifact-1").unwrap(),
];
let data = compliance_summary_data_v1_with_subjects(
&doc.spec,
"host-cell-demo",
Some("run-003"),
Some(0),
&subjects,
)
.unwrap();
assert_eq!(data, expected);
let urns = data["subjectUrns"].as_array().unwrap();
assert_eq!(urns.len(), 3);
assert_eq!(urns[0], "urn:cellos:cell:host-cell-demo");
}
#[test]
fn compliance_summary_invalid_subject_urns_fixture_is_malformed() {
let raw =
include_str!("../../../contracts/examples/cell-compliance-summary-data.invalid.json");
let v: Value = serde_json::from_str(raw).unwrap();
let urns = v["subjectUrns"]
.as_array()
.expect("invalid fixture must carry subjectUrns array");
assert!(!urns.is_empty(), "negative fixture must have entries");
fn matches_schema_shape(s: &str) -> bool {
let parts: Vec<&str> = s.splitn(4, ':').collect();
if parts.len() != 4 {
return false;
}
if parts[0] != "urn" {
return false;
}
let segment_ok = |seg: &str| {
let mut it = seg.chars();
match it.next() {
Some(c) if c.is_ascii_lowercase() || c.is_ascii_digit() => {}
_ => return false,
}
it.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-')
};
if !segment_ok(parts[1]) || !segment_ok(parts[2]) {
return false;
}
if parts[3].is_empty() {
return false;
}
parts[3]
.chars()
.all(|c| c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | ':' | '%' | '-'))
}
for (i, urn) in urns.iter().enumerate() {
let s = urn.as_str().unwrap_or("");
assert!(
!matches_schema_shape(s),
"invalid fixture entry [{i}] {s:?} unexpectedly matches the schema URN regex; \
fixture must remain a negative case"
);
}
}
#[test]
fn network_enforcement_matches_example_shape() {
let raw = include_str!(
"../../../contracts/examples/cell-observability-network-enforcement-data.valid.json"
);
let expected: Value = serde_json::from_str(raw).unwrap();
let spec = ExecutionCellSpec {
id: "net-enforcement-demo".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: None,
telemetry: None,
};
let data = observability_network_enforcement_data_v1(
&spec,
"net-enforcement-demo",
Some("run-local-001"),
true,
1,
1,
None,
)
.unwrap();
assert_eq!(data, expected);
}
#[test]
fn dns_resolution_matches_example_shape() {
let raw = include_str!(
"../../../contracts/examples/cell-observability-dns-resolution-data.valid.json"
);
let expected: Value = serde_json::from_str(raw).unwrap();
let spec = ExecutionCellSpec {
id: "demo-cell-dns".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: None,
telemetry: None,
};
let targets: &[(&str, &str, Option<u16>)] = &[
("203.0.113.10", "inet", Some(443)),
("2001:db8::1", "inet6", Some(443)),
];
let data = observability_dns_resolution_data_v1(
&spec,
"demo-cell-dns",
Some("run-001"),
"api.example.com",
"2026-04-30T12:00:00Z",
targets,
300,
"sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
"keyset-demo-001",
"kid-resolver-01",
Some("rcpt-demo-0001"),
)
.unwrap();
assert_eq!(data, expected);
}
#[test]
fn dns_target_set_matches_example_shape() {
let raw = include_str!(
"../../../contracts/examples/cell-observability-dns-target-set-data.valid.json"
);
let expected: Value = serde_json::from_str(raw).unwrap();
let spec = ExecutionCellSpec {
id: "demo-cell-dns".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: None,
telemetry: None,
};
let data = observability_dns_target_set_data_v1(
&spec,
"demo-cell-dns",
Some("run-001"),
"cdn.example.com",
"empty",
"sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
"refresh",
"2026-04-30T12:05:00Z",
"keyset-demo-001",
"kid-resolver-01",
)
.unwrap();
assert_eq!(data, expected);
}
#[test]
fn dns_query_data_v1_serializes_allow_path() {
use crate::{DnsQueryDecision, DnsQueryEvent, DnsQueryReasonCode, DnsQueryType};
let ev = DnsQueryEvent {
schema_version: "1.0.0".into(),
cell_id: "demo-cell-dns".into(),
run_id: "run-2026-05-01-001".into(),
query_id: "q-3b58b2a4-e4bb-4f89-9c4f-2a0a2c8b6f01".into(),
query_name: "api.example.com".into(),
query_type: DnsQueryType::A,
decision: DnsQueryDecision::Allow,
reason_code: DnsQueryReasonCode::AllowedByAllowlist,
response_rcode: Some(0),
upstream_resolver_id: Some("resolver-do53-internal".into()),
upstream_latency_ms: Some(4),
response_target_count: Some(2),
keyset_id: Some("keyset-demo-001".into()),
issuer_kid: Some("kid-resolver-01".into()),
policy_digest: Some(
"sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".into(),
),
correlation_id: Some("corr-demo-0001".into()),
observed_at: "2026-05-01T12:34:56Z".into(),
};
let v = dns_query_data_v1(&ev).unwrap();
assert_eq!(v["schemaVersion"], "1.0.0");
assert_eq!(v["queryName"], "api.example.com");
assert_eq!(v["queryType"], "A");
assert_eq!(v["decision"], "allow");
assert_eq!(v["reasonCode"], "allowed_by_allowlist");
assert_eq!(v["upstreamResolverId"], "resolver-do53-internal");
assert_eq!(v["responseTargetCount"], 2);
}
#[test]
fn dns_query_data_v1_omits_optionals_on_deny_path() {
use crate::{DnsQueryDecision, DnsQueryEvent, DnsQueryReasonCode, DnsQueryType};
let ev = DnsQueryEvent {
schema_version: "1.0.0".into(),
cell_id: "demo-cell-dns".into(),
run_id: "run-2026-05-01-001".into(),
query_id: "q-deny-001".into(),
query_name: "blocked.example.com".into(),
query_type: DnsQueryType::AAAA,
decision: DnsQueryDecision::Deny,
reason_code: DnsQueryReasonCode::DeniedNotInAllowlist,
response_rcode: Some(5),
upstream_resolver_id: None,
upstream_latency_ms: None,
response_target_count: Some(0),
keyset_id: None,
issuer_kid: None,
policy_digest: None,
correlation_id: None,
observed_at: "2026-05-01T12:35:00Z".into(),
};
let v = dns_query_data_v1(&ev).unwrap();
let obj = v.as_object().unwrap();
assert!(!obj.contains_key("upstreamResolverId"));
assert!(!obj.contains_key("upstreamLatencyMs"));
assert!(!obj.contains_key("keysetId"));
assert!(!obj.contains_key("issuerKid"));
assert!(!obj.contains_key("policyDigest"));
assert!(!obj.contains_key("correlationId"));
assert_eq!(v["decision"], "deny");
assert_eq!(v["reasonCode"], "denied_not_in_allowlist");
assert_eq!(v["responseRcode"], 5);
}
#[test]
fn cloud_event_v1_dns_query_envelope() {
use crate::{DnsQueryDecision, DnsQueryEvent, DnsQueryReasonCode, DnsQueryType};
let ev = DnsQueryEvent {
schema_version: "1.0.0".into(),
cell_id: "c1".into(),
run_id: "r1".into(),
query_id: "q1".into(),
query_name: "api.example.com".into(),
query_type: DnsQueryType::A,
decision: DnsQueryDecision::Allow,
reason_code: DnsQueryReasonCode::AllowedByAllowlist,
response_rcode: Some(0),
upstream_resolver_id: Some("r-001".into()),
upstream_latency_ms: Some(3),
response_target_count: Some(1),
keyset_id: None,
issuer_kid: None,
policy_digest: None,
correlation_id: None,
observed_at: "2026-05-01T12:34:56Z".into(),
};
let env =
cloud_event_v1_dns_query("cellos-dns-proxy", "2026-05-01T12:34:56Z", &ev).unwrap();
assert_eq!(env.specversion, "1.0");
assert_eq!(env.ty, "dev.cellos.events.cell.observability.v1.dns_query");
assert_eq!(env.source, "cellos-dns-proxy");
assert_eq!(env.datacontenttype.as_deref(), Some("application/json"));
assert!(env.data.is_some());
}
#[test]
fn qtype_mapping_covers_phase1_set() {
use crate::{qtype_to_dns_query_type, DnsQueryType};
assert_eq!(qtype_to_dns_query_type(1), Some(DnsQueryType::A));
assert_eq!(qtype_to_dns_query_type(2), Some(DnsQueryType::NS));
assert_eq!(qtype_to_dns_query_type(5), Some(DnsQueryType::CNAME));
assert_eq!(qtype_to_dns_query_type(12), Some(DnsQueryType::PTR));
assert_eq!(qtype_to_dns_query_type(15), Some(DnsQueryType::MX));
assert_eq!(qtype_to_dns_query_type(16), Some(DnsQueryType::TXT));
assert_eq!(qtype_to_dns_query_type(28), Some(DnsQueryType::AAAA));
assert_eq!(qtype_to_dns_query_type(33), Some(DnsQueryType::SRV));
assert_eq!(qtype_to_dns_query_type(64), Some(DnsQueryType::SVCB));
assert_eq!(qtype_to_dns_query_type(65), Some(DnsQueryType::HTTPS));
assert_eq!(qtype_to_dns_query_type(0), None);
assert_eq!(qtype_to_dns_query_type(99), None);
assert_eq!(qtype_to_dns_query_type(255), None); }
#[test]
fn l7_egress_decision_matches_example_shape() {
let raw = include_str!(
"../../../contracts/examples/cell-observability-l7-egress-decision-data.valid.json"
);
let expected: Value = serde_json::from_str(raw).unwrap();
let spec = ExecutionCellSpec {
id: "demo-cell-dns".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: None,
telemetry: None,
};
let data = observability_l7_egress_decision_data_v1(
&spec,
"demo-cell-dns",
Some("run-001"),
"l7-demo-0002",
"deny",
"blocked.example.com",
"sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
"keyset-demo-001",
"kid-l7-01",
"deny_default",
Some("authority.egressRules.default"),
None, )
.unwrap();
assert_eq!(data, expected);
}
#[test]
fn l7_egress_decision_with_stream_id_emits_field() {
let spec = ExecutionCellSpec {
id: "demo-cell-dns".into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: None,
telemetry: None,
};
let data = observability_l7_egress_decision_data_v1(
&spec,
"demo-cell-dns",
Some("run-001"),
"l7-demo-0003",
"deny",
"evil.example.com",
"sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
"keyset-demo-001",
"kid-l7-01",
"l7_h2_authority_allowlist_miss",
None,
Some(3), )
.unwrap();
assert_eq!(data["streamId"], serde_json::json!(3));
}
fn _seam_g1_g2_minimal_spec(id: &str) -> ExecutionCellSpec {
ExecutionCellSpec {
id: id.into(),
correlation: None,
ingress: None,
environment: None,
placement: None,
policy: None,
identity: None,
run: None,
authority: Default::default(),
lifetime: Lifetime { ttl_seconds: 60 },
export: None,
telemetry: None,
}
}
#[test]
fn seam_g2_identity_revoked_includes_provenance_when_set() {
let mut spec = _seam_g1_g2_minimal_spec("seam-g2-revoke");
spec.identity = Some(WorkloadIdentity {
kind: WorkloadIdentityKind::FederatedOidc,
provider: "github-actions".into(),
audience: "sts.amazonaws.com".into(),
subject: None,
ttl_seconds: Some(900),
secret_ref: "AWS_WEB_IDENTITY".into(),
});
let identity = spec.identity.as_ref().unwrap();
let prov = Provenance {
parent: "urn:cellos:event:00000000-0000-0000-0000-00000000abcd".into(),
parent_type: "dev.cellos.events.cell.lifecycle.v1.started".into(),
};
let data = identity_revoked_data_v1(
&spec,
"cell-seam-g2",
Some("run-seam-g2"),
identity,
Some("teardown"),
Some(&prov),
)
.unwrap();
assert_eq!(
data["provenance"]["parent"],
"urn:cellos:event:00000000-0000-0000-0000-00000000abcd"
);
assert_eq!(
data["provenance"]["parentType"],
"dev.cellos.events.cell.lifecycle.v1.started"
);
}
#[test]
fn seam_g2_identity_revoked_omits_provenance_when_none() {
let mut spec = _seam_g1_g2_minimal_spec("seam-g2-revoke-no-prov");
spec.identity = Some(WorkloadIdentity {
kind: WorkloadIdentityKind::FederatedOidc,
provider: "github-actions".into(),
audience: "sts.amazonaws.com".into(),
subject: None,
ttl_seconds: Some(900),
secret_ref: "AWS_WEB_IDENTITY".into(),
});
let identity = spec.identity.as_ref().unwrap();
let data =
identity_revoked_data_v1(&spec, "cell-x", None, identity, Some("teardown"), None)
.unwrap();
assert!(!data.as_object().unwrap().contains_key("provenance"));
}
#[test]
fn seam_g2_export_completed_v2_includes_provenance_when_set() {
let spec = _seam_g1_g2_minimal_spec("seam-g2-export");
let receipt = ExportReceipt {
target_kind: ExportReceiptTargetKind::Local,
target_name: None,
destination: "/tmp/out/run-1/artifact.json".into(),
bytes_written: 42,
};
let prov = Provenance {
parent: "urn:cellos:event:11111111-1111-1111-1111-111111111111".into(),
parent_type: "dev.cellos.events.cell.lifecycle.v1.started".into(),
};
let data = export_completed_data_v2(
&spec,
"cell-export",
Some("run-export"),
"artifact",
&receipt,
Some(&prov),
)
.unwrap();
assert_eq!(
data["provenance"]["parent"],
"urn:cellos:event:11111111-1111-1111-1111-111111111111"
);
assert_eq!(
data["provenance"]["parentType"],
"dev.cellos.events.cell.lifecycle.v1.started"
);
}
#[test]
fn seam_g2_export_failed_v2_includes_provenance_when_set() {
let spec = _seam_g1_g2_minimal_spec("seam-g2-export-failed");
let prov = Provenance {
parent: "urn:cellos:event:22222222-2222-2222-2222-222222222222".into(),
parent_type: "dev.cellos.events.cell.lifecycle.v1.started".into(),
};
let data = export_failed_data_v2(
&spec,
"cell-fail",
Some("run-fail"),
"artifact",
ExportReceiptTargetKind::S3,
Some("bucket"),
Some("s3://bucket/artifact"),
"denied by policy",
Some(&prov),
)
.unwrap();
assert_eq!(
data["provenance"]["parent"],
"urn:cellos:event:22222222-2222-2222-2222-222222222222"
);
assert_eq!(data["reason"], "denied by policy");
}
#[test]
fn seam_g1_correlation_id_propagates_when_present_in_spec() {
let mut spec = _seam_g1_g2_minimal_spec("seam-g1-corr");
spec.correlation = Some(Correlation {
platform: None,
external_run_id: None,
external_job_id: None,
tenant_id: None,
labels: None,
correlation_id: Some("urn:tsafe:corr:01J".into()),
});
spec.identity = Some(WorkloadIdentity {
kind: WorkloadIdentityKind::FederatedOidc,
provider: "github-actions".into(),
audience: "sts.amazonaws.com".into(),
subject: None,
ttl_seconds: Some(900),
secret_ref: "AWS_WEB_IDENTITY".into(),
});
let identity = spec.identity.as_ref().unwrap();
let data =
identity_revoked_data_v1(&spec, "cell-1", Some("r"), identity, Some("teardown"), None)
.unwrap();
assert_eq!(
data["correlation"]["correlationId"], "urn:tsafe:corr:01J",
"identity.revoked must mirror correlationId from spec"
);
let receipt = ExportReceipt {
target_kind: ExportReceiptTargetKind::Local,
target_name: None,
destination: "/tmp/x".into(),
bytes_written: 1,
};
let data = export_completed_data_v2(&spec, "c", Some("r"), "art", &receipt, None).unwrap();
assert_eq!(
data["correlation"]["correlationId"], "urn:tsafe:corr:01J",
"export.v2.completed must mirror correlationId from spec"
);
let data = export_failed_data_v2(
&spec,
"c",
Some("r"),
"art",
ExportReceiptTargetKind::Local,
None,
None,
"boom",
None,
)
.unwrap();
assert_eq!(
data["correlation"]["correlationId"], "urn:tsafe:corr:01J",
"export.v2.failed must mirror correlationId from spec"
);
let data =
command_completed_data_v1(&spec, "c", Some("r"), &["echo".to_string()], 0, 5, None)
.unwrap();
assert_eq!(
data["correlation"]["correlationId"], "urn:tsafe:corr:01J",
"command.completed must mirror correlationId from spec"
);
let data = compliance_summary_data_v1(&spec, "c", Some("r"), Some(0)).unwrap();
assert_eq!(
data["correlation"]["correlationId"], "urn:tsafe:corr:01J",
"compliance.summary must mirror correlationId from spec"
);
}
#[test]
fn subject_urn_accepts_canonical_cell_form() {
let urn = SubjectUrn::parse("urn:cellos:cell:abc-123").expect("must parse");
assert_eq!(urn.as_str(), "urn:cellos:cell:abc-123");
}
#[test]
fn subject_urn_accepts_id_with_internal_colons() {
let urn = SubjectUrn::parse("urn:cellos:event:abc:01j").expect("must parse");
assert_eq!(urn.as_str(), "urn:cellos:event:abc:01j");
}
#[test]
fn subject_urn_rejects_when_no_urn_scheme() {
let err = SubjectUrn::parse("cell:abc-123").unwrap_err();
assert_eq!(err, SubjectUrnError::MissingUrnScheme);
}
#[test]
fn subject_urn_rejects_three_segment_form() {
let err = SubjectUrn::parse("urn:cellos:cell").unwrap_err();
assert_eq!(err, SubjectUrnError::TooFewSegments);
}
#[test]
fn subject_urn_rejects_empty_id() {
let err = SubjectUrn::parse("urn:cellos:cell:").unwrap_err();
assert_eq!(err, SubjectUrnError::EmptySegment);
}
#[test]
fn subject_urn_rejects_uppercase_tool_or_kind() {
let err = SubjectUrn::parse("urn:CellOS:cell:abc-123").unwrap_err();
assert_eq!(err, SubjectUrnError::InvalidToolOrKindCharset);
}
#[test]
fn subject_urn_rejects_embedded_whitespace() {
let err = SubjectUrn::parse("urn:cellos:cell:abc 123").unwrap_err();
assert_eq!(err, SubjectUrnError::ControlOrWhitespace);
}
#[test]
fn subject_urn_rejects_empty_tool_segment() {
let err = SubjectUrn::parse("urn::cell:abc-123").unwrap_err();
assert_eq!(err, SubjectUrnError::EmptySegment);
}
#[test]
fn cell_subject_urn_helper_round_trips() {
let urn = cell_subject_urn("cell-host-7").expect("helper must accept ASCII id");
assert_eq!(urn.as_str(), "urn:cellos:cell:cell-host-7");
let reparsed = SubjectUrn::parse(urn.as_str()).expect("must reparse");
assert_eq!(reparsed, urn);
}
#[test]
fn cell_subject_urn_helper_rejects_empty_id() {
let err = cell_subject_urn("").unwrap_err();
assert_eq!(err, SubjectUrnError::EmptySegment);
}
#[test]
fn formation_data_v1_shape_happy_path() {
let data = formation_data_v1("f-123", "demo-formation", 3, &[], None);
assert_eq!(data["formationId"], json!("f-123"));
assert_eq!(data["formationName"], json!("demo-formation"));
assert_eq!(data["cellCount"], json!(3));
assert_eq!(data["failedCellIds"], json!([] as [String; 0]));
let obj = data.as_object().unwrap();
assert!(!obj.contains_key("reason"));
}
#[test]
fn formation_data_v1_shape_degraded_path_includes_failed_cells_and_reason() {
let failed = vec!["cell-a".to_string(), "cell-b".to_string()];
let data = formation_data_v1(
"f-123",
"demo-formation",
5,
&failed,
Some("2/5 cells exited non-zero"),
);
assert_eq!(data["failedCellIds"], json!(failed));
assert_eq!(data["reason"], json!("2/5 cells exited non-zero"));
}
#[test]
fn formation_created_envelope_carries_correct_urn() {
let ev = cloud_event_v1_formation_created(
"cellos-supervisor",
"2026-05-16T00:00:00Z",
"f-1",
"demo",
2,
&[],
None,
);
assert_eq!(ev.ty, "dev.cellos.events.cell.formation.v1.created");
assert_eq!(ev.specversion, "1.0");
assert_eq!(ev.source, "cellos-supervisor");
assert!(ev.data.is_some());
}
#[test]
fn formation_launching_envelope_carries_correct_urn() {
let ev = cloud_event_v1_formation_launching(
"cellos-supervisor",
"2026-05-16T00:00:00Z",
"f-1",
"demo",
2,
&[],
None,
);
assert_eq!(ev.ty, "dev.cellos.events.cell.formation.v1.launching");
}
#[test]
fn formation_running_envelope_carries_correct_urn() {
let ev = cloud_event_v1_formation_running(
"cellos-supervisor",
"2026-05-16T00:00:00Z",
"f-1",
"demo",
2,
&[],
None,
);
assert_eq!(ev.ty, "dev.cellos.events.cell.formation.v1.running");
}
#[test]
fn formation_degraded_envelope_carries_correct_urn_and_failed_cells() {
let failed = vec!["cell-a".to_string()];
let ev = cloud_event_v1_formation_degraded(
"cellos-supervisor",
"2026-05-16T00:00:00Z",
"f-1",
"demo",
3,
&failed,
Some("one cell exited 1"),
);
assert_eq!(ev.ty, "dev.cellos.events.cell.formation.v1.degraded");
let data = ev.data.unwrap();
assert_eq!(data["failedCellIds"], json!(failed));
assert_eq!(data["reason"], json!("one cell exited 1"));
}
#[test]
fn formation_completed_envelope_carries_correct_urn() {
let ev = cloud_event_v1_formation_completed(
"cellos-supervisor",
"2026-05-16T00:00:00Z",
"f-1",
"demo",
2,
&[],
None,
);
assert_eq!(ev.ty, "dev.cellos.events.cell.formation.v1.completed");
}
#[test]
fn formation_failed_envelope_carries_correct_urn_and_reason() {
let failed = vec!["cell-a".to_string(), "cell-b".to_string()];
let ev = cloud_event_v1_formation_failed(
"cellos-supervisor",
"2026-05-16T00:00:00Z",
"f-1",
"demo",
2,
&failed,
Some("all cells exited non-zero"),
);
assert_eq!(ev.ty, "dev.cellos.events.cell.formation.v1.failed");
let data = ev.data.unwrap();
assert_eq!(data["failedCellIds"], json!(failed));
assert_eq!(data["reason"], json!("all cells exited non-zero"));
}
#[test]
fn formation_type_constants_match_envelope_urns() {
assert_eq!(
FORMATION_CREATED_TYPE,
"dev.cellos.events.cell.formation.v1.created"
);
assert_eq!(
FORMATION_LAUNCHING_TYPE,
"dev.cellos.events.cell.formation.v1.launching"
);
assert_eq!(
FORMATION_RUNNING_TYPE,
"dev.cellos.events.cell.formation.v1.running"
);
assert_eq!(
FORMATION_DEGRADED_TYPE,
"dev.cellos.events.cell.formation.v1.degraded"
);
assert_eq!(
FORMATION_COMPLETED_TYPE,
"dev.cellos.events.cell.formation.v1.completed"
);
assert_eq!(
FORMATION_FAILED_TYPE,
"dev.cellos.events.cell.formation.v1.failed"
);
}
}