cellos-supervisor 0.5.1

CellOS execution-cell runner — boots cells in Firecracker microVMs or gVisor, enforces narrow typed authority, emits signed CloudEvents.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
//! [`EventSink`] wrapper that signs each emitted CloudEvent (I5).
//!
//! This wraps any `Arc<dyn EventSink>` and, before calling `inner.emit()`,
//! signs the event into a [`cellos_core::SignedEventEnvelopeV1`] (HMAC-SHA256
//! or Ed25519 — operator's choice) then re-wraps it inside a transport
//! CloudEvent of `type` [`SIGNED_ENVELOPE_TRANSPORT_TYPE`] whose `data` field
//! is the JSON-serialized envelope. JetStream / JSONL pipelines downstream
//! see a normal `CloudEventV1` and need no trait change; the projector and
//! other typed consumers detect the wrapper type and unwrap it before
//! projecting the inner event.
//!
//! # Activation
//!
//! Three env vars, parsed in the composition root:
//!
//! - `CELLOS_EVENT_SIGNING={off|ed25519|hmac}` — algorithm selector. Default
//!   `off`. Any other value (including unparseable text) is logged at WARN
//!   and falls back to `off`.
//! - `CELLOS_EVENT_SIGNING_KID=<string>` — producer-asserted signer kid that
//!   downstream verifiers must know about. Required when the algo toggle is
//!   not `off`; missing/empty kid → signing disabled.
//! - `CELLOS_EVENT_SIGNING_KEY_BASE64=<base64url>` — the secret. For
//!   `ed25519` this is the raw 32-byte Ed25519 signing-key seed. For `hmac`
//!   this is the shared symmetric key (any length; HMAC-SHA256 internally
//!   uses RFC 2104's keyed-block padding for keys that are not 64 bytes).
//!   Padded base64url is tolerated. Decode failures → signing disabled.
//!
//! # Doctrine
//!
//! - **D1 — opt-in.** The default is `off`. Producers that don't sign emit
//!   raw `CloudEventV1` envelopes exactly as before; consumers that don't
//!   verify see no change.
//! - **Sign-after-redact.** The composition root wraps:
//!   `SigningEventSink::from_env(RedactingEventSink::from_env(inner))`
//!   so the signed payload reflects the post-redaction text. A consumer
//!   that decodes a `SignedEventEnvelopeV1` and re-runs redaction would
//!   diverge from what the producer signed.
//! - **No transparent fallback to unsigned on emit failure.** If signing is
//!   configured and fails for a single event, that emit returns an error;
//!   the supervisor decides what to do with it (today: surfaces it to the
//!   `emit()` call-site).

use std::sync::Arc;

use async_trait::async_trait;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use base64::Engine as _;
use cellos_core::ports::EventSink;
use cellos_core::{
    sign_event_ed25519, sign_event_hmac_sha256, CellosError, CloudEventV1, SignedEventEnvelopeV1,
};

/// CloudEvent `type` of the transport wrapper produced by [`SigningEventSink`].
///
/// Downstream consumers (the projector, taudit, etc.) MUST treat any event
/// whose `ty` matches this constant as a wrapper whose `data` JSON
/// deserializes into [`SignedEventEnvelopeV1`].
pub const SIGNED_ENVELOPE_TRANSPORT_TYPE: &str = "dev.cellos.events.signed_envelope.v1";

/// U1-04: a single structured "this signing-config env var was ignored / fell
/// back to the default" record, returned by
/// [`SigningEventSink::from_env_with_warnings`].
///
/// The composition root translates each entry into a
/// `StartupConfigWarnings::record` call so `CELLOS_STRICT_CONFIG=1` escalates
/// signing misconfig to a fatal startup error instead of silently degrading
/// to passthrough. Reviewer wave 2 (bebc77b) flagged that the legacy
/// `from_env` shape only emitted `tracing::warn!` and therefore slipped
/// through strict mode.
#[derive(Debug, Clone)]
pub struct EventSigningConfigWarning {
    /// Env var name that was misconfigured (e.g. `CELLOS_EVENT_SIGNING_KEY_BASE64`).
    pub var: &'static str,
    /// Operator-supplied value (best-effort; empty when only a companion var
    /// was missing). Never contains decoded key material — only the toggle /
    /// kid / opaque "<base64 ...>" placeholders are stored verbatim.
    pub value: String,
    /// Operator-facing explanation of why the value was rejected.
    pub reason: String,
}

