use std::sync::Arc;
use async_trait::async_trait;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use base64::Engine as _;
use cellos_core::ports::EventSink;
use cellos_core::{
sign_event_ed25519, sign_event_hmac_sha256, CellosError, CloudEventV1, SignedEventEnvelopeV1,
};
/// CloudEvent `type` stamped on the outer wrapper event whose `data` carries
/// the serialized signed envelope (see `SigningEventSink::wrap`).
pub const SIGNED_ENVELOPE_TRANSPORT_TYPE: &str = "dev.cellos.events.signed_envelope.v1";
/// A single problem found while reading the event-signing environment
/// configuration. Each warning corresponds to a case where signing was
/// disabled and the inner sink was returned unwrapped.
#[derive(Debug, Clone)]
pub struct EventSigningConfigWarning {
/// Name of the environment variable the warning is about.
pub var: &'static str,
/// The offending value, or a placeholder such as `<empty>` / `<N bytes>`.
/// Key material itself is never stored here; it is empty when the variable
/// was unset.
pub value: String,
/// Human-readable explanation of why signing was disabled.
pub reason: String,
}
/// CloudEvent `source` stamped on the outer wrapper event built by
/// `SigningEventSink::wrap`.
const WRAPPER_SOURCE: &str = "/cellos-supervisor/event-signing";
/// Signature scheme selected through `CELLOS_EVENT_SIGNING`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Algorithm {
/// Selected by the value `ed25519`; requires a 32-byte key.
Ed25519,
/// Selected by the value `hmac` or `hmac-sha256`; requires a non-empty key.
HmacSha256,
}
/// Validated signing parameters held by a `SigningEventSink`.
///
/// Deliberately does not derive `Debug`, so the key bytes cannot end up in
/// formatted output by accident.
#[derive(Clone, zeroize::ZeroizeOnDrop)]
struct SigningConfig {
// Not secret: excluded from the derived zeroize-on-drop handling.
#[zeroize(skip)]
algorithm: Algorithm,
// Key identifier passed to the signing functions; not secret.
#[zeroize(skip)]
kid: String,
// Raw key bytes decoded from CELLOS_EVENT_SIGNING_KEY_BASE64, held in a
// `Zeroizing` wrapper so the buffer is wiped when it is dropped.
key_bytes: zeroize::Zeroizing<Vec<u8>>,
}
/// `EventSink` decorator: signs every event, wraps the signed envelope in a
/// transport CloudEvent and forwards that wrapper to `inner`.
pub struct SigningEventSink {
// Downstream sink that receives the wrapper events.
inner: Arc<dyn EventSink>,
// Algorithm, key id and key material used for signing.
cfg: SigningConfig,
}
impl SigningEventSink {
    /// Builds a signing sink around `inner` from an already validated config.
    fn new(inner: Arc<dyn EventSink>, cfg: SigningConfig) -> Self {
        Self { inner, cfg }
    }

    /// Same as [`Self::from_env_with_warnings`], discarding the warnings.
    ///
    /// Returns `inner` unchanged when signing is off or misconfigured.
    pub fn from_env(inner: Arc<dyn EventSink>) -> Arc<dyn EventSink> {
        let (sink, _warnings) = Self::from_env_with_warnings(inner);
        sink
    }

