use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use async_trait::async_trait;
use cellos_core::events::cloud_event_v1_cortex_dispatched;
use cellos_core::ports::EventSink;
use cellos_core::types::{
AuthorityBundle, CloudEventV1, ExecutionCellDocument, ExecutionCellSpec, Lifetime, RunSpec,
};
use serde::{Deserialize, Serialize};
use crate::context::ContextPack;
use crate::policy::{apply_policy, DoctrineAuthorityPolicy};
/// Source string handed to `cloud_event_v1_cortex_dispatched` when `dispatch` emits its event.
const CORTEX_EVENT_SOURCE: &str = "cellos-cortex";
/// Event type `build_result_from_events` requires (for the matching cell) before it yields a result.
const LIFECYCLE_DESTROYED_TYPE: &str = "dev.cellos.events.cell.lifecycle.v1.destroyed";
/// Event type whose `exitCode` data field is used as the result's exit code.
const COMMAND_COMPLETED_TYPE: &str = "dev.cellos.events.cell.command.v1.completed";
/// v1 export-completed event type; scanned for export destination paths.
const EXPORT_COMPLETED_V1_TYPE: &str = "dev.cellos.events.cell.export.v1.completed";
/// v2 export-completed event type; scanned for export destination paths.
const EXPORT_COMPLETED_V2_TYPE: &str = "dev.cellos.events.cell.export.v2.completed";
/// Name of the environment variable `CortexCellRunner::wait_for_result` reads to locate the
/// JSONL event file it polls when no usable in-memory events are available.
pub const CELL_OS_JSONL_EVENTS_ENV: &str = "CELL_OS_JSONL_EVENTS";
/// Summary of a finished cell, as assembled by `build_result_from_events`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CortexCellResult {
/// Identifier of the cell this result describes.
pub cell_id: String,
/// Pack identifier; `wait_for_result` supplies the pack's `memory_digest` here.
pub pack_id: String,
/// `exitCode` of the latest matching command-completed event, or `0`/`-1`
/// (success/failure) when no such event carries one.
pub exit_code: i32,
/// `true` only when the destroyed event's `outcome` field equals `"succeeded"`.
pub success: bool,
/// Destroyed event's `time` as Unix milliseconds; `0` when absent or unparseable.
pub lifecycle_destroyed_at: u64,
/// Destination paths collected from export-completed events, in event order.
pub export_paths: Vec<String>,
/// Doctrine references copied from the originating pack.
pub doctrine_refs: Vec<String>,
}
/// What a `CellSubmitter` hands back after submitting a document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CellSubmissionOutcome {
/// Identifier of the submitted cell; used to match events to this submission.
pub cell_id: String,
/// Exit code reported by the submitter, if it has one.
pub exit_code: Option<i32>,
/// Events captured in memory by the submitter; `wait_for_result` consults
/// these before falling back to the JSONL file.
pub lifecycle_events: Vec<CloudEventV1>,
}
/// Result of `CortexCellRunner::translate`: the source pack paired with the
/// execution-cell document generated from it.
#[derive(Debug, Clone)]
pub struct ContextPackTranslation {
/// Clone of the pack that was translated.
pub pack: ContextPack,
/// Document built for that pack.
pub document: ExecutionCellDocument,
}
/// Port through which `CortexCellRunner` submits an execution-cell document.
///
/// Implementations must be `Send + Sync` because the runner stores them as
/// `Arc<dyn CellSubmitter>`.
#[async_trait]
pub trait CellSubmitter: Send + Sync {
/// Submits `document` and returns the resulting outcome, or an error if
/// submission failed.
async fn submit(
&self,
document: &ExecutionCellDocument,
) -> Result<CellSubmissionOutcome, anyhow::Error>;
}
/// Translates context packs into execution-cell documents, submits them and
/// resolves their results.
pub struct CortexCellRunner {
/// Backend that receives the translated document.
submitter: Arc<dyn CellSubmitter>,
/// Base command line; `translate` appends the pack's task as the last argument.
agent_argv: Vec<String>,
/// `api_version` written into every generated document (`new` sets "cellos.io/v1").
api_version: String,
/// `kind` written into every generated document (`new` sets "ExecutionCell").
kind: String,
/// TTL in seconds used when a pack has no expiry, and the cap for pack-derived TTLs.
default_ttl_seconds: u64,
/// Policy passed to `apply_policy` during translation.
policy: DoctrineAuthorityPolicy,
/// Optional sink that receives the cortex-dispatched event after a successful submit.
event_sink: Option<Arc<dyn EventSink>>,
}
impl CortexCellRunner {
/// Creates a runner with the stock configuration: API version
/// `cellos.io/v1`, kind `ExecutionCell`, a 300-second default TTL, the
/// built-in doctrine policy and no event sink.
pub fn new(submitter: Arc<dyn CellSubmitter>, agent_argv: Vec<String>) -> Self {
    let api_version = String::from("cellos.io/v1");
    let kind = String::from("ExecutionCell");
    Self {
        submitter,
        agent_argv,
        api_version,
        kind,
        default_ttl_seconds: 300,
        policy: DoctrineAuthorityPolicy::built_in(),
        event_sink: None,
    }
}
pub fn with_event_sink(mut self, event_sink: Arc<dyn EventSink>) -> Self {
self.event_sink = Some(event_sink);
self
}
pub fn with_api_version(mut self, api_version: impl Into<String>) -> Self {
self.api_version = api_version.into();
self
}
pub fn with_default_ttl_seconds(mut self, ttl: u64) -> Self {
self.default_ttl_seconds = ttl;
self
}
pub fn with_policy(mut self, policy: DoctrineAuthorityPolicy) -> Self {
self.policy = policy;
self
}
/// Builds the execution-cell document for `pack` without submitting it.
///
/// The command line is the configured agent argv followed by the pack's
/// task. The TTL comes from `ttl_for_pack` and is also used, in
/// milliseconds, as the run timeout. `apply_policy` is run over the spec
/// before it is wrapped in the document.
pub fn translate(&self, pack: &ContextPack) -> ContextPackTranslation {
    let id = pack_to_cell_id(pack);

    let mut argv = Vec::with_capacity(self.agent_argv.len() + 1);
    argv.extend(self.agent_argv.iter().cloned());
    argv.push(pack.task.clone());

    let ttl_seconds = self.ttl_for_pack(pack);
    let run = RunSpec {
        argv,
        working_directory: None,
        // Saturating so an extreme TTL cannot overflow the millisecond value.
        timeout_ms: Some(ttl_seconds.saturating_mul(1000)),
        limits: None,
        secret_delivery: Default::default(),
    };

    let mut spec = ExecutionCellSpec {
        id,
        correlation: None,
        ingress: None,
        environment: None,
        placement: None,
        policy: None,
        identity: None,
        run: Some(run),
        authority: AuthorityBundle::default(),
        lifetime: Lifetime { ttl_seconds },
        export: None,
        telemetry: None,
    };
    apply_policy(pack, &mut spec, &self.policy);

    ContextPackTranslation {
        pack: pack.clone(),
        document: ExecutionCellDocument {
            api_version: self.api_version.clone(),
            kind: self.kind.clone(),
            spec,
        },
    }
}
pub async fn dispatch(
&self,
pack: &ContextPack,
) -> Result<CellSubmissionOutcome, anyhow::Error> {
let translation = self.translate(pack);
let outcome = self.submitter.submit(&translation.document).await?;
if let Some(ref sink) = self.event_sink {
let event = cloud_event_v1_cortex_dispatched(
CORTEX_EVENT_SOURCE,
&chrono::Utc::now().to_rfc3339(),
&pack.memory_digest,
&outcome.cell_id,
&pack.doctrine_refs,
);
if let Err(e) = sink.emit(&event).await {
tracing::warn!(
target: "cellos.cortex.runner",
cell_id = %outcome.cell_id,
error = %e,
"cortex_dispatched CloudEvent emit failed (best-effort, dispatch already succeeded)"
);
}
}
Ok(outcome)
}
pub fn wait_for_result(
&self,
pack: &ContextPack,
outcome: &CellSubmissionOutcome,
timeout: Duration,
) -> Result<CortexCellResult, anyhow::Error> {
if !outcome.lifecycle_events.is_empty() {
if let Some(result) = build_result_from_events(
&outcome.cell_id,
&pack.memory_digest,
&pack.doctrine_refs,
&outcome.lifecycle_events,
) {
return Ok(result);
}
}
let Some(jsonl_path) = std::env::var(CELL_OS_JSONL_EVENTS_ENV)
.ok()
.map(PathBuf::from)
else {
anyhow::bail!(
"wait_for_result: no in-memory lifecycle event captured and \
${CELL_OS_JSONL_EVENTS_ENV} is unset — cannot observe cell result"
);
};
let mut result = wait_for_result_from_jsonl(&outcome.cell_id, &jsonl_path, timeout)?;
result.doctrine_refs = pack.doctrine_refs.clone();
result.pack_id = pack.memory_digest.clone();
Ok(result)
}
/// Computes the TTL, in seconds, for a cell running `pack`.
///
/// * No expiry on the pack: the runner's default TTL.
/// * Expiry at or before the current time: `1`.
/// * Otherwise: whole seconds remaining until expiry, at least `1`, capped
///   at the runner's default TTL.
///
/// If the system clock reads earlier than the Unix epoch, "now" is taken as `0`.
fn ttl_for_pack(&self, pack: &ContextPack) -> u64 {
    let ceiling = self.default_ttl_seconds;
    let expires_at_ms = match pack.expires_at {
        Some(deadline) => deadline,
        None => return ceiling,
    };
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    };
    match expires_at_ms.checked_sub(now_ms) {
        // Already expired (or expiring this very millisecond).
        None | Some(0) => 1,
        Some(remaining_ms) => (remaining_ms / 1000).max(1).min(ceiling),
    }
}
}
/// Derives a cell identifier of the form `cortex-<digest>-<task>` from `pack`.
///
/// * `<digest>`: the first 12 characters of `memory_digest` that are ASCII
///   alphanumerics or one of `.`, `_`, `-` (case preserved).
/// * `<task>`: the first 8 ASCII alphanumerics of `task`, lowercased.
///
/// A part that comes out empty is omitted; if both are empty the id is
/// `cortex-cell`.
fn pack_to_cell_id(pack: &ContextPack) -> String {
    let digest_part: String = pack
        .memory_digest
        .chars()
        .filter(|&ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
        .take(12)
        .collect();
    // Every kept character is ASCII, so per-character ASCII lowercasing is
    // the same as lowercasing the collected string.
    let task_part: String = pack
        .task
        .chars()
        .filter(|ch| ch.is_ascii_alphanumeric())
        .take(8)
        .map(|ch| ch.to_ascii_lowercase())
        .collect();

    match (digest_part.is_empty(), task_part.is_empty()) {
        (true, true) => "cortex-cell".to_string(),
        (true, false) => format!("cortex-{task_part}"),
        (false, true) => format!("cortex-{digest_part}"),
        (false, false) => format!("cortex-{digest_part}-{task_part}"),
    }
}
/// Polls the JSONL event file at `jsonl_path` until it contains a destroyed
/// event for `cell_id`, then returns the result built from the file's events.
///
/// The whole file is re-read on every poll (every 25 ms). The returned result
/// has an empty `pack_id` and no `doctrine_refs`; callers that know the pack
/// fill those in afterwards.
///
/// This function blocks the calling thread with `std::thread::sleep`, so it
/// should not be called directly on an async executor thread.
///
/// # Errors
/// Returns a timeout error once `timeout` has elapsed without a matching
/// destroyed event. The file is always read at least once, even when
/// `timeout` is zero. If the most recent read of the file failed (for example
/// because it does not exist), that I/O error is included in the message.
pub fn wait_for_result_from_jsonl(
    cell_id: &str,
    jsonl_path: &Path,
    timeout: Duration,
) -> Result<CortexCellResult, anyhow::Error> {
    const POLL_INTERVAL: Duration = Duration::from_millis(25);
    let started = Instant::now();
    // The file may legitimately not exist yet, so read failures are retried;
    // the last one is remembered so a timeout can explain itself.
    let mut last_read_error: Option<std::io::Error> = None;
    loop {
        match std::fs::read_to_string(jsonl_path) {
            Ok(contents) => {
                last_read_error = None;
                let events = parse_jsonl_events(&contents);
                if let Some(result) = build_result_from_events(cell_id, "", &[], &events) {
                    return Ok(result);
                }
            }
            Err(read_err) => last_read_error = Some(read_err),
        }

        let elapsed = started.elapsed();
        if elapsed >= timeout {
            // `{:?}` keeps sub-second timeouts readable ("150ms", not "0 seconds").
            if let Some(read_err) = &last_read_error {
                anyhow::bail!(
                    "cell result timeout after {timeout:?} (cell_id={cell_id}); \
                     last read of {} failed: {read_err}",
                    jsonl_path.display(),
                );
            }
            anyhow::bail!("cell result timeout after {timeout:?} (cell_id={cell_id})");
        }
        // Never sleep past the deadline; `elapsed < timeout` holds here.
        std::thread::sleep(POLL_INTERVAL.min(timeout - elapsed));
    }
}
/// Parses `contents` as newline-delimited JSON CloudEvents.
///
/// Blank lines and lines that fail to deserialize are skipped silently, so
/// the returned vector only holds the events that parsed, in file order.
fn parse_jsonl_events(contents: &str) -> Vec<CloudEventV1> {
    let mut events = Vec::new();
    for raw_line in contents.lines() {
        if raw_line.trim().is_empty() {
            continue;
        }
        if let Ok(event) = serde_json::from_str::<CloudEventV1>(raw_line) {
            events.push(event);
        }
    }
    events
}
fn build_result_from_events(
cell_id: &str,
pack_id: &str,
doctrine_refs: &[String],
events: &[CloudEventV1],
) -> Option<CortexCellResult> {
let destroyed = events.iter().rev().find(|ev| {
ev.ty == LIFECYCLE_DESTROYED_TYPE
&& event_data_cell_id(ev)
.map(|id| id == cell_id)
.unwrap_or(false)
})?;
let success = destroyed
.data
.as_ref()
.and_then(|d| d.get("outcome"))
.and_then(|v| v.as_str())
.map(|s| s == "succeeded")
.unwrap_or(false);
let lifecycle_destroyed_at = destroyed
.time
.as_deref()
.and_then(parse_rfc3339_to_unix_ms)
.unwrap_or(0);
let exit_code = events
.iter()
.rev()
.find(|ev| {
ev.ty == COMMAND_COMPLETED_TYPE
&& event_data_cell_id(ev)
.map(|id| id == cell_id)
.unwrap_or(false)
})
.and_then(|ev| ev.data.as_ref())
.and_then(|d| d.get("exitCode"))
.and_then(|v| v.as_i64())
.map(|n| n as i32)
.unwrap_or(if success { 0 } else { -1 });
let export_paths = events
.iter()
.filter(|ev| {
(ev.ty == EXPORT_COMPLETED_V1_TYPE || ev.ty == EXPORT_COMPLETED_V2_TYPE)
&& event_data_cell_id(ev)
.map(|id| id == cell_id)
.unwrap_or(false)
})
.filter_map(|ev| {
let data = ev.data.as_ref()?;
if let Some(p) = data.get("destinationRelative").and_then(|v| v.as_str()) {
return Some(p.to_string());
}
let receipt = data.get("receipt")?;
receipt
.get("destination")
.and_then(|v| v.as_str())
.or_else(|| receipt.get("destinationRelative").and_then(|v| v.as_str()))
.map(|s| s.to_string())
})
.collect();
Some(CortexCellResult {
cell_id: cell_id.to_string(),
pack_id: pack_id.to_string(),
exit_code,
success,
lifecycle_destroyed_at,
export_paths,
doctrine_refs: doctrine_refs.to_vec(),
})
}
/// Returns the string stored under `cellId` in the event's data, if the event
/// has data and that field is a string.
fn event_data_cell_id(ev: &CloudEventV1) -> Option<&str> {
    ev.data
        .as_ref()
        .and_then(|payload| payload.get("cellId"))
        .and_then(|id| id.as_str())
}
/// Parses an RFC 3339 timestamp into Unix milliseconds.
///
/// Returns `None` when `s` does not parse or when the instant lies before the
/// Unix epoch (a negative millisecond count).
fn parse_rfc3339_to_unix_ms(s: &str) -> Option<u64> {
    chrono::DateTime::parse_from_rfc3339(s)
        .ok()
        // The conversion fails exactly for negative values.
        .and_then(|stamp| u64::try_from(stamp.timestamp_millis()).ok())
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
/// Test double for `CellSubmitter` that records every submitted document.
struct CapturingSubmitter {
/// Documents received by `submit`, in call order.
captured: Mutex<Vec<ExecutionCellDocument>>,
}
/// Records the document and reports an immediate exit code of 0 with no
/// lifecycle events.
#[async_trait]
impl CellSubmitter for CapturingSubmitter {
    async fn submit(
        &self,
        document: &ExecutionCellDocument,
    ) -> Result<CellSubmissionOutcome, anyhow::Error> {
        let recorded = document.clone();
        self.captured.lock().unwrap().push(recorded);
        let outcome = CellSubmissionOutcome {
            cell_id: document.spec.id.clone(),
            exit_code: Some(0),
            lifecycle_events: Vec::new(),
        };
        Ok(outcome)
    }
}
/// Creates a fresh `CapturingSubmitter` and returns it twice: as the concrete
/// type (to inspect captures) and as the trait object handed to the runner.
fn submitter() -> (Arc<CapturingSubmitter>, Arc<dyn CellSubmitter>) {
    let concrete = Arc::new(CapturingSubmitter {
        captured: Mutex::new(Vec::new()),
    });
    let erased: Arc<dyn CellSubmitter> = concrete.clone();
    (concrete, erased)
}
/// `translate` appends the task to the agent argv, applies the 300 s default
/// TTL and produces a `cortex-` prefixed cell id.
#[test]
fn translate_produces_runspec_with_task_appended_to_argv() {
    let base_argv: Vec<String> = vec!["agent".into(), "--mode".into(), "run".into()];
    let (_capture, handle) = submitter();
    let runner = CortexCellRunner::new(handle, base_argv);
    let pack = ContextPack {
        memory_digest: "blake3:abcdef0123456789".into(),
        doctrine_refs: vec!["p1".into()],
        task: "do the thing".into(),
        expires_at: None,
    };

    let translation = runner.translate(&pack);

    let run = translation.document.spec.run.expect("runspec present");
    assert_eq!(run.argv, vec!["agent", "--mode", "run", "do the thing"]);
    assert_eq!(translation.document.spec.lifetime.ttl_seconds, 300);
    assert!(translation.document.spec.id.starts_with("cortex-"));
}
/// A pack without an expiry gets exactly the runner's configured default TTL.
#[test]
fn translate_clamps_ttl_to_default_when_pack_unbounded() {
    let (_capture, handle) = submitter();
    let runner = CortexCellRunner::new(handle, vec!["bin".into()]).with_default_ttl_seconds(42);
    let unbounded = ContextPack::new("t");

    let translation = runner.translate(&unbounded);

    assert_eq!(translation.document.spec.lifetime.ttl_seconds, 42);
}
/// A pack expiring in about five seconds yields a TTL of a few seconds, well
/// below the 1000 s default.
#[test]
fn translate_clamps_ttl_below_default_when_pack_expires_soon() {
    let (_capture, handle) = submitter();
    let runner =
        CortexCellRunner::new(handle, vec!["bin".into()]).with_default_ttl_seconds(1000);
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    let now_ms = since_epoch.as_millis() as u64;
    let pack = ContextPack {
        memory_digest: String::new(),
        doctrine_refs: Vec::new(),
        task: "soon".into(),
        expires_at: Some(now_ms + 5_000),
    };

    let translation = runner.translate(&pack);

    let ttl = translation.document.spec.lifetime.ttl_seconds;
    assert!((1..=6).contains(&ttl), "ttl was {ttl}");
}
/// A pack whose expiry is already in the past is given the minimum TTL of 1 s.
#[test]
fn expired_pack_collapses_to_minimal_ttl() {
    let (_capture, handle) = submitter();
    let runner = CortexCellRunner::new(handle, vec!["bin".into()]);
    let stale = ContextPack {
        memory_digest: String::new(),
        doctrine_refs: Vec::new(),
        task: "stale".into(),
        expires_at: Some(0),
    };

    let translation = runner.translate(&stale);

    assert_eq!(translation.document.spec.lifetime.ttl_seconds, 1);
}
/// `dispatch` hands exactly one translated document to the submitter, with
/// the task as the final argv entry.
#[tokio::test]
async fn dispatch_submits_translated_document() {
    let (capture, handle) = submitter();
    let runner = CortexCellRunner::new(handle, vec!["agent".into()]);
    let pack = ContextPack::new("ship it");

    let outcome = runner.dispatch(&pack).await.expect("dispatch ok");

    assert_eq!(outcome.exit_code, Some(0));
    let documents = capture.captured.lock().unwrap();
    assert_eq!(documents.len(), 1);
    let run = documents[0].spec.run.as_ref().unwrap();
    assert_eq!(run.argv.last().unwrap(), "ship it");
}
/// Test event sink that stores every emitted event for later inspection.
struct CaptureSink(std::sync::Mutex<Vec<CloudEventV1>>);
/// Appends a clone of each emitted event to the inner vector; never fails.
#[async_trait]
impl cellos_core::ports::EventSink for CaptureSink {
    async fn emit(&self, event: &CloudEventV1) -> Result<(), cellos_core::error::CellosError> {
        let recorded = event.clone();
        self.0.lock().unwrap().push(recorded);
        Ok(())
    }
}
/// With a sink configured, `dispatch` emits exactly one cortex-dispatched
/// event carrying the pack id, the cell id and the doctrine refs.
#[tokio::test]
async fn dispatch_emits_cortex_dispatched_event_when_sink_wired() {
    let (_capture, handle) = submitter();
    let capture_sink: Arc<CaptureSink> =
        Arc::new(CaptureSink(std::sync::Mutex::new(Vec::new())));
    let sink_handle: Arc<dyn cellos_core::ports::EventSink> = capture_sink.clone();
    let runner =
        CortexCellRunner::new(handle, vec!["agent".into()]).with_event_sink(sink_handle);
    let pack = ContextPack {
        memory_digest: "pack-abc".into(),
        doctrine_refs: vec!["D1".into(), "D5".into()],
        task: "ship it".into(),
        expires_at: None,
    };

    let outcome = runner.dispatch(&pack).await.expect("dispatch ok");

    let emitted = capture_sink.0.lock().unwrap();
    assert_eq!(emitted.len(), 1, "exactly one cortex_dispatched event");
    let event = &emitted[0];
    assert_eq!(event.ty, "dev.cellos.events.cell.cortex.v1.dispatched");
    assert_eq!(event.source, CORTEX_EVENT_SOURCE);

    let payload = event.data.as_ref().expect("data present");
    let pack_id = payload.get("packId").and_then(|v| v.as_str());
    assert_eq!(pack_id, Some("pack-abc"));
    let cell_id = payload.get("cellId").and_then(|v| v.as_str());
    assert_eq!(cell_id, Some(outcome.cell_id.as_str()));

    let refs = payload
        .get("doctrineRefs")
        .and_then(|v| v.as_array())
        .expect("doctrineRefs array");
    assert_eq!(refs.len(), 2);
    assert_eq!(refs[0].as_str(), Some("D1"));
    assert_eq!(refs[1].as_str(), Some("D5"));
}
/// Without a sink, `dispatch` still succeeds and returns the submitter's outcome.
#[tokio::test]
async fn dispatch_without_sink_does_not_emit() {
    let (_capture, handle) = submitter();
    let runner = CortexCellRunner::new(handle, vec!["agent".into()]);
    let sinkless_pack = ContextPack::new("no-sink");

    let outcome = runner.dispatch(&sinkless_pack).await.expect("dispatch ok");

    assert_eq!(outcome.exit_code, Some(0));
}
/// Builds a minimal JSON CloudEvent of type `ty` for tests, with id
/// `test-<ty>`, source `test`, the given optional timestamp and `data`.
fn ev(ty: &str, time: Option<&str>, data: serde_json::Value) -> CloudEventV1 {
    let id = format!("test-{ty}");
    let time = time.map(String::from);
    CloudEventV1 {
        specversion: "1.0".into(),
        id,
        source: "test".into(),
        ty: ty.into(),
        datacontenttype: Some("application/json".into()),
        data: Some(data),
        time,
        traceparent: None,
    }
}
/// When the file only holds events for another cell, polling gives up with a
/// timeout error that names the target cell.
#[test]
fn wait_for_result_timeout_returns_err() {
    let path = tempdir_for_test().join("events.jsonl");
    let unrelated = ev(
        "dev.cellos.events.cell.lifecycle.v1.started",
        Some("2026-01-01T00:00:00Z"),
        serde_json::json!({ "cellId": "other-cell" }),
    );
    let mut jsonl = serde_json::to_string(&unrelated).unwrap();
    jsonl.push('\n');
    std::fs::write(&path, jsonl).unwrap();

    let err = wait_for_result_from_jsonl("target-cell", &path, Duration::from_millis(150))
        .expect_err("must time out");

    let msg = format!("{err}");
    assert!(
        msg.contains("timeout") && msg.contains("target-cell"),
        "unexpected error message: {msg}"
    );
}
/// A file with noise for another cell plus export, command and destroyed
/// events for the target cell yields a complete, successful result.
#[test]
fn wait_for_result_parses_destroyed_event() {
    let path = tempdir_for_test().join("events.jsonl");
    let events = vec![
        // Destroyed event for a different cell: must be ignored.
        ev(
            LIFECYCLE_DESTROYED_TYPE,
            Some("2026-01-01T00:00:00Z"),
            serde_json::json!({ "cellId": "noise", "outcome": "failed" }),
        ),
        ev(
            EXPORT_COMPLETED_V1_TYPE,
            Some("2026-01-01T00:00:01Z"),
            serde_json::json!({
                "cellId": "target-cell",
                "destinationRelative": "out/report.md"
            }),
        ),
        ev(
            EXPORT_COMPLETED_V2_TYPE,
            Some("2026-01-01T00:00:02Z"),
            serde_json::json!({
                "cellId": "target-cell",
                "receipt": { "destination": "out/audit.json" }
            }),
        ),
        ev(
            COMMAND_COMPLETED_TYPE,
            Some("2026-01-01T00:00:03Z"),
            serde_json::json!({
                "cellId": "target-cell",
                "exitCode": 0,
                "durationMs": 1234,
                "argv": ["agent", "do-thing"]
            }),
        ),
        ev(
            LIFECYCLE_DESTROYED_TYPE,
            Some("2026-01-01T00:00:04Z"),
            serde_json::json!({
                "cellId": "target-cell",
                "outcome": "succeeded"
            }),
        ),
    ];
    // One JSON document per line, each terminated by a newline.
    let mut jsonl = String::new();
    for event in &events {
        jsonl.push_str(&serde_json::to_string(event).unwrap());
        jsonl.push('\n');
    }
    std::fs::write(&path, jsonl).unwrap();

    let result = wait_for_result_from_jsonl("target-cell", &path, Duration::from_secs(2))
        .expect("destroyed event must parse");

    assert_eq!(result.cell_id, "target-cell");
    assert!(result.success);
    assert_eq!(result.exit_code, 0);
    assert_eq!(result.lifecycle_destroyed_at, 1_767_225_604_000);
    assert_eq!(
        result.export_paths,
        vec!["out/report.md".to_string(), "out/audit.json".to_string()]
    );
    assert!(result.doctrine_refs.is_empty());
    assert_eq!(result.pack_id, "");
}
/// When the outcome already carries a destroyed event, `wait_for_result`
/// answers from those in-memory events and stamps the pack's id and doctrine
/// refs on the result.
#[test]
fn wait_for_result_uses_inline_events_fast_path() {
    let (_capture, handle) = submitter();
    let runner = CortexCellRunner::new(handle, vec!["agent".into()]);
    let pack = ContextPack {
        memory_digest: "pack-xyz".into(),
        doctrine_refs: vec!["D5".into()],
        task: "ship it".into(),
        expires_at: None,
    };
    let command_completed = ev(
        COMMAND_COMPLETED_TYPE,
        Some("2026-01-01T00:00:03Z"),
        serde_json::json!({
            "cellId": "inline-cell",
            "exitCode": 7,
            "durationMs": 1,
            "argv": ["agent"]
        }),
    );
    let destroyed = ev(
        LIFECYCLE_DESTROYED_TYPE,
        Some("2026-01-01T00:00:04Z"),
        serde_json::json!({
            "cellId": "inline-cell",
            "outcome": "failed"
        }),
    );
    let outcome = CellSubmissionOutcome {
        cell_id: "inline-cell".into(),
        exit_code: Some(0),
        lifecycle_events: vec![command_completed, destroyed],
    };

    let result = runner
        .wait_for_result(&pack, &outcome, Duration::from_millis(10))
        .expect("inline path must succeed without polling");

    assert_eq!(result.cell_id, "inline-cell");
    assert_eq!(result.pack_id, "pack-xyz");
    assert!(!result.success);
    // The event's exit code wins over the outcome's `exit_code` field.
    assert_eq!(result.exit_code, 7);
    assert_eq!(result.doctrine_refs, vec!["D5".to_string()]);
}
/// Creates and returns a fresh directory under the system temp dir for a
/// single test.
///
/// The name combines the current time in nanoseconds, the process id and a
/// process-wide sequence number. The sequence number guarantees distinct
/// directories for tests running in parallel in the same process, even when
/// the clock is too coarse to tell two calls apart. The directory is not
/// removed afterwards.
///
/// # Panics
/// Panics if the system clock is before the Unix epoch or the directory
/// cannot be created.
fn tempdir_for_test() -> std::path::PathBuf {
    static NEXT_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
    let seq = NEXT_SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let dir = std::env::temp_dir().join(format!(
        "cellos-cortex-wait-{nanos}-{}-{seq}",
        std::process::id()
    ));
    std::fs::create_dir_all(&dir).unwrap();
    dir
}
}