cellos-projector 0.5.0

Projection layer for CellOS — consumes JetStream CloudEvents into in-memory cell/formation state. Used by cellos-server.
Documentation
use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};

use anyhow::{bail, Context, Result};
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use base64::Engine as _;
use cellos_core::{
    parse_trust_verify_keys, verify_signed_event_envelope, CellStateProjection, CellStateSnapshot,
    CloudEventV1, SignedEventEnvelopeV1,
};
use ed25519_dalek::VerifyingKey;

/// CloudEvent `type` of the I5 signing wrapper produced by
/// `cellos_supervisor::event_signing::SigningEventSink`.
///
/// An event with this `type` carries a `SignedEventEnvelopeV1` in its `data`
/// payload; `unwrap_signed_envelope` recognises it by comparing against this
/// value and projects the inner event instead of the wrapper. Kept as a string
/// constant here to avoid a build-time dep on the supervisor binary crate.
const SIGNED_ENVELOPE_TRANSPORT_TYPE: &str = "dev.cellos.events.signed_envelope.v1";

/// Entry point for `cellos-projector`.
///
/// Prints the version line and exits when `--version` / `-V` is requested.
/// Otherwise it parses the CLI and projects the JSONL file into per-key
/// snapshots, then writes JSON to stdout:
/// * with `--cell-id ID` / `--spec-id ID`, the first snapshot (in key order)
///   whose `cell_id` / `spec_id` matches, or an error if none does;
/// * with no filter, the whole key → snapshot map.
///
/// `--pretty` selects pretty-printed JSON in every case.
fn main() -> Result<()> {
    // P4-03: `<binary> <semver> (<short-sha>)` on --version / -V. This has to
    // happen before `Cli::parse`, which would otherwise demand a positional path.
    let version_printed = cellos_projector::build_info::print_version_if_requested(
        "cellos-projector",
        env!("CARGO_PKG_VERSION"),
    );
    if version_printed {
        return Ok(());
    }

    let cli = Cli::parse(std::env::args().skip(1))?;
    let snapshots = project_jsonl(&cli.path)?;

    // `Cli::parse` guarantees at most one of the two filters is set.
    match (cli.cell_id.as_deref(), cli.spec_id.as_deref()) {
        (Some(cell_id), _) => {
            let matched = snapshots
                .values()
                .find(|candidate| candidate.cell_id.as_deref() == Some(cell_id))
                .with_context(|| format!("no snapshot found for cell-id {cell_id:?}"))?;
            print_snapshot(matched, cli.pretty)
        }
        (None, Some(spec_id)) => {
            let matched = snapshots
                .values()
                .find(|candidate| candidate.spec_id.as_deref() == Some(spec_id))
                .with_context(|| format!("no snapshot found for spec-id {spec_id:?}"))?;
            print_snapshot(matched, cli.pretty)
        }
        (None, None) => {
            let rendered = if cli.pretty {
                serde_json::to_string_pretty(&snapshots)?
            } else {
                serde_json::to_string(&snapshots)?
            };
            println!("{rendered}");
            Ok(())
        }
    }
}

/// Command-line options for `cellos-projector`, produced by `Cli::parse`.
#[derive(Debug, Default)]
struct Cli {
    /// JSONL file of CloudEvents to project (the single positional argument).
    path: PathBuf,
    /// `--cell-id ID`: print only the first snapshot (in projection-key order)
    /// whose `cell_id` equals `ID`. Mutually exclusive with `spec_id`.
    cell_id: Option<String>,
    /// `--spec-id ID`: print only the first snapshot (in projection-key order)
    /// whose `spec_id` equals `ID`. Mutually exclusive with `cell_id`.
    spec_id: Option<String>,
    /// `--pretty`: emit pretty-printed JSON instead of compact JSON.
    pretty: bool,
}