/// `source` attribute of the wrapper CloudEvent. Static — the original event's
/// `source` is preserved on the inner event inside the envelope.
const WRAPPER_SOURCE: &str = "/cellos-supervisor/event-signing";

/// Operator's algorithm choice from `CELLOS_EVENT_SIGNING`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Algorithm {
    Ed25519,
    HmacSha256,
}

/// Resolved signing configuration: algo + kid + key bytes.
///
/// # Zeroize posture (D7)
///
/// `key_bytes` holds raw signing-key material. Wrapped in
/// [`zeroize::Zeroizing`] so the heap-allocated buffer is wiped on drop;
/// `kid` and `algorithm` are non-secret operator-supplied identifiers and
/// are `#[zeroize(skip)]`. `Clone` is intentionally retained for
/// composition-root plumbing — every clone preserves the same zeroize-on-drop
/// guarantee on its `key_bytes` field.
#[derive(Clone, zeroize::ZeroizeOnDrop)]
struct SigningConfig {
    #[zeroize(skip)]
    algorithm: Algorithm,
    #[zeroize(skip)]
    kid: String,
    /// For Ed25519 this is the 32-byte signing-key seed.
    /// For HMAC-SHA256 this is the raw shared key (any length).
    key_bytes: zeroize::Zeroizing<Vec<u8>>,
}

/// [`EventSink`] wrapper that signs each emitted CloudEvent and forwards a
/// transport-wrapper CloudEvent of type [`SIGNED_ENVELOPE_TRANSPORT_TYPE`].
pub struct SigningEventSink {
    inner: Arc<dyn EventSink>,
    cfg: SigningConfig,
}

impl SigningEventSink {
    /// Construct a signing sink directly. Prefer [`SigningEventSink::from_env`]
    /// in the composition root.
    fn new(inner: Arc<dyn EventSink>, cfg: SigningConfig) -> Self {
        Self { inner, cfg }
    }

    /// Parse `CELLOS_EVENT_SIGNING` + `CELLOS_EVENT_SIGNING_KID` +
    /// `CELLOS_EVENT_SIGNING_KEY_BASE64` and return a wrapped sink if all
    /// three are valid, otherwise return `inner` unwrapped.
    ///
    /// Backwards-compatible thin wrapper over
    /// [`SigningEventSink::from_env_with_warnings`] for callers that don't
    /// need to surface the structured warnings (existing in-module unit
    /// tests, the `sink_composition_chain` integration test). Composition
    /// root MUST use `from_env_with_warnings` so misconfig is observable
    /// under `CELLOS_STRICT_CONFIG=1` (U1-04).
    pub fn from_env(inner: Arc<dyn EventSink>) -> Arc<dyn EventSink> {
        let (sink, _warnings) = Self::from_env_with_warnings(inner);
        sink
    }

