1use std::sync::Arc;
44
45use async_trait::async_trait;
46use base64::engine::general_purpose::URL_SAFE_NO_PAD;
47use base64::Engine as _;
48use cellos_core::ports::EventSink;
49use cellos_core::{
50 sign_event_ed25519, sign_event_hmac_sha256, CellosError, CloudEventV1, SignedEventEnvelopeV1,
51};
52
/// Value written to the `ty` field of the outer wrapper event that carries a
/// serialized signed envelope in its `data` field (see `SigningEventSink::wrap`).
pub const SIGNED_ENVELOPE_TRANSPORT_TYPE: &str = "dev.cellos.events.signed_envelope.v1";
59
/// A configuration problem found while reading the event-signing environment
/// variables.
///
/// `SigningEventSink::from_env_with_warnings` returns one of these whenever it
/// falls back to the unsigned sink because a setting is invalid or missing.
#[derive(Debug, Clone)]
pub struct EventSigningConfigWarning {
    /// Name of the environment variable the warning is about.
    pub var: &'static str,
    /// The offending value as reported to the operator. For the key variable
    /// the producer in this file stores only an empty string or a placeholder
    /// (`<empty>`, `<N bytes>`, `<base64 decode failed>`), not the key itself.
    pub value: String,
    /// Human-readable explanation of why signing was disabled.
    pub reason: String,
}
81
/// Value written to the `source` field of every wrapper event built by
/// `SigningEventSink::wrap`.
const WRAPPER_SOURCE: &str = "/cellos-supervisor/event-signing";
85
/// Signature scheme selected through the `CELLOS_EVENT_SIGNING` variable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Algorithm {
    /// Selected by the toggle value `ed25519`; the key must be exactly 32 bytes.
    Ed25519,
    /// Selected by the toggle values `hmac` or `hmac-sha256`; the key must be
    /// non-empty.
    HmacSha256,
}
92
/// Validated signing parameters held by `SigningEventSink`.
///
/// `ZeroizeOnDrop` is derived with `algorithm` and `kid` marked
/// `#[zeroize(skip)]`, so `key_bytes` is the only field wiped on drop.
#[derive(Clone, zeroize::ZeroizeOnDrop)]
struct SigningConfig {
    /// Which signing routine `wrap` dispatches to.
    #[zeroize(skip)]
    algorithm: Algorithm,
    /// Key identifier passed to the signing routines.
    #[zeroize(skip)]
    kid: String,
    /// Raw key material decoded from `CELLOS_EVENT_SIGNING_KEY_BASE64`.
    key_bytes: zeroize::Zeroizing<Vec<u8>>,
}
113
/// `EventSink` decorator that signs each event and forwards a wrapper event
/// carrying the signed envelope to the wrapped sink.
pub struct SigningEventSink {
    /// Downstream sink that receives the wrapper events.
    inner: Arc<dyn EventSink>,
    /// Algorithm, key id and key material used for signing.
    cfg: SigningConfig,
}
120
121impl SigningEventSink {
122 fn new(inner: Arc<dyn EventSink>, cfg: SigningConfig) -> Self {
125 Self { inner, cfg }
126 }
127
128 pub fn from_env(inner: Arc<dyn EventSink>) -> Arc<dyn EventSink> {
139 let (sink, _warnings) = Self::from_env_with_warnings(inner);
140 sink
141 }
142
143 pub fn from_env_with_warnings(
153 inner: Arc<dyn EventSink>,
154 ) -> (Arc<dyn EventSink>, Vec<EventSigningConfigWarning>) {
155 let mut warnings: Vec<EventSigningConfigWarning> = Vec::new();
156
157 let toggle = match std::env::var("CELLOS_EVENT_SIGNING") {
158 Ok(v) => v,
159 Err(_) => return (inner, warnings),
160 };
161 let toggle_norm = toggle.trim().to_ascii_lowercase();
162 if toggle_norm.is_empty() || toggle_norm == "off" {
163 return (inner, warnings);
164 }
165
166 let algorithm = match toggle_norm.as_str() {
167 "ed25519" => Algorithm::Ed25519,
168 "hmac" | "hmac-sha256" => Algorithm::HmacSha256,
169 other => {
170 tracing::warn!(
171 target: "cellos.supervisor.event_signing",
172 toggle = %other,
173 "CELLOS_EVENT_SIGNING: unknown algorithm — signing disabled (expected off|ed25519|hmac)"
174 );
175 warnings.push(EventSigningConfigWarning {
176 var: "CELLOS_EVENT_SIGNING",
177 value: other.to_string(),
178 reason: "unknown algorithm (expected off|ed25519|hmac); signing disabled"
179 .into(),
180 });
181 return (inner, warnings);
182 }
183 };
184
185 let kid = match std::env::var("CELLOS_EVENT_SIGNING_KID") {
186 Ok(k) => k.trim().to_string(),
187 Err(_) => {
188 tracing::warn!(
189 target: "cellos.supervisor.event_signing",
190 "CELLOS_EVENT_SIGNING is set but CELLOS_EVENT_SIGNING_KID is missing — signing disabled"
191 );
192 warnings.push(EventSigningConfigWarning {
193 var: "CELLOS_EVENT_SIGNING_KID",
194 value: String::new(),
195 reason: format!(
196 "CELLOS_EVENT_SIGNING={toggle_norm} requires a kid but CELLOS_EVENT_SIGNING_KID is unset; signing disabled"
197 ),
198 });
199 return (inner, warnings);
200 }
201 };
202 if kid.is_empty() {
203 tracing::warn!(
204 target: "cellos.supervisor.event_signing",
205 "CELLOS_EVENT_SIGNING_KID is empty — signing disabled"
206 );
207 warnings.push(EventSigningConfigWarning {
208 var: "CELLOS_EVENT_SIGNING_KID",
209 value: String::new(),
210 reason: "kid is empty; signing disabled".into(),
211 });
212 return (inner, warnings);
213 }
214
215 let key_b64 = match std::env::var("CELLOS_EVENT_SIGNING_KEY_BASE64") {
216 Ok(k) => k,
217 Err(_) => {
218 tracing::warn!(
219 target: "cellos.supervisor.event_signing",
220 "CELLOS_EVENT_SIGNING is set but CELLOS_EVENT_SIGNING_KEY_BASE64 is missing — signing disabled"
221 );
222 warnings.push(EventSigningConfigWarning {
223 var: "CELLOS_EVENT_SIGNING_KEY_BASE64",
224 value: String::new(),
225 reason: format!(
226 "CELLOS_EVENT_SIGNING={toggle_norm} requires a key but CELLOS_EVENT_SIGNING_KEY_BASE64 is unset; signing disabled"
227 ),
228 });
229 return (inner, warnings);
230 }
231 };
232 let trimmed = key_b64.trim().trim_end_matches('=');
233 let key_bytes: zeroize::Zeroizing<Vec<u8>> = match URL_SAFE_NO_PAD.decode(trimmed) {
237 Ok(b) => zeroize::Zeroizing::new(b),
238 Err(e) => {
239 tracing::warn!(
240 target: "cellos.supervisor.event_signing",
241 error = %e,
242 "CELLOS_EVENT_SIGNING_KEY_BASE64: invalid base64url — signing disabled"
243 );
244 warnings.push(EventSigningConfigWarning {
248 var: "CELLOS_EVENT_SIGNING_KEY_BASE64",
249 value: "<base64 decode failed>".into(),
250 reason: format!("invalid base64url: {e}; signing disabled"),
251 });
252 return (inner, warnings);
253 }
254 };
255
256 match algorithm {
260 Algorithm::Ed25519 if key_bytes.len() != 32 => {
261 tracing::warn!(
262 target: "cellos.supervisor.event_signing",
263 got_bytes = key_bytes.len(),
264 "CELLOS_EVENT_SIGNING=ed25519 requires a 32-byte key — signing disabled"
265 );
266 warnings.push(EventSigningConfigWarning {
267 var: "CELLOS_EVENT_SIGNING_KEY_BASE64",
268 value: format!("<{} bytes>", key_bytes.len()),
269 reason: format!(
270 "CELLOS_EVENT_SIGNING=ed25519 requires a 32-byte key, got {}; signing disabled",
271 key_bytes.len()
272 ),
273 });
274 return (inner, warnings);
275 }
276 Algorithm::HmacSha256 if key_bytes.is_empty() => {
277 tracing::warn!(
278 target: "cellos.supervisor.event_signing",
279 "CELLOS_EVENT_SIGNING=hmac requires a non-empty key — signing disabled"
280 );
281 warnings.push(EventSigningConfigWarning {
282 var: "CELLOS_EVENT_SIGNING_KEY_BASE64",
283 value: "<empty>".into(),
284 reason: "CELLOS_EVENT_SIGNING=hmac requires a non-empty key; signing disabled"
285 .into(),
286 });
287 return (inner, warnings);
288 }
289 _ => {}
290 }
291
292 let algo_label = match algorithm {
293 Algorithm::Ed25519 => "ed25519",
294 Algorithm::HmacSha256 => "hmac-sha256",
295 };
296 tracing::info!(
297 target: "cellos.supervisor.event_signing",
298 algorithm = %algo_label,
299 kid = %kid,
300 "per-event signing enabled"
301 );
302
303 let sink: Arc<dyn EventSink> = Arc::new(Self::new(
304 inner,
305 SigningConfig {
306 algorithm,
307 kid,
308 key_bytes,
309 },
310 ));
311 (sink, warnings)
312 }
313
314 fn wrap(&self, event: &CloudEventV1) -> Result<CloudEventV1, CellosError> {
316 let envelope: SignedEventEnvelopeV1 = match self.cfg.algorithm {
317 Algorithm::Ed25519 => {
318 let array: [u8; 32] = self.cfg.key_bytes.as_slice().try_into().map_err(|_| {
319 CellosError::InvalidSpec(format!(
320 "event signing: ed25519 key must be 32 bytes, got {}",
321 self.cfg.key_bytes.len()
322 ))
323 })?;
324 let signing_key = ed25519_dalek::SigningKey::from_bytes(&array);
325 sign_event_ed25519(event, &self.cfg.kid, &signing_key)?
326 }
327 Algorithm::HmacSha256 => {
328 sign_event_hmac_sha256(event, &self.cfg.kid, &self.cfg.key_bytes)?
329 }
330 };
331 let data = serde_json::to_value(&envelope).map_err(|e| {
332 CellosError::InvalidSpec(format!("event signing: serialize envelope: {e}"))
333 })?;
334 Ok(CloudEventV1 {
335 specversion: "1.0".into(),
336 id: event.id.clone(),
339 source: WRAPPER_SOURCE.to_string(),
340 ty: SIGNED_ENVELOPE_TRANSPORT_TYPE.to_string(),
341 datacontenttype: Some("application/json".into()),
342 data: Some(data),
343 time: event.time.clone(),
344 traceparent: event.traceparent.clone(),
345 })
346 }
347}
348
349#[async_trait]
350impl EventSink for SigningEventSink {
351 async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
352 let wrapped = self.wrap(event)?;
353 self.inner.emit(&wrapped).await
354 }
355}
356
#[cfg(test)]
mod tests {
    use super::*;
    use cellos_core::{verify_signed_event_envelope, CloudEventV1};
    use ed25519_dalek::SigningKey;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// Test sink that remembers only the most recently emitted event.
    struct CaptureSink(Mutex<Option<CloudEventV1>>);