impl Cli {
    /// Parse the process arguments (program name already stripped) into a [`Cli`].
    ///
    /// Grammar: `<events.jsonl> [--cell-id ID | --spec-id ID] [--pretty]`.
    ///
    /// * Exactly one non-empty positional argument — the JSONL path — is required.
    /// * Any other argument beginning with `-` (except a bare `-`) must be a
    ///   recognised option. An unknown or misspelled flag such as `--prety` is
    ///   rejected instead of being silently taken as the path.
    /// * A literal `--` ends option parsing, so a path that really starts with
    ///   `-` can still be passed as `-- -weird.jsonl`.
    /// * `-h` / `--help` prints usage to stderr and exits the process with
    ///   status 0.
    ///
    /// # Errors
    ///
    /// Returns an error when an option is missing its value, an unexpected
    /// argument is seen (unknown option or a second positional), no non-empty
    /// path is given, or both `--cell-id` and `--spec-id` are supplied.
    fn parse<I>(args: I) -> Result<Self>
    where
        I: IntoIterator<Item = String>,
    {
        let mut cli = Cli::default();
        // Tracked separately from `cli.path` so an explicit empty argument
        // still counts as "the positional was given" for duplicate detection.
        let mut path: Option<PathBuf> = None;
        let mut options_ended = false;
        let mut args = args.into_iter();

        while let Some(arg) = args.next() {
            // After `--`, and for a bare `-` or anything not dash-prefixed,
            // the argument is positional.
            let is_option = !options_ended && arg.len() > 1 && arg.starts_with('-');
            if !is_option {
                if path.is_some() {
                    bail!("unexpected argument {arg:?}");
                }
                path = Some(PathBuf::from(arg));
                continue;
            }
            match arg.as_str() {
                "--" => options_ended = true,
                "--cell-id" => {
                    cli.cell_id = Some(args.next().context("--cell-id requires a value")?);
                }
                "--spec-id" => {
                    cli.spec_id = Some(args.next().context("--spec-id requires a value")?);
                }
                "--pretty" => cli.pretty = true,
                "-h" | "--help" => {
                    print_usage();
                    std::process::exit(0);
                }
                _ => bail!("unexpected argument {arg:?}"),
            }
        }

        match path {
            Some(path) if !path.as_os_str().is_empty() => cli.path = path,
            _ => bail!(
                "usage: cellos-projector <events.jsonl> [--cell-id ID | --spec-id ID] [--pretty]"
            ),
        }
        if cli.cell_id.is_some() && cli.spec_id.is_some() {
            bail!("use only one of --cell-id or --spec-id");
        }
        Ok(cli)
    }
}

/// Write the one-line usage synopsis to stderr (used by `-h` / `--help`).
fn print_usage() {
    const USAGE: &str =
        "usage: cellos-projector <events.jsonl> [--cell-id ID | --spec-id ID] [--pretty]";
    eprintln!("{USAGE}");
}

fn project_jsonl(path: &Path) -> Result<BTreeMap<String, CellStateSnapshot>> {
    let file = File::open(path).with_context(|| format!("open {}", path.display()))?;
    let reader = BufReader::new(file);
    let mut projections: BTreeMap<String, CellStateProjection> = BTreeMap::new();

    // I5: load optional verifier keyrings once. When neither is configured,
    // signed envelopes are unwrapped without verification (consumer-side
    // opt-in mirrors producer-side opt-in: a deployment that runs the
    // projector against unsigned-or-mixed JSONL must not fail just because
    // some events happen to be wrapped).
    let verifying_keys = load_verifying_keys()?;
    let hmac_keys = load_hmac_keys()?;

    for (index, line) in reader.lines().enumerate() {
        let line = line.with_context(|| format!("read line {}", index + 1))?;
        if line.trim().is_empty() {
            continue;
        }
        let event: CloudEventV1 = serde_json::from_str(&line)
            .with_context(|| format!("parse CloudEvent JSON on line {}", index + 1))?;

        // I5: unwrap signing envelopes before projection.
        let projected = unwrap_signed_envelope(&event, &verifying_keys, &hmac_keys)
            .with_context(|| format!("unwrap signed envelope on line {}", index + 1))?;

        let key = event_projection_key(&projected)?;
        let projection = projections.entry(key).or_default();
        projection
            .apply(&projected)
            .with_context(|| format!("apply event {:?} on line {}", projected.ty, index + 1))?;
    }

    Ok(projections
        .into_iter()
        .map(|(key, projection)| (key, projection.snapshot()))
        .collect())
}

/// I5: unwrap a transport-wrapper event of type
/// [`SIGNED_ENVELOPE_TRANSPORT_TYPE`] into its inner `CloudEventV1`.
///
/// Events of any other type are returned unchanged (as a clone).
///
/// When at least one verifier keyring (Ed25519 or HMAC) is non-empty, the
/// envelope is passed to `verify_signed_event_envelope` before unwrapping, and
/// a verification failure is an error. Letting a bogus wrapper through would
/// project an attacker-chosen inner event. How the envelope's signer kid is
/// matched against the keyrings is decided by `verify_signed_event_envelope`.
///
/// When both keyrings are empty (no operator configuration), the envelope is
/// unwrapped WITHOUT any cryptographic check. Only its JSON shape
/// (`SignedEventEnvelopeV1`) is validated, so operators who need integrity
/// must configure a keyring.
///
/// # Errors
///
/// * the wrapper event has no `data` payload;
/// * the payload does not deserialize as `SignedEventEnvelopeV1`;
/// * keys are configured and verification fails.
fn unwrap_signed_envelope(
    event: &CloudEventV1,
    verifying_keys: &HashMap<String, VerifyingKey>,
    hmac_keys: &HashMap<String, Vec<u8>>,
) -> Result<CloudEventV1> {
    if event.ty != SIGNED_ENVELOPE_TRANSPORT_TYPE {
        return Ok(event.clone());
    }
    let data = event
        .data
        .as_ref()
        .with_context(|| format!("wrapper event {:?} has no data payload", event.ty))?;
    let envelope: SignedEventEnvelopeV1 = serde_json::from_value(data.clone())
        .context("parse SignedEventEnvelopeV1 from wrapper data")?;

    let any_keys_configured = !verifying_keys.is_empty() || !hmac_keys.is_empty();
    if any_keys_configured {
        verify_signed_event_envelope(&envelope, verifying_keys, hmac_keys)
            .map_err(|e| anyhow::anyhow!("verify signed envelope: {e}"))?;
    }
    Ok(envelope.event)
}