    /// Same parsing as [`SigningEventSink::from_env`] but additionally returns
    /// a [`Vec<EventSigningConfigWarning>`] capturing every WARN-and-fall-back
    /// decision. The composition root threads these into the supervisor's
    /// `StartupConfigWarnings` accumulator so a misconfigured `event_signing`
    /// triggers `CELLOS_STRICT_CONFIG=1` startup failure (U1-04 / reviewer
    /// wave 2 finding bebc77b) rather than silently degrading to passthrough.
    ///
    /// Each disable path also emits the legacy `tracing::warn!` so log-based
    /// alerting keeps working unchanged.
    pub fn from_env_with_warnings(
        inner: Arc<dyn EventSink>,
    ) -> (Arc<dyn EventSink>, Vec<EventSigningConfigWarning>) {
        let mut warnings: Vec<EventSigningConfigWarning> = Vec::new();

        let toggle = match std::env::var("CELLOS_EVENT_SIGNING") {
            Ok(v) => v,
            Err(_) => return (inner, warnings),
        };
        let toggle_norm = toggle.trim().to_ascii_lowercase();
        if toggle_norm.is_empty() || toggle_norm == "off" {
            return (inner, warnings);
        }

        let algorithm = match toggle_norm.as_str() {
            "ed25519" => Algorithm::Ed25519,
            "hmac" | "hmac-sha256" => Algorithm::HmacSha256,
            other => {
                tracing::warn!(
                    target: "cellos.supervisor.event_signing",
                    toggle = %other,
                    "CELLOS_EVENT_SIGNING: unknown algorithm — signing disabled (expected off|ed25519|hmac)"
                );
                warnings.push(EventSigningConfigWarning {
                    var: "CELLOS_EVENT_SIGNING",
                    value: other.to_string(),
                    reason: "unknown algorithm (expected off|ed25519|hmac); signing disabled"
                        .into(),
                });
                return (inner, warnings);
            }
        };

        let kid = match std::env::var("CELLOS_EVENT_SIGNING_KID") {
            Ok(k) => k.trim().to_string(),
            Err(_) => {
                tracing::warn!(
                    target: "cellos.supervisor.event_signing",
                    "CELLOS_EVENT_SIGNING is set but CELLOS_EVENT_SIGNING_KID is missing — signing disabled"
                );
                warnings.push(EventSigningConfigWarning {
                    var: "CELLOS_EVENT_SIGNING_KID",
                    value: String::new(),
                    reason: format!(
                        "CELLOS_EVENT_SIGNING={toggle_norm} requires a kid but CELLOS_EVENT_SIGNING_KID is unset; signing disabled"
                    ),
                });
                return (inner, warnings);
            }
        };
        if kid.is_empty() {
            tracing::warn!(
                target: "cellos.supervisor.event_signing",
                "CELLOS_EVENT_SIGNING_KID is empty — signing disabled"
            );
            warnings.push(EventSigningConfigWarning {
                var: "CELLOS_EVENT_SIGNING_KID",
                value: String::new(),
                reason: "kid is empty; signing disabled".into(),
            });
            return (inner, warnings);
        }

        let key_b64 = match std::env::var("CELLOS_EVENT_SIGNING_KEY_BASE64") {
            Ok(k) => k,
            Err(_) => {
                tracing::warn!(
                    target: "cellos.supervisor.event_signing",
                    "CELLOS_EVENT_SIGNING is set but CELLOS_EVENT_SIGNING_KEY_BASE64 is missing — signing disabled"
                );
                warnings.push(EventSigningConfigWarning {
                    var: "CELLOS_EVENT_SIGNING_KEY_BASE64",
                    value: String::new(),
                    reason: format!(
                        "CELLOS_EVENT_SIGNING={toggle_norm} requires a key but CELLOS_EVENT_SIGNING_KEY_BASE64 is unset; signing disabled"
                    ),
                });
                return (inner, warnings);
            }
        };
        let trimmed = key_b64.trim().trim_end_matches('=');
        // D7: wrap the decoded raw key in `Zeroizing` immediately so that the
        // intermediate buffer is wiped on drop, even on the early-return error
        // paths below.
        let key_bytes: zeroize::Zeroizing<Vec<u8>> = match URL_SAFE_NO_PAD.decode(trimmed) {
            Ok(b) => zeroize::Zeroizing::new(b),
            Err(e) => {
                tracing::warn!(
                    target: "cellos.supervisor.event_signing",
                    error = %e,
                    "CELLOS_EVENT_SIGNING_KEY_BASE64: invalid base64url — signing disabled"
                );
                // Don't echo the operator's raw key bytes back into a structured
                // warning record — record an opaque marker instead, with the
                // base64 decoder's diagnostic in `reason` for triage.
                warnings.push(EventSigningConfigWarning {
                    var: "CELLOS_EVENT_SIGNING_KEY_BASE64",
                    value: "<base64 decode failed>".into(),
                    reason: format!("invalid base64url: {e}; signing disabled"),
                });
                return (inner, warnings);
            }
        };

        // For Ed25519 the seed must decode to exactly 32 bytes; HMAC accepts
        // any non-empty length (RFC 2104 § 2 explicitly allows arbitrary
        // key lengths via the keyed-block transformation).
        match algorithm {
            Algorithm::Ed25519 if key_bytes.len() != 32 => {
                tracing::warn!(
                    target: "cellos.supervisor.event_signing",
                    got_bytes = key_bytes.len(),
                    "CELLOS_EVENT_SIGNING=ed25519 requires a 32-byte key — signing disabled"
                );
                warnings.push(EventSigningConfigWarning {
                    var: "CELLOS_EVENT_SIGNING_KEY_BASE64",
                    value: format!("<{} bytes>", key_bytes.len()),
                    reason: format!(
                        "CELLOS_EVENT_SIGNING=ed25519 requires a 32-byte key, got {}; signing disabled",
                        key_bytes.len()
                    ),
                });
                return (inner, warnings);
            }
            Algorithm::HmacSha256 if key_bytes.is_empty() => {
                tracing::warn!(
                    target: "cellos.supervisor.event_signing",
                    "CELLOS_EVENT_SIGNING=hmac requires a non-empty key — signing disabled"
                );
                warnings.push(EventSigningConfigWarning {
                    var: "CELLOS_EVENT_SIGNING_KEY_BASE64",
                    value: "<empty>".into(),
                    reason: "CELLOS_EVENT_SIGNING=hmac requires a non-empty key; signing disabled"
                        .into(),
                });
                return (inner, warnings);
            }
            _ => {}
        }

        let algo_label = match algorithm {
            Algorithm::Ed25519 => "ed25519",
            Algorithm::HmacSha256 => "hmac-sha256",
        };
        tracing::info!(
            target: "cellos.supervisor.event_signing",
            algorithm = %algo_label,
            kid = %kid,
            "per-event signing enabled"
        );

        let sink: Arc<dyn EventSink> = Arc::new(Self::new(
            inner,
            SigningConfig {
                algorithm,
                kid,
                key_bytes,
            },
        ));
        (sink, warnings)
    }

