use std::fs::OpenOptions;
use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use async_trait::async_trait;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use base64::Engine as _;
use cellos_core::error::CellosError;
use cellos_core::ports::EventSink;
use cellos_core::types::CloudEventV1;
use ed25519_dalek::{Signer, SigningKey};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CortexLedgerRow {
pub kind: String,
pub event_id: String,
pub event_type: String,
pub source: String,
pub time: Option<String>,
pub traceparent: Option<String>,
pub payload: Option<serde_json::Value>,
}
impl CortexLedgerRow {
pub fn from_cloud_event(event: &CloudEventV1) -> Self {
Self {
kind: "cellos.lifecycle.v1".to_string(),
event_id: event.id.clone(),
event_type: event.ty.clone(),
source: event.source.clone(),
time: event.time.clone(),
traceparent: event.traceparent.clone(),
payload: event.data.clone(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmittedLedgerEntry {
pub event: CortexLedgerRow,
pub ledger_seq: u64,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub cellos_sig: Option<String>,
}
#[async_trait]
pub trait LedgerSink: Send + Sync {
async fn append(&self, entry: &EmittedLedgerEntry) -> Result<(), anyhow::Error>;
}
pub struct NdjsonLedgerSink {
path: PathBuf,
writer: Mutex<()>, }
impl NdjsonLedgerSink {
pub fn new(path: impl Into<PathBuf>) -> Self {
Self {
path: path.into(),
writer: Mutex::new(()),
}
}
pub fn path(&self) -> &PathBuf {
&self.path
}
}
#[async_trait]
impl LedgerSink for NdjsonLedgerSink {
async fn append(&self, entry: &EmittedLedgerEntry) -> Result<(), anyhow::Error> {
let line = serde_json::to_string(entry)?;
let _guard = self.writer.lock().unwrap();
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)?;
file.write_all(line.as_bytes())?;
file.write_all(b"\n")?;
Ok(())
}
}
pub struct CellosLedgerEmitter {
sink: std::sync::Arc<dyn LedgerSink>,
signing_key: Option<SigningKey>,
next_seq: AtomicU64,
}
pub const LEDGER_SIGNING_KEY_ENV: &str = "CELLOS_CORTEX_LEDGER_SIGNING_KEY_BASE64";
impl CellosLedgerEmitter {
pub fn new(sink: std::sync::Arc<dyn LedgerSink>) -> Self {
Self::with_signing_key(sink, None)
}
pub fn with_signing_key(
sink: std::sync::Arc<dyn LedgerSink>,
signing_key: Option<SigningKey>,
) -> Self {
Self {
sink,
signing_key,
next_seq: AtomicU64::new(1),
}
}
pub fn with_env_signing(sink: std::sync::Arc<dyn LedgerSink>) -> Result<Self, anyhow::Error> {
let key = Self::from_env_signing_key()?;
Ok(Self::with_signing_key(sink, key))
}
pub fn from_env_signing_key() -> Result<Option<SigningKey>, anyhow::Error> {
let raw = match std::env::var(LEDGER_SIGNING_KEY_ENV) {
Ok(v) => v,
Err(_) => return Ok(None),
};
let trimmed = raw.trim().trim_end_matches('=');
if trimmed.is_empty() {
return Ok(None);
}
let bytes = URL_SAFE_NO_PAD
.decode(trimmed)
.map_err(|e| anyhow::anyhow!("{LEDGER_SIGNING_KEY_ENV}: invalid base64url: {e}"))?;
let array: [u8; 32] = bytes.as_slice().try_into().map_err(|_| {
anyhow::anyhow!(
"{LEDGER_SIGNING_KEY_ENV}: expected 32-byte ed25519 seed, got {} bytes",
bytes.len()
)
})?;
Ok(Some(SigningKey::from_bytes(&array)))
}
pub fn is_signed(&self) -> bool {
self.signing_key.is_some()
}
fn build_entry(&self, event: &CloudEventV1) -> Result<EmittedLedgerEntry, CellosError> {
let row = CortexLedgerRow::from_cloud_event(event);
let ledger_seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
let cellos_sig = match &self.signing_key {
None => None,
Some(key) => {
let bytes = serde_json::to_vec(&row).map_err(|e| {
CellosError::EventSink(format!("cortex ledger: serialize event: {e}"))
})?;
let sig = key.sign(&bytes);
Some(URL_SAFE_NO_PAD.encode(sig.to_bytes()))
}
};
Ok(EmittedLedgerEntry {
event: row,
ledger_seq,
cellos_sig,
})
}
}
#[async_trait]
impl EventSink for CellosLedgerEmitter {
async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
let entry = self.build_entry(event)?;
self.sink.append(&entry).await.map_err(|e| {
CellosError::EventSink(format!("cortex ledger append failed: {e}"))
})
}
}
#[cfg(feature = "http-ledger")]
pub mod http_sink {
use super::{EmittedLedgerEntry, LedgerSink};
use async_trait::async_trait;
pub struct HttpLedgerSink {
client: reqwest::Client,
endpoint: String,
}
impl HttpLedgerSink {
pub fn new(endpoint: impl Into<String>) -> Self {
Self {
client: reqwest::Client::new(),
endpoint: endpoint.into(),
}
}
pub fn from_env() -> Option<Self> {
std::env::var("CORTEX_LEDGER_ENDPOINT")
.ok()
.filter(|s| !s.is_empty())
.map(Self::new)
}
}
#[async_trait]
impl LedgerSink for HttpLedgerSink {
async fn append(&self, entry: &EmittedLedgerEntry) -> Result<(), anyhow::Error> {
let resp = self.client.post(&self.endpoint).json(entry).send().await?;
if !resp.status().is_success() {
anyhow::bail!("cortex ledger HTTP append failed: status {}", resp.status());
}
Ok(())
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use ed25519_dalek::{Verifier, VerifyingKey};
use std::sync::Arc;
fn sample_event() -> CloudEventV1 {
CloudEventV1 {
specversion: "1.0".into(),
id: "evt-1".into(),
source: "cellos://supervisor/host-1".into(),
ty: "cell.lifecycle.v1.started".into(),
datacontenttype: Some("application/json".into()),
data: Some(serde_json::json!({"cell_id": "c1"})),
time: Some("2026-05-16T00:00:00Z".into()),
traceparent: None,
}
}
struct CaptureSink(Mutex<Vec<EmittedLedgerEntry>>);
impl CaptureSink {
fn new() -> Arc<Self> {
Arc::new(Self(Mutex::new(Vec::new())))
}
fn entries(&self) -> Vec<EmittedLedgerEntry> {
self.0.lock().unwrap().clone()
}
}
#[async_trait]
impl LedgerSink for CaptureSink {
async fn append(&self, entry: &EmittedLedgerEntry) -> Result<(), anyhow::Error> {
self.0.lock().unwrap().push(entry.clone());
Ok(())
}
}
#[test]
fn row_projection_preserves_event_identity() {
let evt = sample_event();
let row = CortexLedgerRow::from_cloud_event(&evt);
assert_eq!(row.kind, "cellos.lifecycle.v1");
assert_eq!(row.event_id, "evt-1");
assert_eq!(row.event_type, "cell.lifecycle.v1.started");
assert_eq!(row.source, "cellos://supervisor/host-1");
assert_eq!(row.time.as_deref(), Some("2026-05-16T00:00:00Z"));
assert_eq!(row.payload, Some(serde_json::json!({"cell_id": "c1"})));
}
#[tokio::test]
async fn ndjson_sink_appends_line_per_event() {
let dir = std::env::temp_dir().join(format!("cellos-cortex-ndjson-{}", std::process::id()));
std::fs::create_dir_all(&dir).unwrap();
let path = dir.join("ledger.ndjson");
let _ = std::fs::remove_file(&path);
let sink = Arc::new(NdjsonLedgerSink::new(&path));
let emitter = CellosLedgerEmitter::new(sink);
emitter.emit(&sample_event()).await.expect("emit 1");
let mut evt2 = sample_event();
evt2.id = "evt-2".into();
emitter.emit(&evt2).await.expect("emit 2");
let content = std::fs::read_to_string(&path).expect("read ledger");
let lines: Vec<&str> = content.lines().collect();
assert_eq!(lines.len(), 2);
let entry1: EmittedLedgerEntry = serde_json::from_str(lines[0]).unwrap();
let entry2: EmittedLedgerEntry = serde_json::from_str(lines[1]).unwrap();
assert_eq!(entry1.event.event_id, "evt-1");
assert_eq!(entry2.event.event_id, "evt-2");
assert_eq!(entry1.ledger_seq, 1);
assert_eq!(entry2.ledger_seq, 2);
assert!(
!lines[0].contains("cellos_sig"),
"unsigned emitter must omit cellos_sig from serialized entries: {}",
lines[0]
);
}
#[tokio::test]
async fn unsigned_entry_when_no_key_set() {
let sink = CaptureSink::new();
let dyn_sink: Arc<dyn LedgerSink> = sink.clone();
let emitter = CellosLedgerEmitter::with_signing_key(dyn_sink, None);
assert!(!emitter.is_signed());
emitter.emit(&sample_event()).await.expect("emit ok");
let entries = sink.entries();
assert_eq!(entries.len(), 1);
assert!(
entries[0].cellos_sig.is_none(),
"unsigned emitter must not produce a signature"
);
let json = serde_json::to_string(&entries[0]).unwrap();
assert!(
!json.contains("cellos_sig"),
"unsigned ledger JSON must not carry cellos_sig: {json}"
);
}
#[tokio::test]
async fn signed_entry_when_key_set() {
let seed = [11u8; 32];
let signing_key = SigningKey::from_bytes(&seed);
let sink = CaptureSink::new();
let dyn_sink: Arc<dyn LedgerSink> = sink.clone();
let emitter = CellosLedgerEmitter::with_signing_key(dyn_sink, Some(signing_key));
assert!(emitter.is_signed());
emitter.emit(&sample_event()).await.expect("emit ok");
let entries = sink.entries();
assert_eq!(entries.len(), 1);
let sig_b64 = entries[0]
.cellos_sig
.as_ref()
.expect("signed emitter must populate cellos_sig");
let sig_bytes = URL_SAFE_NO_PAD.decode(sig_b64).expect("decode sig");
assert_eq!(
sig_bytes.len(),
64,
"ed25519 detached signatures are 64 bytes; got {}",
sig_bytes.len()
);
assert_eq!(entries[0].ledger_seq, 1);
}
#[tokio::test]
async fn verify_signature_roundtrip() {
let seed = [23u8; 32];
let signing_key = SigningKey::from_bytes(&seed);
let verifying_key: VerifyingKey = signing_key.verifying_key();
let sink = CaptureSink::new();
let dyn_sink: Arc<dyn LedgerSink> = sink.clone();
let emitter = CellosLedgerEmitter::with_signing_key(dyn_sink, Some(signing_key));
let event = sample_event();
emitter.emit(&event).await.expect("emit ok");
let entries = sink.entries();
let entry = &entries[0];
let signed_bytes = serde_json::to_vec(&entry.event).expect("serialize event");
let sig_b64 = entry
.cellos_sig
.as_ref()
.expect("signed emitter populates cellos_sig");
let sig_bytes = URL_SAFE_NO_PAD.decode(sig_b64).expect("decode sig");
let signature = ed25519_dalek::Signature::from_slice(&sig_bytes).expect("sig from slice");
verifying_key
.verify(&signed_bytes, &signature)
.expect("roundtrip verification succeeds");
let mut tampered = entry.event.clone();
tampered.event_id = "evt-tampered".into();
let tampered_bytes = serde_json::to_vec(&tampered).unwrap();
assert!(
verifying_key.verify(&tampered_bytes, &signature).is_err(),
"post-emit tampering must fail verification"
);
}
#[test]
fn env_signing_key_malformed_base64_errors() {
struct EnvGuard;
impl Drop for EnvGuard {
fn drop(&mut self) {
std::env::remove_var(LEDGER_SIGNING_KEY_ENV);
}
}
let _g = EnvGuard;
std::env::set_var(LEDGER_SIGNING_KEY_ENV, "***not-base64***");
let err = CellosLedgerEmitter::from_env_signing_key()
.expect_err("malformed base64 must surface as error");
assert!(format!("{err}").contains("invalid base64url"));
}
}