use std::sync::Arc;
use cellos_core::ports::EventSink;
use cellos_core::{CloudEventV1, ResidueClass, SubjectUrn};
use serde_json::json;
#[cfg(test)]
use cellos_core::CellosError;
/// Holds the identifiers of one run plus the event sink, and uses them to
/// build the evidence-bundle URN and emit the `evidence_bundle_emitted` event.
pub struct DestructionEvidenceAggregator {
/// Run identifier; it is the suffix of the evidence-bundle URN and is also
/// written to the event payload as `runId`.
run_id: String,
/// Cell identifier, written to the event payload as `cellId`.
cell_id: String,
/// Spec identifier, written to the event payload as `specId`.
spec_id: String,
/// Sink that receives the emitted event; shared via `Arc`.
sink: Arc<dyn EventSink>,
}
impl DestructionEvidenceAggregator {
pub fn new(
run_id: impl Into<String>,
cell_id: impl Into<String>,
spec_id: impl Into<String>,
sink: Arc<dyn EventSink>,
) -> Self {
Self {
run_id: run_id.into(),
cell_id: cell_id.into(),
spec_id: spec_id.into(),
sink,
}
}
pub fn bundle_urn(&self) -> SubjectUrn {
SubjectUrn::parse(format!("urn:cellos:evidence-bundle:{}", self.run_id))
.expect("supervisor run_id must yield a valid evidence-bundle URN")
}
pub async fn emit_bundle_and_finalize(
&self,
residue_class: ResidueClass,
) -> anyhow::Result<(SubjectUrn, ResidueClass)> {
let urn = self.bundle_urn();
let data = json!({
"evidenceBundleRef": urn,
"runId": self.run_id,
"cellId": self.cell_id,
"specId": self.spec_id,
"residueClass": match residue_class {
ResidueClass::None => "none",
ResidueClass::DocumentedException => "documented_exception",
},
"stub": true,
});
let event = CloudEventV1 {
specversion: "1.0".into(),
id: uuid::Uuid::new_v4().to_string(),
source: "cellos-supervisor".into(),
ty: "dev.cellos.events.cell.observability.v1.evidence_bundle_emitted".into(),
datacontenttype: Some("application/json".into()),
data: Some(data),
time: Some(chrono::Utc::now().to_rfc3339()),
traceparent: None,
};
self.sink
.emit(&event)
.await
.map_err(|e| anyhow::anyhow!("evidence_bundle_emitted sink emit: {e}"))?;
Ok((urn, residue_class))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use cellos_core::ports::EventSink;
    use std::sync::Mutex;

    /// In-memory sink that records a clone of every event it is given.
    #[derive(Default)]
    struct CapturingSink {
        events: Mutex<Vec<CloudEventV1>>,
    }

    #[async_trait]
    impl EventSink for CapturingSink {
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            self.events.lock().unwrap().push(event.clone());
            Ok(())
        }
    }

    /// Builds an aggregator with fixed identifiers wired to a fresh capturing
    /// sink, returning both so the test can inspect what was emitted.
    fn aggregator_with_capture() -> (DestructionEvidenceAggregator, Arc<CapturingSink>) {
        let sink: Arc<CapturingSink> = Arc::new(CapturingSink::default());
        let agg = DestructionEvidenceAggregator::new(
            "run-abc",
            "cell-1",
            "spec-1",
            sink.clone() as Arc<dyn EventSink>,
        );
        (agg, sink)
    }

    #[tokio::test]
    async fn aggregator_emits_precursor_and_returns_stable_urn() {
        let (agg, sink) = aggregator_with_capture();
        let (urn, residue) = agg
            .emit_bundle_and_finalize(ResidueClass::None)
            .await
            .unwrap();
        assert_eq!(urn.as_str(), "urn:cellos:evidence-bundle:run-abc");
        // The URN returned by emit must agree with the standalone accessor.
        assert_eq!(agg.bundle_urn().as_str(), urn.as_str());
        assert_eq!(residue, ResidueClass::None);

        let events = sink.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].ty,
            "dev.cellos.events.cell.observability.v1.evidence_bundle_emitted"
        );
        let data = events[0].data.as_ref().unwrap();
        assert_eq!(
            data["evidenceBundleRef"],
            "urn:cellos:evidence-bundle:run-abc"
        );
        assert_eq!(data["residueClass"], "none");
        // The identifiers given to the constructor must reach the payload.
        assert_eq!(data["runId"], "run-abc");
        assert_eq!(data["cellId"], "cell-1");
        assert_eq!(data["specId"], "spec-1");
        assert_eq!(data["stub"], true);
    }

    #[tokio::test]
    async fn documented_exception_residue_is_labelled_in_payload() {
        let (agg, sink) = aggregator_with_capture();
        let (urn, residue) = agg
            .emit_bundle_and_finalize(ResidueClass::DocumentedException)
            .await
            .unwrap();
        assert_eq!(urn.as_str(), "urn:cellos:evidence-bundle:run-abc");
        assert_eq!(residue, ResidueClass::DocumentedException);

        let events = sink.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        let data = events[0].data.as_ref().unwrap();
        assert_eq!(data["residueClass"], "documented_exception");
    }
}