/// Load `CELLOS_EVENT_VERIFY_KEYS_PATH` (Ed25519, kid → base64url-pubkey)
/// using `cellos_core::parse_trust_verify_keys` (same JSON shape as the
/// SEC-25 trust keyset verifier file). Empty/unset → empty map.
fn load_verifying_keys() -> Result<HashMap<String, VerifyingKey>> {
    let Some(path) = std::env::var("CELLOS_EVENT_VERIFY_KEYS_PATH").ok() else {
        return Ok(HashMap::new());
    };
    if path.trim().is_empty() {
        return Ok(HashMap::new());
    }
    let raw = std::fs::read_to_string(&path)
        .with_context(|| format!("read CELLOS_EVENT_VERIFY_KEYS_PATH={path}"))?;
    parse_trust_verify_keys(&raw)
        .map_err(|e| anyhow::anyhow!("parse CELLOS_EVENT_VERIFY_KEYS_PATH={path}: {e}"))
}

/// Load `CELLOS_EVENT_VERIFY_HMAC_KEYS_PATH` (HMAC-SHA256, kid → base64url
/// shared key). File format mirrors the verifying-keys file: top-level JSON
/// object whose values are base64url-encoded raw key bytes (any non-empty
/// length). Empty/unset → empty map.
fn load_hmac_keys() -> Result<HashMap<String, Vec<u8>>> {
    let Some(path) = std::env::var("CELLOS_EVENT_VERIFY_HMAC_KEYS_PATH").ok() else {
        return Ok(HashMap::new());
    };
    if path.trim().is_empty() {
        return Ok(HashMap::new());
    }
    let raw = std::fs::read_to_string(&path)
        .with_context(|| format!("read CELLOS_EVENT_VERIFY_HMAC_KEYS_PATH={path}"))?;
    let value: serde_json::Value = serde_json::from_str(&raw)
        .with_context(|| format!("parse JSON CELLOS_EVENT_VERIFY_HMAC_KEYS_PATH={path}"))?;
    let object = value
        .as_object()
        .with_context(|| "hmac verify keys: top-level value must be a JSON object")?;
    let mut out: HashMap<String, Vec<u8>> = HashMap::with_capacity(object.len());
    for (kid, val) in object {
        let s = val
            .as_str()
            .with_context(|| format!("hmac verify keys: value for kid {kid:?} must be a string"))?;
        let trimmed = s.trim_end_matches('=');
        let bytes = URL_SAFE_NO_PAD
            .decode(trimmed)
            .with_context(|| format!("hmac verify keys: kid {kid:?} not valid base64url"))?;
        if bytes.is_empty() {
            bail!("hmac verify keys: kid {kid:?} decoded to empty");
        }
        out.insert(kid.clone(), bytes);
    }
    Ok(out)
}

/// Compute the projection key an event is folded into.
///
/// A string `cellId` in the event's `data` yields `cell:<cellId>`. Otherwise a
/// string `specId` yields `spec:<specId>`. `cellId` takes precedence when both
/// are present.
///
/// # Errors
///
/// Fails when the event has no `data` payload, or when the payload has neither
/// a string `cellId` nor a string `specId`.
fn event_projection_key(event: &CloudEventV1) -> Result<String> {
    let data = match event.data.as_ref() {
        Some(data) => data,
        None => bail!("event {:?} is missing data payload", event.ty),
    };
    let cell_id = data.get("cellId").and_then(|value| value.as_str());
    let spec_id = data.get("specId").and_then(|value| value.as_str());
    match (cell_id, spec_id) {
        (Some(cell_id), _) => Ok(format!("cell:{cell_id}")),
        (None, Some(spec_id)) => Ok(format!("spec:{spec_id}")),
        (None, None) => bail!("event {:?} is not a cell-scoped event", event.ty),
    }
}

