use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use async_nats::jetstream::context::Context as JsContext;
use async_nats::Client as NatsClient;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::{debug, warn};
use uuid::Uuid;
/// Shared API token string; cloning an `ApiToken` bumps a reference count instead of
/// copying the string.
pub type ApiToken = std::sync::Arc<String>;
/// Shared application state: optional NATS/JetStream handles plus the in-memory
/// formation and cell projections.
///
/// Every field is either an `Option` of a client handle or an `Arc`, so cloning the
/// state hands out another handle to the same projections and cursor.
#[derive(Clone)]
pub struct AppState {
    /// Core NATS client, when one was supplied at construction.
    pub nats: Option<NatsClient>,
    /// JetStream context; `None` until attached with `with_jetstream`.
    pub jetstream: Option<JsContext>,
    /// Formation projection keyed by formation UUID.
    pub formations: Arc<RwLock<BTreeMap<Uuid, FormationRecord>>>,
    /// Cell projection keyed by the cell id string carried in cell events.
    pub cells: Arc<RwLock<BTreeMap<String, CellRecord>>>,
    /// API token string (presumably checked by request handlers — confirm at call sites).
    pub api_token: ApiToken,
    /// Highest event sequence number recorded as applied; only ever moves forward.
    pub applied_cursor: Arc<AtomicU64>,
}
impl AppState {
    /// Builds an empty state.
    ///
    /// `nats` is the optional core NATS client; a JetStream context can be attached
    /// afterwards with [`AppState::with_jetstream`]. `api_token` is stored behind an
    /// `Arc`. Both projections start empty and the applied cursor starts at 0.
    pub fn new(nats: Option<NatsClient>, api_token: impl Into<String>) -> Self {
        Self {
            nats,
            jetstream: None,
            formations: Arc::new(RwLock::new(BTreeMap::new())),
            cells: Arc::new(RwLock::new(BTreeMap::new())),
            api_token: Arc::new(api_token.into()),
            applied_cursor: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Attaches a JetStream context (builder style) and returns the updated state.
    pub fn with_jetstream(mut self, ctx: JsContext) -> Self {
        self.jetstream = Some(ctx);
        self
    }

    /// Returns the highest sequence number recorded via [`AppState::bump_cursor`].
    pub fn cursor(&self) -> u64 {
        self.applied_cursor.load(Ordering::Acquire)
    }

    /// Raises the applied cursor to `seq` if `seq` is larger; never moves it backwards.
    ///
    /// `fetch_max` performs the compare-and-raise as a single atomic read-modify-write,
    /// so concurrent callers cannot lose a larger value to a smaller one.
    pub fn bump_cursor(&self, seq: u64) {
        self.applied_cursor.fetch_max(seq, Ordering::AcqRel);
    }

    /// Applies one JSON event payload to the in-memory projections.
    ///
    /// The envelope's `type` is tried against the cell event types first. Otherwise it
    /// must start with the canonical (`dev.cellos.events.cell.formation.v1.`) or legacy
    /// (`io.cellos.formation.v1.`) formation prefix. The suffix then selects the new
    /// [`FormationStatus`]. Unrecognised types and unknown formation phases yield
    /// [`ApplyOutcome::Ignored`].
    ///
    /// The formation id is taken from `data.formationId`, then `data.formation_id`, then
    /// the envelope `subject`. The name is taken from `data.formationName`, then
    /// `data.name`. The first event seen for a formation seeds its `document` with
    /// that event's `data`. Later events update only the status, and the name when it
    /// is non-empty.
    ///
    /// # Errors
    ///
    /// Returns an error if the payload is not JSON. For a recognised formation phase, it
    /// also returns an error if no formation id is present or the id is not a UUID.
    pub async fn apply_event_payload(&self, payload: &[u8]) -> anyhow::Result<ApplyOutcome> {
        const CANONICAL_FORMATION_PREFIX: &str = "dev.cellos.events.cell.formation.v1.";
        const LEGACY_FORMATION_PREFIX: &str = "io.cellos.formation.v1.";

        let event: serde_json::Value = serde_json::from_slice(payload)
            .map_err(|e| anyhow::anyhow!("event payload not JSON: {e}"))?;
        // Borrowed straight from the envelope; a missing/non-string `type` becomes ""
        // and is ignored below.
        let ce_type = event
            .get("type")
            .and_then(serde_json::Value::as_str)
            .unwrap_or_default();

        if let Some(outcome) = self.apply_cell_event(ce_type, &event).await? {
            return Ok(outcome);
        }

        let Some(phase) = ce_type
            .strip_prefix(CANONICAL_FORMATION_PREFIX)
            .or_else(|| ce_type.strip_prefix(LEGACY_FORMATION_PREFIX))
        else {
            debug!(
                ce_type,
                "apply_event_payload: not a formation or cell event; ignored"
            );
            return Ok(ApplyOutcome::Ignored);
        };

        // Resolve the phase before touching the id, so an unrecognised formation phase is
        // ignored even when its payload lacks a (valid) formation id.
        let status = match phase {
            "created" => FormationStatus::Pending,
            "launching" | "running" | "degraded" => FormationStatus::Running,
            "completed" => FormationStatus::Succeeded,
            "failed" => FormationStatus::Failed,
            "cancelled" => FormationStatus::Cancelled,
            _ => {
                debug!(
                    ce_type,
                    "apply_event_payload: unknown formation event; ignored"
                );
                return Ok(ApplyOutcome::Ignored);
            }
        };

        let data = event
            .get("data")
            .cloned()
            .unwrap_or(serde_json::Value::Null);
        let id_str = data
            .get("formationId")
            .and_then(serde_json::Value::as_str)
            .or_else(|| data.get("formation_id").and_then(serde_json::Value::as_str))
            .or_else(|| event.get("subject").and_then(serde_json::Value::as_str))
            .ok_or_else(|| anyhow::anyhow!("event missing formationId"))?;
        let id = Uuid::parse_str(id_str)
            .map_err(|e| anyhow::anyhow!("event formationId not a UUID ({id_str}): {e}"))?;
        let name = data
            .get("formationName")
            .and_then(serde_json::Value::as_str)
            .or_else(|| data.get("name").and_then(serde_json::Value::as_str))
            .unwrap_or("")
            .to_string();

        let mut formations = self.formations.write().await;
        let entry = formations.entry(id).or_insert_with(|| FormationRecord {
            id,
            name: name.clone(),
            status,
            // `data` is owned and not needed afterwards, so move it rather than clone.
            document: data,
        });
        if !name.is_empty() {
            entry.name = name;
        }
        // NOTE(review): unlike cells, formations have no terminal-state guard, so a late or
        // reordered event can move a finished formation back to Running. Confirm intended.
        entry.status = status;
        Ok(ApplyOutcome::Applied)
    }

    /// Applies a cell lifecycle or command event to the cell projection.
    ///
    /// Returns `Ok(None)` when `ce_type` is not one of the three cell event types handled
    /// here, so the caller can try other event families. Recognised events missing
    /// `data.cellId` or `data.specId` are skipped with `Some(Ignored)`.
    ///
    /// Once a cell is `Destroyed`, further events are logged and reported as
    /// `Some(Applied)` without mutating the record. This terminal-state guard protects
    /// against out-of-order delivery.
    async fn apply_cell_event(
        &self,
        ce_type: &str,
        event: &serde_json::Value,
    ) -> anyhow::Result<Option<ApplyOutcome>> {
        const STARTED: &str = "dev.cellos.events.cell.lifecycle.v1.started";
        const COMPLETED: &str = "dev.cellos.events.cell.command.v1.completed";
        const DESTROYED: &str = "dev.cellos.events.cell.lifecycle.v1.destroyed";
        let phase = match ce_type {
            STARTED => CellPhase::Started,
            COMPLETED => CellPhase::CommandCompleted,
            DESTROYED => CellPhase::Destroyed,
            _ => return Ok(None),
        };

        // Borrow instead of cloning: indexing a `Value` by key yields `Null` when the key
        // is absent, and every `get` on `Null` returns `None`.
        let data = &event["data"];
        let Some(cell_id) = data.get("cellId").and_then(serde_json::Value::as_str) else {
            debug!(
                ce_type,
                "apply_cell_event: missing data.cellId; cell event ignored"
            );
            return Ok(Some(ApplyOutcome::Ignored));
        };
        let event_time = event
            .get("time")
            .and_then(serde_json::Value::as_str)
            .map(str::to_string);
        let Some(spec_id) = data.get("specId").and_then(serde_json::Value::as_str) else {
            warn!(
                ce_type,
                cell_id = %cell_id,
                "apply_cell_event: missing data.specId; cell event skipped (strict admission)"
            );
            return Ok(Some(ApplyOutcome::Ignored));
        };
        let run_id = data
            .get("runId")
            .and_then(serde_json::Value::as_str)
            .map(str::to_string);

        let mut cells = self.cells.write().await;
        let entry = cells
            .entry(cell_id.to_string())
            .or_insert_with(|| CellRecord {
                id: cell_id.to_string(),
                spec_id: spec_id.to_string(),
                run_id: run_id.clone(),
                state: CellState::Pending,
                status: String::new(),
                formation_id: None,
                started_at: None,
                destroyed_at: None,
                outcome: None,
                exit_code: None,
            });

        if matches!(entry.state, CellState::Destroyed) {
            // `ce_type` is exactly the string that selected `phase`, so log it directly.
            warn!(
                ce_type,
                cell_id = %cell_id,
                event_time = event_time.as_deref().unwrap_or(""),
                "apply_cell_event: post-terminal event ignored; cell already Destroyed \
                 (likely out-of-order delivery or replay reordering)"
            );
            return Ok(Some(ApplyOutcome::Applied));
        }

        if entry.spec_id.is_empty() && !spec_id.is_empty() {
            entry.spec_id = spec_id.to_string();
        }
        if entry.run_id.is_none() {
            entry.run_id = run_id;
        }

        match phase {
            CellPhase::Started => {
                entry.state = CellState::Running;
                entry.started_at = event_time;
            }
            CellPhase::CommandCompleted => {
                if let Some(code) = data.get("exitCode").and_then(serde_json::Value::as_i64) {
                    // Exit codes are stored as i32. Reject values an `as` cast would
                    // silently truncate into a wrong code.
                    match i32::try_from(code) {
                        Ok(code) => entry.exit_code = Some(code),
                        Err(_) => {
                            warn!(
                                ce_type,
                                cell_id = %cell_id,
                                exit_code = code,
                                "apply_cell_event: data.exitCode out of i32 range; exit code not recorded"
                            );
                        }
                    }
                }
            }
            CellPhase::Destroyed => {
                entry.state = CellState::Destroyed;
                entry.destroyed_at = event_time;
                if let Some(outcome) = data.get("outcome").and_then(serde_json::Value::as_str) {
                    entry.outcome = Some(outcome.to_string());
                }
            }
        }
        entry.status = entry.state.as_str().to_string();
        Ok(Some(ApplyOutcome::Applied))
    }
}
/// Which of the recognised cell event types an incoming event is.
#[derive(Clone, Copy)]
#[derive(Debug)]
enum CellPhase {
    /// `dev.cellos.events.cell.lifecycle.v1.started`
    Started,
    /// `dev.cellos.events.cell.command.v1.completed`
    CommandCompleted,
    /// `dev.cellos.events.cell.lifecycle.v1.destroyed`
    Destroyed,
}
/// Result of applying one event payload to the projections.
#[derive(Clone, Copy)]
#[derive(Debug, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The event belonged to a handled family and was processed.
    Applied,
    /// The event was not handled or was skipped without changing the projections.
    Ignored,
}
/// Projected status of a formation; serialized with upper-case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum FormationStatus {
    /// Set by a `created` event.
    Pending,
    /// Set by `launching`, `running` or `degraded` events.
    Running,
    /// Set by a `completed` event.
    Succeeded,
    /// Set by a `failed` event.
    Failed,
    /// Set by a `cancelled` event.
    Cancelled,
}
/// One formation in the in-memory projection.
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize)]
pub struct FormationRecord {
    /// Formation UUID (also the projection map key).
    pub id: Uuid,
    /// Latest non-empty formation name seen; may be empty.
    pub name: String,
    /// Status derived from the most recently applied formation event.
    pub status: FormationStatus,
    /// `data` object of the first formation event seen for this id.
    pub document: serde_json::Value,
}
/// Lifecycle state of a cell; serialized with lower-case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CellState {
    /// Record exists but no `started` event has been applied yet.
    Pending,
    /// A `started` event has been applied.
    Running,
    /// A `destroyed` event has been applied; terminal.
    Destroyed,
}
impl CellState {
pub fn as_str(self) -> &'static str {
match self {
CellState::Pending => "pending",
CellState::Running => "running",
CellState::Destroyed => "destroyed",
}
}
}
/// One cell in the in-memory projection.
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize)]
pub struct CellRecord {
    /// Cell id from the event's `data.cellId` (also the projection map key).
    pub id: String,
    /// Spec id from `data.specId`.
    pub spec_id: String,
    /// Run id from `data.runId`, when any event carried one.
    pub run_id: Option<String>,
    /// Current lifecycle state.
    pub state: CellState,
    /// String form of `state`, refreshed after each applied event.
    pub status: String,
    /// Owning formation; not populated by the event-apply path.
    pub formation_id: Option<Uuid>,
    /// Raw envelope `time` of the `started` event, stored unparsed.
    pub started_at: Option<String>,
    /// Raw envelope `time` of the `destroyed` event, stored unparsed.
    pub destroyed_at: Option<String>,
    /// `data.outcome` from the `destroyed` event.
    pub outcome: Option<String>,
    /// `data.exitCode` from a command `completed` event.
    pub exit_code: Option<i32>,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Many tasks bumping disjoint, interleaved sequence numbers must leave the cursor at
    /// the global maximum.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn bump_cursor_monotonic_under_concurrent_calls() {
        let state = AppState::new(None, "test");
        let workers = 8;
        let per_worker = 500usize;
        let mut handles = Vec::with_capacity(workers);
        for w in 0..workers {
            let s = state.clone();
            handles.push(tokio::spawn(async move {
                for i in 0..per_worker {
                    let seq = (i as u64) * (workers as u64) + (w as u64);
                    s.bump_cursor(seq);
                }
            }));
        }
        for h in handles {
            h.await.expect("worker task");
        }
        let max_seq = (per_worker as u64 - 1) * (workers as u64) + (workers as u64 - 1);
        assert_eq!(
            state.cursor(),
            max_seq,
            "cursor must equal max seq across all workers; got {} expected {}",
            state.cursor(),
            max_seq,
        );
    }

    /// Smaller sequence numbers never move the cursor backwards.
    #[test]
    fn bump_cursor_rejects_regression() {
        let state = AppState::new(None, "test");
        state.bump_cursor(100);
        state.bump_cursor(50);
        state.bump_cursor(99);
        assert_eq!(state.cursor(), 100);
        state.bump_cursor(101);
        assert_eq!(state.cursor(), 101);
    }

    /// Builds a cell event envelope with a fixed `time` and optional `data.specId`.
    fn cell_event(ce_type: &str, cell_id: &str, spec_id: Option<&str>) -> serde_json::Value {
        let mut data = serde_json::Map::new();
        data.insert("cellId".to_string(), serde_json::json!(cell_id));
        if let Some(s) = spec_id {
            data.insert("specId".to_string(), serde_json::json!(s));
        }
        serde_json::json!({
            "type": ce_type,
            "time": "2026-05-17T12:00:00Z",
            "data": serde_json::Value::Object(data),
        })
    }

    /// A command-completed event arriving after destroyed must not mutate the record.
    #[tokio::test]
    async fn cell_destroyed_then_command_completed_does_not_mutate() {
        let state = AppState::new(None, "test");
        let started = cell_event(
            "dev.cellos.events.cell.lifecycle.v1.started",
            "cell-arch001a",
            Some("e2e-stub-echo"),
        );
        let out = state
            .apply_event_payload(&serde_json::to_vec(&started).unwrap())
            .await
            .expect("started apply");
        assert_eq!(out, ApplyOutcome::Applied);
        let mut destroyed = cell_event(
            "dev.cellos.events.cell.lifecycle.v1.destroyed",
            "cell-arch001a",
            Some("e2e-stub-echo"),
        );
        destroyed["data"]["outcome"] = serde_json::json!("succeeded");
        let out = state
            .apply_event_payload(&serde_json::to_vec(&destroyed).unwrap())
            .await
            .expect("destroyed apply");
        assert_eq!(out, ApplyOutcome::Applied);
        {
            let cells = state.cells.read().await;
            let entry = cells.get("cell-arch001a").expect("cell present");
            assert_eq!(entry.state, CellState::Destroyed);
            assert_eq!(entry.exit_code, None);
        }
        let mut completed = cell_event(
            "dev.cellos.events.cell.command.v1.completed",
            "cell-arch001a",
            Some("e2e-stub-echo"),
        );
        completed["data"]["exitCode"] = serde_json::json!(42);
        let out = state
            .apply_event_payload(&serde_json::to_vec(&completed).unwrap())
            .await
            .expect("late completed apply");
        assert_eq!(
            out,
            ApplyOutcome::Applied,
            "late completed-after-destroyed: generic terminal-state guard (RT3-HIGH-3) recognizes the event family and advances cursor, but refuses mutation"
        );
        let cells = state.cells.read().await;
        let entry = cells.get("cell-arch001a").expect("cell present");
        assert_eq!(
            entry.exit_code, None,
            "exit_code must NOT be mutated after destroyed (terminal-state guard)"
        );
        assert_eq!(entry.state, CellState::Destroyed);
    }

    /// A cell event without `data.specId` is skipped and creates no record.
    #[tokio::test]
    async fn cell_event_missing_spec_id_is_skipped() {
        let state = AppState::new(None, "test");
        let started_no_spec = cell_event(
            "dev.cellos.events.cell.lifecycle.v1.started",
            "cell-arch001b",
            None,
        );
        let out = state
            .apply_event_payload(&serde_json::to_vec(&started_no_spec).unwrap())
            .await
            .expect("apply must not error on malformed event");
        assert_eq!(
            out,
            ApplyOutcome::Ignored,
            "missing specId must result in Ignored (strict admission)"
        );
        let cells = state.cells.read().await;
        assert!(
            !cells.contains_key("cell-arch001b"),
            "projection must not gain a phantom cell from an event missing specId"
        );
        assert!(
            cells.is_empty(),
            "projection must remain empty after a single malformed event"
        );
    }

    /// Canonical and legacy formation events project onto one record. A later event
    /// without a name keeps the earlier name.
    #[tokio::test]
    async fn formation_events_update_status_and_keep_name() {
        let state = AppState::new(None, "test");
        let id_str = "6f1c2e3a-5b7d-4c1e-9a2b-3c4d5e6f7a8b";
        let created = serde_json::json!({
            "type": "dev.cellos.events.cell.formation.v1.created",
            "data": { "formationId": id_str, "formationName": "alpha" },
        });
        let out = state
            .apply_event_payload(&serde_json::to_vec(&created).unwrap())
            .await
            .expect("created apply");
        assert_eq!(out, ApplyOutcome::Applied);
        let completed = serde_json::json!({
            "type": "io.cellos.formation.v1.completed",
            "data": { "formation_id": id_str },
        });
        let out = state
            .apply_event_payload(&serde_json::to_vec(&completed).unwrap())
            .await
            .expect("completed apply");
        assert_eq!(out, ApplyOutcome::Applied);
        let id = Uuid::parse_str(id_str).expect("valid uuid literal");
        let formations = state.formations.read().await;
        let record = formations.get(&id).expect("formation present");
        assert_eq!(record.status, FormationStatus::Succeeded);
        assert_eq!(record.name, "alpha");
    }

    /// Event types outside the cell and formation families are ignored without side effects.
    #[tokio::test]
    async fn unrelated_event_type_is_ignored() {
        let state = AppState::new(None, "test");
        let payload =
            serde_json::to_vec(&serde_json::json!({ "type": "com.example.other.v1.ping" }))
                .unwrap();
        let out = state
            .apply_event_payload(&payload)
            .await
            .expect("unrelated apply");
        assert_eq!(out, ApplyOutcome::Ignored);
        assert!(state.formations.read().await.is_empty());
        assert!(state.cells.read().await.is_empty());
    }

    /// A payload that is not JSON is reported as an error.
    #[tokio::test]
    async fn non_json_payload_is_an_error() {
        let state = AppState::new(None, "test");
        assert!(state.apply_event_payload(b"not json").await.is_err());
    }
}