cellos-projector 0.5.0

Projection layer for CellOS — consumes JetStream CloudEvents into in-memory cell/formation state. Used by cellos-server.
Documentation
//! I5 — per-event signing decode/verify shim shared by the JSONL projector
//! and the JetStream `cellos-state-server` consumer.
//!
//! Doctrine: D1 — verification is OPT-IN. When no verification path is
//! configured, the consumer accepts both raw `CloudEventV1` lines (the
//! historical wire format) and unsigned `SignedEventEnvelopeV1` wrappers
//! transparently. When `CELLOS_EVENT_VERIFY_KEYS_PATH` is set, every signed
//! envelope MUST verify; raw `CloudEventV1` is still accepted unless
//! `CELLOS_EVENT_REQUIRE_SIGNED=1` flips the policy to require-signed.
//!
//! Key file format mirrors `cellos_core::parse_trust_verify_keys` —
//! a JSON object mapping signer kid → base64url-encoded 32-byte Ed25519
//! verifying key. HMAC verification is supported through the underlying
//! [`cellos_core::verify_signed_event_envelope`] but no env hook is wired
//! here yet (cellos-supervisor's emit path only signs Ed25519 today).

use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use cellos_core::{
    load_trust_verify_keys_file, verify_signed_event_envelope, CloudEventV1, SignedEventEnvelopeV1,
};
use ed25519_dalek::VerifyingKey;

/// Verification configuration sourced once at startup.
///
/// Cheap to clone (`Arc`-wrapped keyring); pass into per-event hot loops.
///
/// The derived `Default` produces two empty keyrings and
/// `require_signed = false`, i.e. the fully permissive configuration in which
/// nothing is verified and raw events are accepted.
#[derive(Clone, Default)]
pub struct EventVerifierConfig {
    /// `signer_kid → VerifyingKey` map. Empty when no verification key file
    /// is configured.
    pub verifying_keys: Arc<HashMap<String, VerifyingKey>>,
    /// HMAC keys placeholder — currently always empty (supervisor signs
    /// Ed25519 only). Wired so the [`verify_signed_event_envelope`] call
    /// surface stays algorithm-agnostic.
    ///
    /// `from_env` never populates this map; it can only be non-empty when a
    /// caller builds the struct by hand.
    pub hmac_keys: Arc<HashMap<String, Vec<u8>>>,
    /// When true, raw `CloudEventV1` payloads (no signature wrapper) are
    /// rejected. Mirrors `CELLOS_EVENT_REQUIRE_SIGNED=1`.
    ///
    /// This flag only gates the payload *shape*; whether a signed envelope's
    /// signature is actually checked depends on a keyring being loaded.
    pub require_signed: bool,
}