    /// Sign `event` and produce the transport-wrapper CloudEvent.
    fn wrap(&self, event: &CloudEventV1) -> Result<CloudEventV1, CellosError> {
        let envelope: SignedEventEnvelopeV1 = match self.cfg.algorithm {
            Algorithm::Ed25519 => {
                let array: [u8; 32] = self.cfg.key_bytes.as_slice().try_into().map_err(|_| {
                    CellosError::InvalidSpec(format!(
                        "event signing: ed25519 key must be 32 bytes, got {}",
                        self.cfg.key_bytes.len()
                    ))
                })?;
                let signing_key = ed25519_dalek::SigningKey::from_bytes(&array);
                sign_event_ed25519(event, &self.cfg.kid, &signing_key)?
            }
            Algorithm::HmacSha256 => {
                sign_event_hmac_sha256(event, &self.cfg.kid, &self.cfg.key_bytes)?
            }
        };
        let data = serde_json::to_value(&envelope).map_err(|e| {
            CellosError::InvalidSpec(format!("event signing: serialize envelope: {e}"))
        })?;
        Ok(CloudEventV1 {
            specversion: "1.0".into(),
            // Preserve the original event id so downstream observers can
            // correlate the wrapper with logs that reference the inner id.
            id: event.id.clone(),
            source: WRAPPER_SOURCE.to_string(),
            ty: SIGNED_ENVELOPE_TRANSPORT_TYPE.to_string(),
            datacontenttype: Some("application/json".into()),
            data: Some(data),
            time: event.time.clone(),
            traceparent: event.traceparent.clone(),
        })
    }
}

#[async_trait]
impl EventSink for SigningEventSink {
    async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
        let wrapped = self.wrap(event)?;
        self.inner.emit(&wrapped).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use cellos_core::{verify_signed_event_envelope, CloudEventV1};
    use ed25519_dalek::SigningKey;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// Capture sink — records the last event it received.
    struct CaptureSink(Mutex<Option<CloudEventV1>>);

    impl CaptureSink {
        fn new() -> Arc<Self> {
            Arc::new(Self(Mutex::new(None)))
        }
        fn last(&self) -> Option<CloudEventV1> {
            self.0.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl EventSink for CaptureSink {
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            *self.0.lock().unwrap() = Some(event.clone());
            Ok(())
        }
    }