    impl CaptureSink {
        fn new() -> Arc<Self> {
            Arc::new(Self(Mutex::new(None)))
        }

        /// Returns a copy of the last captured event, if any.
        fn last(&self) -> Option<CloudEventV1> {
            let slot = self.0.lock().unwrap();
            slot.clone()
        }
    }

    #[async_trait]
    impl EventSink for CaptureSink {
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            let mut slot = self.0.lock().unwrap();
            *slot = Some(event.clone());
            Ok(())
        }
    }

    /// Fixed lifecycle event used as the signing input in every test.
    fn sample_event() -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: "ev-001".into(),
            source: "/cellos-supervisor".into(),
            ty: "dev.cellos.events.cell.lifecycle.v1.started".into(),
            datacontenttype: Some("application/json".into()),
            data: Some(json!({"cellId": "test-cell-1"})),
            time: Some("2026-05-06T12:00:00Z".into()),
            traceparent: None,
        }
    }

    /// Serializes every test section that touches the process environment.
    static FROM_ENV_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());

    fn clear_signing_env() {
        for name in [
            "CELLOS_EVENT_SIGNING",
            "CELLOS_EVENT_SIGNING_KID",
            "CELLOS_EVENT_SIGNING_KEY_BASE64",
        ] {
            std::env::remove_var(name);
        }
    }

    /// Shared scenario driver.
    ///
    /// Under the env lock: clears the signing variables, sets `vars`, and
    /// builds the sink via `from_env`. Then, outside the lock, emits the
    /// sample event, and finally re-takes the lock to clear the variables.
    /// Returns the emitted sample event and whatever the capture sink saw.
    async fn emit_sample_with_env(vars: &[(&str, &str)]) -> (CloudEventV1, Option<CloudEventV1>) {
        let capture = CaptureSink::new();
        let sink = {
            let _guard = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
            for (name, value) in vars {
                std::env::set_var(name, value);
            }
            SigningEventSink::from_env(capture.clone() as Arc<dyn EventSink>)
        };
        let event = sample_event();
        sink.emit(&event).await.unwrap();
        {
            let _guard = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
        }
        (event, capture.last())
    }

    #[tokio::test]
    async fn from_env_off_passes_through_unwrapped() {
        let (_event, captured) = emit_sample_with_env(&[("CELLOS_EVENT_SIGNING", "off")]).await;
        let got = captured.unwrap();
        assert_eq!(
            got.ty, "dev.cellos.events.cell.lifecycle.v1.started",
            "off must pass through the original event unchanged"
        );
    }

    #[tokio::test]
    async fn from_env_unknown_toggle_disables_signing() {
        let key_b64 = URL_SAFE_NO_PAD.encode([7u8; 32]);
        let (_event, captured) = emit_sample_with_env(&[
            ("CELLOS_EVENT_SIGNING", "rsa-pss-sha512"),
            ("CELLOS_EVENT_SIGNING_KID", "ops-event-2026-q2"),
            ("CELLOS_EVENT_SIGNING_KEY_BASE64", key_b64.as_str()),
        ])
        .await;
        let got = captured.unwrap();
        assert_eq!(
            got.ty, "dev.cellos.events.cell.lifecycle.v1.started",
            "unknown algorithm must fall back to passthrough"
        );
    }

    #[tokio::test]
    async fn from_env_ed25519_round_trip_via_sink() {
        let signer_seed = [13u8; 32];
        let signer = SigningKey::from_bytes(&signer_seed);
        let key_b64 = URL_SAFE_NO_PAD.encode(signer_seed);
        let (event, captured) = emit_sample_with_env(&[
            ("CELLOS_EVENT_SIGNING", "ed25519"),
            ("CELLOS_EVENT_SIGNING_KID", "ops-event-2026-q2"),
            ("CELLOS_EVENT_SIGNING_KEY_BASE64", key_b64.as_str()),
        ])
        .await;

        let got = captured.expect("wrapper emitted");
        assert_eq!(got.ty, SIGNED_ENVELOPE_TRANSPORT_TYPE);
        let envelope: SignedEventEnvelopeV1 =
            serde_json::from_value(got.data.expect("wrapper has data")).expect("parse envelope");
        assert_eq!(envelope.algorithm, "ed25519");
        assert_eq!(envelope.signer_kid, "ops-event-2026-q2");

        let mut keys = HashMap::new();
        keys.insert("ops-event-2026-q2".to_string(), signer.verifying_key());
        let hmac_keys: HashMap<String, Vec<u8>> = HashMap::new();
        let inner = verify_signed_event_envelope(&envelope, &keys, &hmac_keys).expect("verify ok");
        assert_eq!(inner.id, event.id, "inner event id round-trips");
        assert_eq!(inner.ty, event.ty);
    }

    #[tokio::test]
    async fn from_env_hmac_round_trip_via_sink() {
        let key = b"super-secret-shared-symmetric-key-bytes-padded";
        let key_b64 = URL_SAFE_NO_PAD.encode(key);
        let (event, captured) = emit_sample_with_env(&[
            ("CELLOS_EVENT_SIGNING", "hmac"),
            ("CELLOS_EVENT_SIGNING_KID", "ops-hmac-2026-q2"),
            ("CELLOS_EVENT_SIGNING_KEY_BASE64", key_b64.as_str()),
        ])
        .await;

        let got = captured.expect("wrapper emitted");
        assert_eq!(got.ty, SIGNED_ENVELOPE_TRANSPORT_TYPE);
        let envelope: SignedEventEnvelopeV1 =
            serde_json::from_value(got.data.expect("wrapper has data")).expect("parse envelope");
        assert_eq!(envelope.algorithm, "hmac-sha256");

        let verifying_keys: HashMap<String, ed25519_dalek::VerifyingKey> = HashMap::new();
        let mut hmac_keys: HashMap<String, Vec<u8>> = HashMap::new();
        hmac_keys.insert("ops-hmac-2026-q2".to_string(), key.to_vec());
        let inner = verify_signed_event_envelope(&envelope, &verifying_keys, &hmac_keys)
            .expect("verify ok");
        assert_eq!(inner.id, event.id);
    }

    #[tokio::test]
    async fn post_sign_event_mutation_fails_verification() {
        let signer_seed = [29u8; 32];
        let signer = SigningKey::from_bytes(&signer_seed);
        let key_b64 = URL_SAFE_NO_PAD.encode(signer_seed);
        let (_event, captured) = emit_sample_with_env(&[
            ("CELLOS_EVENT_SIGNING", "ed25519"),
            ("CELLOS_EVENT_SIGNING_KID", "ops-event-2026-q2"),
            ("CELLOS_EVENT_SIGNING_KEY_BASE64", key_b64.as_str()),
        ])
        .await;

        let got = captured.expect("wrapper emitted");
        let mut envelope: SignedEventEnvelopeV1 =
            serde_json::from_value(got.data.expect("wrapper has data")).expect("parse envelope");
        // Tamper with the signed payload after the fact.
        envelope.event.id = "ev-tampered".into();

        let mut keys = HashMap::new();
        keys.insert("ops-event-2026-q2".to_string(), signer.verifying_key());
        let hmac_keys: HashMap<String, Vec<u8>> = HashMap::new();
        let err = verify_signed_event_envelope(&envelope, &keys, &hmac_keys)
            .expect_err("post-sign mutation must fail verification");
        assert!(format!("{err}").contains("ed25519 verify failed"));
    }

    #[tokio::test]
    async fn from_env_missing_kid_disables_signing() {
        let key_b64 = URL_SAFE_NO_PAD.encode([7u8; 32]);
        // No CELLOS_EVENT_SIGNING_KID on purpose.
        let (_event, captured) = emit_sample_with_env(&[
            ("CELLOS_EVENT_SIGNING", "ed25519"),
            ("CELLOS_EVENT_SIGNING_KEY_BASE64", key_b64.as_str()),
        ])
        .await;

        let got = captured.expect("event emitted");
        assert_eq!(
            got.ty, "dev.cellos.events.cell.lifecycle.v1.started",
            "missing kid must fall back to passthrough"
        );
    }
}