impl EventVerifierConfig {
    /// Read configuration from process environment.
    ///
    /// Variables:
    /// - `CELLOS_EVENT_VERIFY_KEYS_PATH`: JSON keyring (kid → base64url
    ///   ed25519 pubkey). Loaded with O_NOFOLLOW on Unix (mirrors SEC-15b).
    ///   Surrounding whitespace is trimmed; an unset, empty or
    ///   whitespace-only value means "no keyring".
    /// - `CELLOS_EVENT_REQUIRE_SIGNED`: `1`/`true`/`yes`/`on` (case-insensitive,
    ///   whitespace-trimmed) → require every event to arrive in a signed
    ///   envelope. Any other value, or an unset variable, leaves it off.
    ///
    /// Returns `Ok` with empty keyrings when no key file is configured.
    ///
    /// # Errors
    ///
    /// Returns an error when the key file is configured but cannot be
    /// loaded — fail-closed so a misconfigured operator does not silently
    /// accept unsigned traffic. A path that is not valid Unicode still counts
    /// as "configured": it is handed to the loader as-is instead of being
    /// treated as unset, so verification can never be disabled by accident
    /// through an exotic path encoding.
    pub fn from_env() -> Result<Self> {
        let require_signed = std::env::var("CELLOS_EVENT_REQUIRE_SIGNED")
            .map(|v| {
                let t = v.trim().to_ascii_lowercase();
                matches!(t.as_str(), "1" | "true" | "yes" | "on")
            })
            .unwrap_or(false);

        // Resolve the keyring location first, distinguishing "not set" from
        // "set, but not valid Unicode". Only the former may disable loading.
        let keys_path = match std::env::var("CELLOS_EVENT_VERIFY_KEYS_PATH") {
            Ok(p) => {
                let trimmed = p.trim();
                if trimmed.is_empty() {
                    None
                } else {
                    Some(PathBuf::from(trimmed))
                }
            }
            Err(std::env::VarError::NotPresent) => None,
            // Paths need not be UTF-8; use the raw value untouched (it
            // cannot be trimmed as a `str`) and let the loader decide.
            Err(std::env::VarError::NotUnicode(raw)) => Some(PathBuf::from(raw)),
        };

        let verifying_keys = match keys_path {
            Some(path) => {
                let keys = load_trust_verify_keys_file(&path).map_err(|e| {
                    anyhow!(
                        "CELLOS_EVENT_VERIFY_KEYS_PATH={}: load failed: {e}",
                        path.display()
                    )
                })?;
                Arc::new(keys)
            }
            None => Arc::new(HashMap::new()),
        };

        Ok(Self {
            verifying_keys,
            // No env hook exists for HMAC keys; the map stays empty here.
            hmac_keys: Arc::new(HashMap::new()),
            require_signed,
        })
    }

    /// Have any keys been loaded?
    ///
    /// True when either the Ed25519 keyring or the HMAC keyring holds at
    /// least one entry.
    pub fn has_keys(&self) -> bool {
        !self.verifying_keys.is_empty() || !self.hmac_keys.is_empty()
    }
}