    /// Reads the signing configuration from the environment and, when it is
    /// valid, returns `inner` wrapped in a [`SigningEventSink`].
    ///
    /// Environment variables:
    /// - `CELLOS_EVENT_SIGNING`: unset, empty or `off` disables signing without
    ///   any warning; `ed25519` or `hmac` / `hmac-sha256` select the algorithm
    ///   (case-insensitive, surrounding whitespace ignored).
    /// - `CELLOS_EVENT_SIGNING_KID`: key identifier; must be non-empty after
    ///   trimming.
    /// - `CELLOS_EVENT_SIGNING_KEY_BASE64`: base64url key, trailing `=` padding
    ///   tolerated. Must decode to exactly 32 bytes for `ed25519` and to at
    ///   least one byte for HMAC.
    ///
    /// Any misconfiguration disables signing: a `tracing` warning is emitted,
    /// a matching [`EventSigningConfigWarning`] is pushed, and `inner` is
    /// returned unwrapped. Key material is never included in logs or warnings.
    pub fn from_env_with_warnings(
        inner: Arc<dyn EventSink>,
    ) -> (Arc<dyn EventSink>, Vec<EventSigningConfigWarning>) {
        let mut warnings: Vec<EventSigningConfigWarning> = Vec::new();

        // No toggle at all means signing was never requested: stay silent.
        let toggle = match std::env::var("CELLOS_EVENT_SIGNING") {
            Ok(v) => v,
            Err(_) => return (inner, warnings),
        };
        let toggle_norm = toggle.trim().to_ascii_lowercase();
        if toggle_norm.is_empty() || toggle_norm == "off" {
            return (inner, warnings);
        }

        let algorithm = match toggle_norm.as_str() {
            "ed25519" => Algorithm::Ed25519,
            "hmac" | "hmac-sha256" => Algorithm::HmacSha256,
            other => {
                tracing::warn!(
                    target: "cellos.supervisor.event_signing",
                    toggle = %other,
                    "CELLOS_EVENT_SIGNING: unknown algorithm — signing disabled (expected off|ed25519|hmac)"
                );
                warnings.push(EventSigningConfigWarning {
                    var: "CELLOS_EVENT_SIGNING",
                    value: other.to_string(),
                    reason: "unknown algorithm (expected off|ed25519|hmac); signing disabled"
                        .into(),
                });
                return (inner, warnings);
            }
        };

        let kid = match std::env::var("CELLOS_EVENT_SIGNING_KID") {
            Ok(k) => k.trim().to_string(),
            Err(_) => {
                tracing::warn!(
                    target: "cellos.supervisor.event_signing",
                    "CELLOS_EVENT_SIGNING is set but CELLOS_EVENT_SIGNING_KID is missing — signing disabled"
                );
                warnings.push(EventSigningConfigWarning {
                    var: "CELLOS_EVENT_SIGNING_KID",
                    value: String::new(),
                    reason: format!(
                        "CELLOS_EVENT_SIGNING={toggle_norm} requires a kid but CELLOS_EVENT_SIGNING_KID is unset; signing disabled"
                    ),
                });
                return (inner, warnings);
            }
        };
        if kid.is_empty() {
            tracing::warn!(
                target: "cellos.supervisor.event_signing",
                "CELLOS_EVENT_SIGNING_KID is empty — signing disabled"
            );
            warnings.push(EventSigningConfigWarning {
                var: "CELLOS_EVENT_SIGNING_KID",
                value: String::new(),
                reason: "kid is empty; signing disabled".into(),
            });
            return (inner, warnings);
        }

        let key_b64 = match std::env::var("CELLOS_EVENT_SIGNING_KEY_BASE64") {
            // The encoded key is as sensitive as the decoded bytes: keep it in
            // a zeroizing wrapper so this heap copy is wiped on every return
            // path instead of lingering after the function exits.
            Ok(k) => zeroize::Zeroizing::new(k),
            Err(_) => {
                tracing::warn!(
                    target: "cellos.supervisor.event_signing",
                    "CELLOS_EVENT_SIGNING is set but CELLOS_EVENT_SIGNING_KEY_BASE64 is missing — signing disabled"
                );
                warnings.push(EventSigningConfigWarning {
                    var: "CELLOS_EVENT_SIGNING_KEY_BASE64",
                    value: String::new(),
                    reason: format!(
                        "CELLOS_EVENT_SIGNING={toggle_norm} requires a key but CELLOS_EVENT_SIGNING_KEY_BASE64 is unset; signing disabled"
                    ),
                });
                return (inner, warnings);
            }
        };

        // The decoder is the no-padding URL-safe engine, so strip any trailing
        // '=' to accept both padded and unpadded input.
        let trimmed = key_b64.trim().trim_end_matches('=');
        let key_bytes: zeroize::Zeroizing<Vec<u8>> = match URL_SAFE_NO_PAD.decode(trimmed) {
            Ok(b) => zeroize::Zeroizing::new(b),
            Err(e) => {
                tracing::warn!(
                    target: "cellos.supervisor.event_signing",
                    error = %e,
                    "CELLOS_EVENT_SIGNING_KEY_BASE64: invalid base64url — signing disabled"
                );
                warnings.push(EventSigningConfigWarning {
                    var: "CELLOS_EVENT_SIGNING_KEY_BASE64",
                    value: "<base64 decode failed>".into(),
                    reason: format!("invalid base64url: {e}; signing disabled"),
                });
                return (inner, warnings);
            }
        };

        // Per-algorithm key length validation; only lengths are ever reported.
        match algorithm {
            Algorithm::Ed25519 if key_bytes.len() != 32 => {
                tracing::warn!(
                    target: "cellos.supervisor.event_signing",
                    got_bytes = key_bytes.len(),
                    "CELLOS_EVENT_SIGNING=ed25519 requires a 32-byte key — signing disabled"
                );
                warnings.push(EventSigningConfigWarning {
                    var: "CELLOS_EVENT_SIGNING_KEY_BASE64",
                    value: format!("<{} bytes>", key_bytes.len()),
                    reason: format!(
                        "CELLOS_EVENT_SIGNING=ed25519 requires a 32-byte key, got {}; signing disabled",
                        key_bytes.len()
                    ),
                });
                return (inner, warnings);
            }
            Algorithm::HmacSha256 if key_bytes.is_empty() => {
                tracing::warn!(
                    target: "cellos.supervisor.event_signing",
                    "CELLOS_EVENT_SIGNING=hmac requires a non-empty key — signing disabled"
                );
                warnings.push(EventSigningConfigWarning {
                    var: "CELLOS_EVENT_SIGNING_KEY_BASE64",
                    value: "<empty>".into(),
                    reason: "CELLOS_EVENT_SIGNING=hmac requires a non-empty key; signing disabled"
                        .into(),
                });
                return (inner, warnings);
            }
            _ => {}
        }

        let algo_label = match algorithm {
            Algorithm::Ed25519 => "ed25519",
            Algorithm::HmacSha256 => "hmac-sha256",
        };
        tracing::info!(
            target: "cellos.supervisor.event_signing",
            algorithm = %algo_label,
            kid = %kid,
            "per-event signing enabled"
        );

        let sink: Arc<dyn EventSink> = Arc::new(Self::new(
            inner,
            SigningConfig {
                algorithm,
                kid,
                key_bytes,
            },
        ));
        (sink, warnings)
    }

