cellos-supervisor 0.5.1

CellOS execution-cell runner — boots cells in Firecracker microVMs or gVisor, enforces narrow typed authority, emits signed CloudEvents.
Documentation
//! F5 destruction-evidence aggregator (D5 integration).
//!
//! Per-run aggregator that emits an `evidence_bundle_emitted` precursor
//! event and returns the [`SubjectUrn`] the supervisor will eventually
//! thread into [`cellos_core::lifecycle_destroyed_data_v1`] via the
//! `evidence_bundle_ref` argument.
//!
//! Today this is a **stub** — F1b owns the evidence-bundle schema and
//! signed aggregator. F5's job is to land the destination shape (typed
//! URN return + the precursor CloudEvent) so the supervisor's teardown
//! path has somewhere to call once F1b lands. When F1b ships its
//! constructor (e.g. `cellos_core::evidence_bundle_emitted_data_v1`), the
//! stub here switches to that constructor without changing this module's
//! public surface.
//!
//! See:
//! - `docs/destruction-semantics.md` — residue taxonomy this aggregator
//!   reports against.
//! - `docs/adr/0006-in-vm-observability-runner-evidence.md` §4 —
//!   `evidence_bundle` is the 1.0 deliverable; F5 is the destruction-side
//!   wedge.

use std::sync::Arc;

use cellos_core::ports::EventSink;
use cellos_core::{CloudEventV1, ResidueClass, SubjectUrn};
use serde_json::json;

#[cfg(test)]
use cellos_core::CellosError;

/// Per-run destruction-evidence aggregator.
///
/// Holds the run's identity (run_id, cell_id, spec_id) and the
/// [`EventSink`] the precursor `evidence_bundle_emitted` event flows to.
/// The supervisor builds one of these at run-start, calls
/// [`DestructionEvidenceAggregator::emit_bundle_and_finalize`] inside the
/// teardown path, and threads the returned [`SubjectUrn`] into
/// `lifecycle_destroyed_data_v1`'s `evidence_bundle_ref` argument.
pub struct DestructionEvidenceAggregator {
    run_id: String,
    cell_id: String,
    spec_id: String,
    sink: Arc<dyn EventSink>,
}

impl DestructionEvidenceAggregator {
    pub fn new(
        run_id: impl Into<String>,
        cell_id: impl Into<String>,
        spec_id: impl Into<String>,
        sink: Arc<dyn EventSink>,
    ) -> Self {
        Self {
            run_id: run_id.into(),
            cell_id: cell_id.into(),
            spec_id: spec_id.into(),
            sink,
        }
    }

    /// Build the canonical URN for this run's evidence_bundle.
    ///
    /// Stable, derivable shape so taudit / projector consumers can
    /// resolve the bundle from a `cell.destroyed` event without an
    /// additional join. F1b is free to swap this for a content-addressed
    /// digest when the signed aggregator lands; the supervisor only
    /// observes the [`SubjectUrn`] return.
    pub fn bundle_urn(&self) -> SubjectUrn {
        // The run_id is supervisor-generated and shape-validated upstream
        // (UUID/ULID-shaped, no whitespace), so the resulting URN parses
        // cleanly. A panic here would mean the supervisor produced an
        // invalid run_id — a programming error worth surfacing loudly.
        SubjectUrn::parse(format!("urn:cellos:evidence-bundle:{}", self.run_id))
            .expect("supervisor run_id must yield a valid evidence-bundle URN")
    }

    /// Emit the `evidence_bundle_emitted` precursor CloudEvent and return
    /// the bundle's [`SubjectUrn`] alongside the [`ResidueClass`] for
    /// this run.
    ///
    /// Stub today: emits a minimal data shape under the F1b event type.
    /// When `cellos_core::evidence_bundle_emitted_data_v1` is published
    /// by F1b, this body switches to the canonical constructor — the
    /// signature here stays stable so the supervisor caller does not
    /// move.
    pub async fn emit_bundle_and_finalize(
        &self,
        residue_class: ResidueClass,
    ) -> anyhow::Result<(SubjectUrn, ResidueClass)> {
        let urn = self.bundle_urn();
        let data = json!({
            "evidenceBundleRef": urn,
            "runId": self.run_id,
            "cellId": self.cell_id,
            "specId": self.spec_id,
            "residueClass": match residue_class {
                ResidueClass::None => "none",
                ResidueClass::DocumentedException => "documented_exception",
            },
            "stub": true,
        });
        let event = CloudEventV1 {
            specversion: "1.0".into(),
            id: uuid::Uuid::new_v4().to_string(),
            source: "cellos-supervisor".into(),
            ty: "dev.cellos.events.cell.observability.v1.evidence_bundle_emitted".into(),
            datacontenttype: Some("application/json".into()),
            data: Some(data),
            time: Some(chrono::Utc::now().to_rfc3339()),
            traceparent: None,
        };
        self.sink
            .emit(&event)
            .await
            .map_err(|e| anyhow::anyhow!("evidence_bundle_emitted sink emit: {e}"))?;
        Ok((urn, residue_class))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use cellos_core::ports::EventSink;
    use std::sync::Mutex;

    #[derive(Default)]
    struct CapturingSink {
        events: Mutex<Vec<CloudEventV1>>,
    }

    #[async_trait]
    impl EventSink for CapturingSink {
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            self.events.lock().unwrap().push(event.clone());
            Ok(())
        }
    }

    #[tokio::test]
    async fn aggregator_emits_precursor_and_returns_stable_urn() {
        let sink: Arc<CapturingSink> = Arc::new(CapturingSink::default());
        let agg = DestructionEvidenceAggregator::new(
            "run-abc",
            "cell-1",
            "spec-1",
            sink.clone() as Arc<dyn EventSink>,
        );
        let (urn, residue) = agg
            .emit_bundle_and_finalize(ResidueClass::None)
            .await
            .unwrap();
        assert_eq!(urn.as_str(), "urn:cellos:evidence-bundle:run-abc");
        assert_eq!(residue, ResidueClass::None);
        let events = sink.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].ty,
            "dev.cellos.events.cell.observability.v1.evidence_bundle_emitted"
        );
        let data = events[0].data.as_ref().unwrap();
        assert_eq!(
            data["evidenceBundleRef"],
            "urn:cellos:evidence-bundle:run-abc"
        );
        assert_eq!(data["residueClass"], "none");
    }
}