/// Turn one wire payload — a JetStream message body or a single JSONL line,
/// JSON-encoded — into a [`CloudEventV1`].
///
/// The payload is first parsed as a [`SignedEventEnvelopeV1`]; only when that
/// parse fails is it retried as a bare `CloudEventV1`, which keeps the
/// historical unsigned wire format working.
///
/// Outcome by payload shape and configuration:
/// - Signed envelope, `cfg.has_keys()` true → the envelope must verify,
///   otherwise an error is returned.
/// - Signed envelope, no keys loaded → the inner event is returned without
///   any signature check.
/// - Bare event, `cfg.require_signed` true → rejected.
/// - Bare event, `cfg.require_signed` false → returned as parsed.
///
/// # Errors
///
/// Fails when verification fails, when a bare event arrives under
/// `require_signed`, or when the bytes parse as neither shape.
pub fn decode_event(bytes: &[u8], cfg: &EventVerifierConfig) -> Result<CloudEventV1> {
    match serde_json::from_slice::<SignedEventEnvelopeV1>(bytes) {
        // Parsed as an envelope. Per the upstream note, the envelope's serde
        // shape requires `signerKid`/`algorithm`/`signature`, so a bare
        // CloudEvent is not expected to land in this arm.
        Ok(signed) => {
            if cfg.has_keys() {
                verify_signed_event_envelope(&signed, &cfg.verifying_keys, &cfg.hmac_keys)
                    .map_err(|e| anyhow!("signed event envelope verify failed: {e}"))?;
            }
            Ok(signed.event)
        }
        // Not an envelope, and policy forbids unsigned traffic: refuse
        // before even attempting the raw parse.
        Err(_) if cfg.require_signed => Err(anyhow!(
            "CELLOS_EVENT_REQUIRE_SIGNED=1 but payload is not a SignedEventEnvelopeV1"
        )),
        // Not an envelope, permissive policy: fall back to the raw shape.
        Err(_) => serde_json::from_slice::<CloudEventV1>(bytes).map_err(|e| {
            anyhow!("payload is neither a signed envelope nor a CloudEventV1: {e}")
        }),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use cellos_core::sign_event_ed25519;
    use ed25519_dalek::SigningKey;

    /// Deterministic signing key whose 32 seed bytes all equal `seed`.
    fn signing_key(seed: u8) -> SigningKey {
        let seed_bytes = [seed; 32];
        SigningKey::from_bytes(&seed_bytes)
    }

    /// Minimal lifecycle event carrying the given `id`.
    fn sample_event(id: &str) -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: id.into(),
            source: "/test".into(),
            ty: "dev.cellos.events.cell.lifecycle.v1.started".into(),
            datacontenttype: Some("application/json".into()),
            data: Some(serde_json::json!({"cellId": "c-1", "specId": "s-1"})),
            time: None,
            traceparent: None,
        }
    }

    /// JSON bytes of an unsigned sample event.
    fn raw_bytes(id: &str) -> Vec<u8> {
        serde_json::to_vec(&sample_event(id)).unwrap()
    }

    /// JSON bytes of a sample event signed by `signer` under `kid`.
    fn signed_bytes(id: &str, kid: &str, signer: &SigningKey) -> Vec<u8> {
        let envelope = sign_event_ed25519(&sample_event(id), kid, signer).unwrap();
        serde_json::to_vec(&envelope).unwrap()
    }

    /// Config whose Ed25519 keyring holds exactly one `kid → key` entry.
    fn keyring_config(kid: &str, key: VerifyingKey, require_signed: bool) -> EventVerifierConfig {
        let mut keyring = HashMap::new();
        keyring.insert(kid.to_string(), key);
        EventVerifierConfig {
            verifying_keys: Arc::new(keyring),
            hmac_keys: Arc::new(HashMap::new()),
            require_signed,
        }
    }

    #[test]
    fn raw_event_accepted_in_permissive_mode() {
        let permissive = EventVerifierConfig::default();
        let decoded = decode_event(&raw_bytes("e1"), &permissive).expect("permissive accepts raw");
        assert_eq!(decoded.id, "e1");
    }

    #[test]
    fn raw_event_rejected_when_require_signed() {
        let strict = EventVerifierConfig {
            require_signed: true,
            ..Default::default()
        };
        let err = decode_event(&raw_bytes("e1"), &strict).expect_err("require_signed rejects raw");
        assert!(err.to_string().contains("REQUIRE_SIGNED"));
    }

    #[test]
    fn signed_envelope_verifies_against_keyring() {
        let signer = signing_key(7);
        let payload = signed_bytes("e1", "kid-7", &signer);
        let cfg = keyring_config("kid-7", signer.verifying_key(), true);

        let decoded = decode_event(&payload, &cfg).expect("verifies");
        assert_eq!(decoded.id, "e1");
    }

    #[test]
    fn signed_envelope_with_tampered_event_rejected() {
        let signer = signing_key(7);
        // Sign first, then mutate the inner event so the signature no longer
        // covers what is on the wire.
        let mut envelope = sign_event_ed25519(&sample_event("e1"), "kid-7", &signer).unwrap();
        envelope.event.id = "tampered".into();
        let payload = serde_json::to_vec(&envelope).unwrap();
        let cfg = keyring_config("kid-7", signer.verifying_key(), false);

        let err = decode_event(&payload, &cfg).expect_err("tampered must be rejected");
        assert!(err.to_string().contains("verify failed"));
    }

    #[test]
    fn signed_envelope_unknown_kid_rejected() {
        let signer = signing_key(7);
        let payload = signed_bytes("e1", "kid-unknown", &signer);
        // The keyring knows a different kid bound to a different key.
        let cfg = keyring_config("kid-other", signing_key(11).verifying_key(), false);

        let err = decode_event(&payload, &cfg).expect_err("unknown kid must be rejected");
        let msg = err.to_string();
        assert!(
            msg.contains("verify failed") || msg.contains("unknown"),
            "got: {msg}"
        );
    }

    #[test]
    fn signed_envelope_accepted_without_keyring() {
        // No keys configured → the verifier acts as a transparent unwrapper.
        let signer = signing_key(7);
        let payload = signed_bytes("e1", "kid-7", &signer);
        let no_keys = EventVerifierConfig::default();

        let decoded = decode_event(&payload, &no_keys).expect("no-keys is transparent unwrap");
        assert_eq!(decoded.id, "e1");
    }
}