    /// Signs `event` with the configured algorithm and returns the transport
    /// wrapper event that carries the signed envelope as its JSON `data`.
    ///
    /// The wrapper reuses the original event's `id`, `time` and `traceparent`;
    /// its `source` is `WRAPPER_SOURCE` and its type is
    /// [`SIGNED_ENVELOPE_TRANSPORT_TYPE`].
    ///
    /// # Errors
    /// Returns `CellosError::InvalidSpec` when the Ed25519 key is not 32 bytes
    /// or the envelope cannot be serialized, and propagates any error from the
    /// signing functions.
    fn wrap(&self, event: &CloudEventV1) -> Result<CloudEventV1, CellosError> {
        let envelope: SignedEventEnvelopeV1 = match self.cfg.algorithm {
            Algorithm::Ed25519 => {
                // Borrow the seed in place rather than copying it into a plain
                // stack array: a by-value `[u8; 32]` would be an extra copy of
                // the secret that nothing wipes.
                let seed: &[u8; 32] =
                    self.cfg.key_bytes.as_slice().try_into().map_err(|_| {
                        CellosError::InvalidSpec(format!(
                            "event signing: ed25519 key must be 32 bytes, got {}",
                            self.cfg.key_bytes.len()
                        ))
                    })?;
                let signing_key = ed25519_dalek::SigningKey::from_bytes(seed);
                sign_event_ed25519(event, &self.cfg.kid, &signing_key)?
            }
            Algorithm::HmacSha256 => {
                sign_event_hmac_sha256(event, &self.cfg.kid, &self.cfg.key_bytes)?
            }
        };

        let data = serde_json::to_value(&envelope).map_err(|e| {
            CellosError::InvalidSpec(format!("event signing: serialize envelope: {e}"))
        })?;

        Ok(CloudEventV1 {
            specversion: "1.0".into(),
            id: event.id.clone(),
            source: WRAPPER_SOURCE.to_string(),
            ty: SIGNED_ENVELOPE_TRANSPORT_TYPE.to_string(),
            datacontenttype: Some("application/json".into()),
            data: Some(data),
            time: event.time.clone(),
            traceparent: event.traceparent.clone(),
        })
    }
}
#[async_trait]
impl EventSink for SigningEventSink {
    /// Signs `event` and forwards the resulting wrapper event to the inner
    /// sink. If signing fails, that error is returned and nothing is forwarded.
    async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
        match self.wrap(event) {
            Ok(signed) => self.inner.emit(&signed).await,
            Err(sign_err) => Err(sign_err),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use cellos_core::{verify_signed_event_envelope, CloudEventV1};
    use ed25519_dalek::SigningKey;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// `type` of the event produced by `sample_event`; seeing it on the
    /// captured event means the event went through unsigned.
    const SAMPLE_TYPE: &str = "dev.cellos.events.cell.lifecycle.v1.started";
    /// Key id used by the Ed25519 (and unknown-algorithm) scenarios.
    const ED25519_KID: &str = "ops-event-2026-q2";
    /// Key id used by the HMAC scenario.
    const HMAC_KID: &str = "ops-hmac-2026-q2";

    /// Inner sink that remembers only the most recently emitted event.
    struct CaptureSink(Mutex<Option<CloudEventV1>>);

    impl CaptureSink {
        fn new() -> Arc<Self> {
            Arc::new(Self(Mutex::new(None)))
        }

        /// Clone of the last event received, if any.
        fn last(&self) -> Option<CloudEventV1> {
            let slot = self.0.lock().unwrap();
            slot.clone()
        }
    }

    #[async_trait]
    impl EventSink for CaptureSink {
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            {
                let mut slot = self.0.lock().unwrap();
                *slot = Some(event.clone());
            }
            Ok(())
        }
    }