/// Serialize one snapshot as JSON (pretty when `pretty` is set) and print it
/// to stdout on its own line. Nothing is printed if serialization fails.
fn print_snapshot(snapshot: &CellStateSnapshot, pretty: bool) -> Result<()> {
    let rendered = if pretty {
        serde_json::to_string_pretty(snapshot)?
    } else {
        serde_json::to_string(snapshot)?
    };
    println!("{rendered}");
    Ok(())
}

#[cfg(test)]
mod tests {
    use std::io::Write;

    use super::*;

    /// Build a minimal `CloudEventV1` with the given `type` and `data`.
    fn cloud_event(ty: &str, data: serde_json::Value) -> CloudEventV1 {
        serde_json::from_value(serde_json::json!({
            "specversion": "1.0",
            "id": "1",
            "source": "urn:test",
            "type": ty,
            "data": data,
        }))
        .unwrap()
    }

    #[test]
    fn projects_jsonl_to_snapshot_map() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");
        let mut file = File::create(&path).unwrap();
        writeln!(
            file,
            "{}",
            serde_json::json!({
                "specversion": "1.0",
                "id": "1",
                "source": "urn:test",
                "type": "dev.cellos.events.cell.lifecycle.v1.started",
                "data": { "cellId": "cell-1", "specId": "spec-1" }
            })
        )
        .unwrap();
        writeln!(
            file,
            "{}",
            serde_json::json!({
                "specversion": "1.0",
                "id": "2",
                "source": "urn:test",
                "type": "dev.cellos.events.cell.command.v1.completed",
                "data": { "cellId": "cell-1", "specId": "spec-1", "exitCode": 0 }
            })
        )
        .unwrap();

        let snapshots = project_jsonl(&path).unwrap();
        let snapshot = snapshots.get("cell:cell-1").unwrap();
        assert_eq!(
            snapshot.current_state,
            cellos_core::ProjectionCurrentState::CommandSucceeded
        );
        assert_eq!(snapshot.processed_events, 2);
    }

    /// P4-03 smoke: confirm the version format string compiles. The
    /// shared helper has its own format-stability test in
    /// `build_info::tests`; this one just exercises the call site so a
    /// renamed const or a typo in the binary name fails at `cargo test`.
    #[test]
    fn version_compiles() {
        let _ = cellos_projector::build_info::version_line(
            "cellos-projector",
            env!("CARGO_PKG_VERSION"),
        );
    }

    #[test]
    fn cli_rejects_both_filters() {
        let err = Cli::parse([
            "events.jsonl".to_string(),
            "--cell-id".to_string(),
            "a".to_string(),
            "--spec-id".to_string(),
            "b".to_string(),
        ])
        .unwrap_err();

        assert!(err.to_string().contains("only one"));
    }

    #[test]
    fn cli_parses_path_and_flags() {
        let cli = Cli::parse([
            "events.jsonl".to_string(),
            "--pretty".to_string(),
            "--cell-id".to_string(),
            "cell-1".to_string(),
        ])
        .unwrap();

        assert_eq!(cli.path, PathBuf::from("events.jsonl"));
        assert_eq!(cli.cell_id.as_deref(), Some("cell-1"));
        assert!(cli.spec_id.is_none());
        assert!(cli.pretty);
    }

    #[test]
    fn cli_requires_path() {
        let err = Cli::parse(Vec::<String>::new()).unwrap_err();
        assert!(err.to_string().contains("usage"));
    }

    #[test]
    fn cli_rejects_filter_without_value() {
        let err = Cli::parse(["events.jsonl".to_string(), "--spec-id".to_string()]).unwrap_err();
        assert!(err.to_string().contains("requires a value"));
    }

    #[test]
    fn projection_key_prefers_cell_id_then_spec_id() {
        let ty = "dev.cellos.events.cell.lifecycle.v1.started";

        let both = cloud_event(ty, serde_json::json!({ "cellId": "c", "specId": "s" }));
        assert_eq!(event_projection_key(&both).unwrap(), "cell:c");

        let spec_only = cloud_event(ty, serde_json::json!({ "specId": "s" }));
        assert_eq!(event_projection_key(&spec_only).unwrap(), "spec:s");

        let neither = cloud_event(ty, serde_json::json!({ "other": 1 }));
        let err = event_projection_key(&neither).unwrap_err();
        assert!(err.to_string().contains("not a cell-scoped event"));
    }

    #[test]
    fn non_wrapper_events_pass_through_unchanged() {
        let event = cloud_event(
            "dev.cellos.events.cell.lifecycle.v1.started",
            serde_json::json!({ "cellId": "c" }),
        );
        let out = unwrap_signed_envelope(&event, &HashMap::new(), &HashMap::new()).unwrap();
        assert_eq!(out.ty, event.ty);
        assert_eq!(out.data, event.data);
    }
}