    fn sample_event() -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: "ev-001".into(),
            source: "/cellos-supervisor".into(),
            ty: "dev.cellos.events.cell.lifecycle.v1.started".into(),
            datacontenttype: Some("application/json".into()),
            data: Some(json!({"cellId": "test-cell-1"})),
            time: Some("2026-05-06T12:00:00Z".into()),
            traceparent: None,
        }
    }

    /// Guards env-var mutation so from_env tests don't race with each other.
    static FROM_ENV_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());

    fn clear_signing_env() {
        std::env::remove_var("CELLOS_EVENT_SIGNING");
        std::env::remove_var("CELLOS_EVENT_SIGNING_KID");
        std::env::remove_var("CELLOS_EVENT_SIGNING_KEY_BASE64");
    }

    #[tokio::test]
    async fn from_env_off_passes_through_unwrapped() {
        let capture = CaptureSink::new();
        let sink = {
            let _g = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
            std::env::set_var("CELLOS_EVENT_SIGNING", "off");
            SigningEventSink::from_env(capture.clone() as Arc<dyn EventSink>)
        };
        let event = sample_event();
        sink.emit(&event).await.unwrap();
        {
            let _g = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
        }
        let got = capture.last().unwrap();
        assert_eq!(
            got.ty, "dev.cellos.events.cell.lifecycle.v1.started",
            "off must pass through the original event unchanged"
        );
    }

    #[tokio::test]
    async fn from_env_unknown_toggle_disables_signing() {
        let capture = CaptureSink::new();
        let sink = {
            let _g = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
            std::env::set_var("CELLOS_EVENT_SIGNING", "rsa-pss-sha512");
            std::env::set_var("CELLOS_EVENT_SIGNING_KID", "ops-event-2026-q2");
            std::env::set_var(
                "CELLOS_EVENT_SIGNING_KEY_BASE64",
                URL_SAFE_NO_PAD.encode([7u8; 32]),
            );
            SigningEventSink::from_env(capture.clone() as Arc<dyn EventSink>)
        };
        let event = sample_event();
        sink.emit(&event).await.unwrap();
        {
            let _g = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
        }
        let got = capture.last().unwrap();
        assert_eq!(
            got.ty, "dev.cellos.events.cell.lifecycle.v1.started",
            "unknown algorithm must fall back to passthrough"
        );
    }

    #[tokio::test]
    async fn from_env_ed25519_round_trip_via_sink() {
        let capture = CaptureSink::new();
        let signer_seed = [13u8; 32];
        let signer = SigningKey::from_bytes(&signer_seed);
        let sink = {
            let _g = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
            std::env::set_var("CELLOS_EVENT_SIGNING", "ed25519");
            std::env::set_var("CELLOS_EVENT_SIGNING_KID", "ops-event-2026-q2");
            std::env::set_var(
                "CELLOS_EVENT_SIGNING_KEY_BASE64",
                URL_SAFE_NO_PAD.encode(signer_seed),
            );
            SigningEventSink::from_env(capture.clone() as Arc<dyn EventSink>)
        };
        let event = sample_event();
        sink.emit(&event).await.unwrap();
        {
            let _g = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
        }

        let got = capture.last().expect("wrapper emitted");
        assert_eq!(got.ty, SIGNED_ENVELOPE_TRANSPORT_TYPE);
        let envelope: SignedEventEnvelopeV1 =
            serde_json::from_value(got.data.expect("wrapper has data")).expect("parse envelope");
        assert_eq!(envelope.algorithm, "ed25519");
        assert_eq!(envelope.signer_kid, "ops-event-2026-q2");

        let mut keys = HashMap::new();
        keys.insert("ops-event-2026-q2".to_string(), signer.verifying_key());
        let hmac_keys: HashMap<String, Vec<u8>> = HashMap::new();
        let inner = verify_signed_event_envelope(&envelope, &keys, &hmac_keys).expect("verify ok");
        assert_eq!(inner.id, event.id, "inner event id round-trips");
        assert_eq!(inner.ty, event.ty);
    }

    #[tokio::test]
    async fn from_env_hmac_round_trip_via_sink() {
        let capture = CaptureSink::new();
        let key = b"super-secret-shared-symmetric-key-bytes-padded";
        let sink = {
            let _g = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
            std::env::set_var("CELLOS_EVENT_SIGNING", "hmac");
            std::env::set_var("CELLOS_EVENT_SIGNING_KID", "ops-hmac-2026-q2");
            std::env::set_var(
                "CELLOS_EVENT_SIGNING_KEY_BASE64",
                URL_SAFE_NO_PAD.encode(key),
            );
            SigningEventSink::from_env(capture.clone() as Arc<dyn EventSink>)
        };
        let event = sample_event();
        sink.emit(&event).await.unwrap();
        {
            let _g = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
        }

        let got = capture.last().expect("wrapper emitted");
        assert_eq!(got.ty, SIGNED_ENVELOPE_TRANSPORT_TYPE);
        let envelope: SignedEventEnvelopeV1 =
            serde_json::from_value(got.data.expect("wrapper has data")).expect("parse envelope");
        assert_eq!(envelope.algorithm, "hmac-sha256");

        let verifying_keys: HashMap<String, ed25519_dalek::VerifyingKey> = HashMap::new();
        let mut hmac_keys: HashMap<String, Vec<u8>> = HashMap::new();
        hmac_keys.insert("ops-hmac-2026-q2".to_string(), key.to_vec());
        let inner = verify_signed_event_envelope(&envelope, &verifying_keys, &hmac_keys)
            .expect("verify ok");
        assert_eq!(inner.id, event.id);
    }

    #[tokio::test]
    async fn post_sign_event_mutation_fails_verification() {
        let capture = CaptureSink::new();
        let signer_seed = [29u8; 32];
        let signer = SigningKey::from_bytes(&signer_seed);
        let sink = {
            let _g = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
            std::env::set_var("CELLOS_EVENT_SIGNING", "ed25519");
            std::env::set_var("CELLOS_EVENT_SIGNING_KID", "ops-event-2026-q2");
            std::env::set_var(
                "CELLOS_EVENT_SIGNING_KEY_BASE64",
                URL_SAFE_NO_PAD.encode(signer_seed),
            );
            SigningEventSink::from_env(capture.clone() as Arc<dyn EventSink>)
        };
        let event = sample_event();
        sink.emit(&event).await.unwrap();
        {
            let _g = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
        }

        let got = capture.last().expect("wrapper emitted");
        let mut envelope: SignedEventEnvelopeV1 =
            serde_json::from_value(got.data.expect("wrapper has data")).expect("parse envelope");
        // Adversary mutates the carried event after signing — different id.
        envelope.event.id = "ev-tampered".into();

        let mut keys = HashMap::new();
        keys.insert("ops-event-2026-q2".to_string(), signer.verifying_key());
        let hmac_keys: HashMap<String, Vec<u8>> = HashMap::new();
        let err = verify_signed_event_envelope(&envelope, &keys, &hmac_keys)
            .expect_err("post-sign mutation must fail verification");
        assert!(format!("{err}").contains("ed25519 verify failed"));
    }

    #[tokio::test]
    async fn from_env_missing_kid_disables_signing() {
        let capture = CaptureSink::new();
        let sink = {
            let _g = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
            std::env::set_var("CELLOS_EVENT_SIGNING", "ed25519");
            // Deliberately no CELLOS_EVENT_SIGNING_KID set.
            std::env::set_var(
                "CELLOS_EVENT_SIGNING_KEY_BASE64",
                URL_SAFE_NO_PAD.encode([7u8; 32]),
            );
            SigningEventSink::from_env(capture.clone() as Arc<dyn EventSink>)
        };
        let event = sample_event();
        sink.emit(&event).await.unwrap();
        {
            let _g = FROM_ENV_MUTEX.lock().unwrap();
            clear_signing_env();
        }

        let got = capture.last().expect("event emitted");
        assert_eq!(
            got.ty, "dev.cellos.events.cell.lifecycle.v1.started",
            "missing kid must fall back to passthrough"
        );
    }
}