    /// Fixed lifecycle event used as the signing input in every test.
    fn sample_event() -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: "ev-001".into(),
            source: "/cellos-supervisor".into(),
            ty: SAMPLE_TYPE.into(),
            datacontenttype: Some("application/json".into()),
            data: Some(json!({"cellId": "test-cell-1"})),
            time: Some("2026-05-06T12:00:00Z".into()),
            traceparent: None,
        }
    }

    /// Serializes every test section that touches the process environment.
    static FROM_ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// Removes all three signing variables. Callers must hold `FROM_ENV_MUTEX`.
    fn clear_signing_env() {
        for name in [
            "CELLOS_EVENT_SIGNING",
            "CELLOS_EVENT_SIGNING_KID",
            "CELLOS_EVENT_SIGNING_KEY_BASE64",
        ] {
            std::env::remove_var(name);
        }
    }

    /// Under the env lock: resets the signing variables, applies `vars`, and
    /// builds the sink around `capture` via `SigningEventSink::from_env`.
    fn sink_from_env(capture: &Arc<CaptureSink>, vars: &[(&str, String)]) -> Arc<dyn EventSink> {
        let _guard = FROM_ENV_MUTEX.lock().unwrap();
        clear_signing_env();
        for (name, value) in vars {
            std::env::set_var(name, value);
        }
        SigningEventSink::from_env(Arc::clone(capture) as Arc<dyn EventSink>)
    }

    /// Emits the sample event through `sink`, then clears the signing
    /// variables under the env lock. Returns the event that was emitted.
    async fn emit_sample_then_reset(sink: &Arc<dyn EventSink>) -> CloudEventV1 {
        let event = sample_event();
        sink.emit(&event).await.unwrap();
        {
            let _guard = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
        }
        event
    }

    /// Environment for a valid Ed25519 configuration built from `seed`.
    fn ed25519_env(seed: [u8; 32]) -> [(&'static str, String); 3] {
        [
            ("CELLOS_EVENT_SIGNING", "ed25519".to_string()),
            ("CELLOS_EVENT_SIGNING_KID", ED25519_KID.to_string()),
            (
                "CELLOS_EVENT_SIGNING_KEY_BASE64",
                URL_SAFE_NO_PAD.encode(seed),
            ),
        ]
    }

    /// Extracts the signed envelope from a captured wrapper event.
    fn envelope_of(wrapper: CloudEventV1) -> SignedEventEnvelopeV1 {
        let payload = wrapper.data.expect("wrapper has data");
        serde_json::from_value(payload).expect("parse envelope")
    }

    #[tokio::test]
    async fn from_env_off_passes_through_unwrapped() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(&capture, &[("CELLOS_EVENT_SIGNING", "off".to_string())]);
        emit_sample_then_reset(&sink).await;

        let got = capture.last().unwrap();
        assert_eq!(
            got.ty, SAMPLE_TYPE,
            "off must pass through the original event unchanged"
        );
    }

    #[tokio::test]
    async fn from_env_unknown_toggle_disables_signing() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(
            &capture,
            &[
                ("CELLOS_EVENT_SIGNING", "rsa-pss-sha512".to_string()),
                ("CELLOS_EVENT_SIGNING_KID", ED25519_KID.to_string()),
                (
                    "CELLOS_EVENT_SIGNING_KEY_BASE64",
                    URL_SAFE_NO_PAD.encode([7u8; 32]),
                ),
            ],
        );
        emit_sample_then_reset(&sink).await;

        let got = capture.last().unwrap();
        assert_eq!(
            got.ty, SAMPLE_TYPE,
            "unknown algorithm must fall back to passthrough"
        );
    }

    #[tokio::test]
    async fn from_env_ed25519_round_trip_via_sink() {
        let capture = CaptureSink::new();
        let signer_seed = [13u8; 32];
        let signer = SigningKey::from_bytes(&signer_seed);
        let sink = sink_from_env(&capture, &ed25519_env(signer_seed));
        let event = emit_sample_then_reset(&sink).await;

        let got = capture.last().expect("wrapper emitted");
        assert_eq!(got.ty, SIGNED_ENVELOPE_TRANSPORT_TYPE);
        let envelope = envelope_of(got);
        assert_eq!(envelope.algorithm, "ed25519");
        assert_eq!(envelope.signer_kid, ED25519_KID);

        let keys = HashMap::from([(ED25519_KID.to_string(), signer.verifying_key())]);
        let hmac_keys = HashMap::<String, Vec<u8>>::new();
        let inner = verify_signed_event_envelope(&envelope, &keys, &hmac_keys).expect("verify ok");
        assert_eq!(inner.id, event.id, "inner event id round-trips");
        assert_eq!(inner.ty, event.ty);
    }

    #[tokio::test]
    async fn from_env_hmac_round_trip_via_sink() {
        let capture = CaptureSink::new();
        let key = b"super-secret-shared-symmetric-key-bytes-padded";
        let sink = sink_from_env(
            &capture,
            &[
                ("CELLOS_EVENT_SIGNING", "hmac".to_string()),
                ("CELLOS_EVENT_SIGNING_KID", HMAC_KID.to_string()),
                (
                    "CELLOS_EVENT_SIGNING_KEY_BASE64",
                    URL_SAFE_NO_PAD.encode(key),
                ),
            ],
        );
        let event = emit_sample_then_reset(&sink).await;

        let got = capture.last().expect("wrapper emitted");
        assert_eq!(got.ty, SIGNED_ENVELOPE_TRANSPORT_TYPE);
        let envelope = envelope_of(got);
        assert_eq!(envelope.algorithm, "hmac-sha256");

        let verifying_keys = HashMap::<String, ed25519_dalek::VerifyingKey>::new();
        let hmac_keys: HashMap<String, Vec<u8>> =
            HashMap::from([(HMAC_KID.to_string(), key.to_vec())]);
        let inner = verify_signed_event_envelope(&envelope, &verifying_keys, &hmac_keys)
            .expect("verify ok");
        assert_eq!(inner.id, event.id);
    }

    #[tokio::test]
    async fn post_sign_event_mutation_fails_verification() {
        let capture = CaptureSink::new();
        let signer_seed = [29u8; 32];
        let signer = SigningKey::from_bytes(&signer_seed);
        let sink = sink_from_env(&capture, &ed25519_env(signer_seed));
        emit_sample_then_reset(&sink).await;

        let got = capture.last().expect("wrapper emitted");
        let mut envelope = envelope_of(got);
        // Tamper with the signed payload after the signature was produced.
        envelope.event.id = "ev-tampered".into();

        let keys = HashMap::from([(ED25519_KID.to_string(), signer.verifying_key())]);
        let hmac_keys = HashMap::<String, Vec<u8>>::new();
        let err = verify_signed_event_envelope(&envelope, &keys, &hmac_keys)
            .expect_err("post-sign mutation must fail verification");
        assert!(err.to_string().contains("ed25519 verify failed"));
    }

    #[tokio::test]
    async fn from_env_missing_kid_disables_signing() {
        let capture = CaptureSink::new();
        let sink = sink_from_env(
            &capture,
            &[
                ("CELLOS_EVENT_SIGNING", "ed25519".to_string()),
                (
                    "CELLOS_EVENT_SIGNING_KEY_BASE64",
                    URL_SAFE_NO_PAD.encode([7u8; 32]),
                ),
            ],
        );
        emit_sample_then_reset(&sink).await;

        let got = capture.last().expect("event emitted");
        assert_eq!(
            got.ty, SAMPLE_TYPE,
            "missing kid must fall back to passthrough"
        );
    }
}