use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use async_nats::jetstream::context::Context as JsContext;
use async_nats::Client as NatsClient;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::debug;
use uuid::Uuid;
/// Reference-counted API token string; cloning shares the same underlying `String`.
pub type ApiToken = Arc<String>;
/// Application state handed around by value.
///
/// The map, token and cursor fields are `Arc`s, so every clone of an
/// `AppState` observes the same formations, cells and cursor.
#[derive(Clone)]
pub struct AppState {
/// NATS client handle; `None` when no client was passed to [`AppState::new`].
pub nats: Option<NatsClient>,
/// JetStream context; `None` until set through [`AppState::with_jetstream`].
pub jetstream: Option<JsContext>,
/// Formation records keyed by formation UUID, updated by
/// [`AppState::apply_event_payload`].
pub formations: Arc<RwLock<BTreeMap<Uuid, FormationRecord>>>,
/// Cell records keyed by the `cellId` string taken from cell events.
pub cells: Arc<RwLock<BTreeMap<String, CellRecord>>>,
/// API token string supplied at construction.
/// NOTE(review): how the token is checked is not visible in this file.
pub api_token: ApiToken,
/// Largest value passed to [`AppState::bump_cursor`] so far; starts at 0.
/// NOTE(review): presumably an event-stream sequence number — confirm with callers.
pub applied_cursor: Arc<AtomicU64>,
}
impl AppState {
    /// Creates a state with empty formation and cell maps, no JetStream
    /// context and the applied cursor at zero.
    ///
    /// `nats` is stored unchanged (it may be `None`). `api_token` is converted
    /// into an owned `String` and wrapped in an `Arc`.
    pub fn new(nats: Option<NatsClient>, api_token: impl Into<String>) -> Self {
        Self {
            nats,
            jetstream: None,
            formations: Arc::new(RwLock::new(BTreeMap::new())),
            cells: Arc::new(RwLock::new(BTreeMap::new())),
            api_token: Arc::new(api_token.into()),
            applied_cursor: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Builder-style setter: stores `ctx` as the JetStream context and returns
    /// the updated state.
    pub fn with_jetstream(mut self, ctx: JsContext) -> Self {
        self.jetstream = Some(ctx);
        self
    }

    /// Returns the current applied cursor.
    pub fn cursor(&self) -> u64 {
        self.applied_cursor.load(Ordering::Acquire)
    }

    /// Raises the applied cursor to `seq` when `seq` is larger than the stored
    /// value; smaller or equal values leave it untouched, so the cursor never
    /// moves backwards.
    pub fn bump_cursor(&self, seq: u64) {
        // `fetch_max` is the standard-library atomic "store the larger value"
        // operation; it replaces a hand-written compare-exchange retry loop.
        self.applied_cursor.fetch_max(seq, Ordering::AcqRel);
    }

    /// Parses `payload` as a JSON event and folds it into the in-memory maps.
    ///
    /// The event's `type` string (empty when absent) is first offered to
    /// `apply_cell_event`. If that does not claim it, the type must start with
    /// one of the two formation prefixes; the remainder is the phase:
    ///
    /// | phase                               | status      |
    /// |-------------------------------------|-------------|
    /// | `created`                           | `Pending`   |
    /// | `launching`, `running`, `degraded`  | `Running`   |
    /// | `completed`                         | `Succeeded` |
    /// | `failed`                            | `Failed`    |
    /// | `cancelled`                         | `Cancelled` |
    ///
    /// The formation id is read from `data.formationId`, then
    /// `data.formation_id`, then the top-level `subject`. The name is read from
    /// `data.formationName`, then `data.name`. A new record stores a copy of
    /// `data` as its document; an existing record only has its status (and its
    /// name, when the event carries a non-empty one) updated.
    ///
    /// Returns [`ApplyOutcome::Applied`] when a record was created or updated
    /// and [`ApplyOutcome::Ignored`] for event types or phases not handled here.
    ///
    /// # Errors
    ///
    /// Fails when the payload is not valid JSON, when a formation-prefixed
    /// event carries no formation id, or when that id is not a UUID. The id is
    /// validated before the phase is inspected, so an unknown phase without an
    /// id is an error rather than `Ignored`.
    pub async fn apply_event_payload(&self, payload: &[u8]) -> anyhow::Result<ApplyOutcome> {
        const CANONICAL_FORMATION_PREFIX: &str = "dev.cellos.events.cell.formation.v1.";
        const LEGACY_FORMATION_PREFIX: &str = "io.cellos.formation.v1.";

        let event: serde_json::Value = serde_json::from_slice(payload)
            .map_err(|e| anyhow::anyhow!("event payload not JSON: {e}"))?;
        // Borrow the type straight out of the parsed event; no copy is needed.
        let ce_type = event
            .get("type")
            .and_then(|v| v.as_str())
            .unwrap_or_default();

        if let Some(outcome) = self.apply_cell_event(ce_type, &event).await? {
            return Ok(outcome);
        }

        let Some(phase) = ce_type
            .strip_prefix(CANONICAL_FORMATION_PREFIX)
            .or_else(|| ce_type.strip_prefix(LEGACY_FORMATION_PREFIX))
        else {
            debug!(
                ce_type,
                "apply_event_payload: not a formation or cell event; ignored"
            );
            return Ok(ApplyOutcome::Ignored);
        };

        // Read `data` by reference; a missing `data` behaves like JSON null, on
        // which every field lookup yields `None`.
        let null = serde_json::Value::Null;
        let data = event.get("data").unwrap_or(&null);

        let id_str = data
            .get("formationId")
            .and_then(|v| v.as_str())
            .or_else(|| data.get("formation_id").and_then(|v| v.as_str()))
            .or_else(|| event.get("subject").and_then(|v| v.as_str()))
            .ok_or_else(|| anyhow::anyhow!("event missing formationId"))?;
        let id = Uuid::parse_str(id_str)
            .map_err(|e| anyhow::anyhow!("event formationId not a UUID ({id_str}): {e}"))?;
        let name = data
            .get("formationName")
            .and_then(|v| v.as_str())
            .or_else(|| data.get("name").and_then(|v| v.as_str()))
            .unwrap_or("");

        let status = match phase {
            "created" => FormationStatus::Pending,
            "launching" | "running" | "degraded" => FormationStatus::Running,
            "completed" => FormationStatus::Succeeded,
            "failed" => FormationStatus::Failed,
            "cancelled" => FormationStatus::Cancelled,
            _ => {
                debug!(
                    ce_type,
                    "apply_event_payload: unknown formation event; ignored"
                );
                return Ok(ApplyOutcome::Ignored);
            }
        };

        let mut map = self.formations.write().await;
        // `data` is only deep-copied when a record is actually inserted.
        let entry = map.entry(id).or_insert_with(|| FormationRecord {
            id,
            name: name.to_string(),
            status,
            document: data.clone(),
        });
        // An event without a name must not erase a previously recorded one.
        if !name.is_empty() {
            entry.name = name.to_string();
        }
        entry.status = status;
        Ok(ApplyOutcome::Applied)
    }

    /// Applies one of the three recognised cell events to the cell map.
    ///
    /// Returns `Ok(None)` when `ce_type` is not a cell event, so the caller can
    /// try other handlers. Returns `Ok(Some(Ignored))` when the event has no
    /// string `data.cellId`, and `Ok(Some(Applied))` otherwise.
    ///
    /// A record is created on first sight of a cell id (state `Pending`).
    /// `specId` and `runId` only fill fields that are still empty. Then:
    ///
    /// * started: state becomes `Running`, `started_at` is set to the event `time`.
    /// * command completed: `exit_code` is set from `data.exitCode`; state is unchanged.
    /// * destroyed: state becomes `Destroyed`, `destroyed_at` is set to the event
    ///   `time`, and `outcome` is set when `data.outcome` is a string.
    ///
    /// `status` is always refreshed to the textual form of `state`.
    async fn apply_cell_event(
        &self,
        ce_type: &str,
        event: &serde_json::Value,
    ) -> anyhow::Result<Option<ApplyOutcome>> {
        const STARTED: &str = "dev.cellos.events.cell.lifecycle.v1.started";
        const COMPLETED: &str = "dev.cellos.events.cell.command.v1.completed";
        const DESTROYED: &str = "dev.cellos.events.cell.lifecycle.v1.destroyed";

        let phase = match ce_type {
            STARTED => CellPhase::Started,
            COMPLETED => CellPhase::CommandCompleted,
            DESTROYED => CellPhase::Destroyed,
            _ => return Ok(None),
        };

        // Read `data` by reference; a missing `data` behaves like JSON null.
        let null = serde_json::Value::Null;
        let data = event.get("data").unwrap_or(&null);

        let Some(cell_id) = data.get("cellId").and_then(|v| v.as_str()) else {
            debug!(
                ce_type,
                "apply_cell_event: missing data.cellId; cell event ignored"
            );
            return Ok(Some(ApplyOutcome::Ignored));
        };
        let event_time = event
            .get("time")
            .and_then(|v| v.as_str())
            .map(str::to_string);
        let spec_id = data.get("specId").and_then(|v| v.as_str()).unwrap_or("");
        let run_id = data.get("runId").and_then(|v| v.as_str());

        let mut map = self.cells.write().await;
        let entry = map.entry(cell_id.to_string()).or_insert_with(|| CellRecord {
            id: cell_id.to_string(),
            spec_id: spec_id.to_string(),
            run_id: run_id.map(str::to_string),
            state: CellState::Pending,
            status: String::new(),
            formation_id: None,
            started_at: None,
            destroyed_at: None,
            outcome: None,
            exit_code: None,
        });
        // Later events may carry identifiers the first event lacked; never
        // overwrite a value that is already known.
        if entry.spec_id.is_empty() && !spec_id.is_empty() {
            entry.spec_id = spec_id.to_string();
        }
        if entry.run_id.is_none() {
            entry.run_id = run_id.map(str::to_string);
        }

        match phase {
            CellPhase::Started => {
                entry.state = CellState::Running;
                entry.started_at = event_time;
            }
            CellPhase::CommandCompleted => {
                if let Some(code) = data.get("exitCode").and_then(|v| v.as_i64()) {
                    // A plain `as i32` cast wraps silently (4294967296 would be
                    // stored as 0, i.e. "success"); refuse such values instead.
                    match i32::try_from(code) {
                        Ok(code) => entry.exit_code = Some(code),
                        Err(_) => {
                            debug!(
                                ce_type,
                                exit_code = code,
                                "apply_cell_event: exitCode out of i32 range; not recorded"
                            );
                        }
                    }
                }
            }
            CellPhase::Destroyed => {
                entry.state = CellState::Destroyed;
                entry.destroyed_at = event_time;
                if let Some(o) = data.get("outcome").and_then(|v| v.as_str()) {
                    entry.outcome = Some(o.to_string());
                }
            }
        }
        entry.status = entry.state.as_str().to_string();
        Ok(Some(ApplyOutcome::Applied))
    }
}
/// Internal classification of the cell event types that `apply_cell_event`
/// recognises.
#[derive(Debug, Clone, Copy)]
enum CellPhase {
/// Event type `dev.cellos.events.cell.lifecycle.v1.started`.
Started,
/// Event type `dev.cellos.events.cell.command.v1.completed`.
CommandCompleted,
/// Event type `dev.cellos.events.cell.lifecycle.v1.destroyed`.
Destroyed,
}
/// Result of feeding one event to [`AppState::apply_event_payload`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
/// A formation or cell record was created or updated.
Applied,
/// The event was valid JSON but not one this state handles (unknown type or
/// phase, or a cell event without `data.cellId`); nothing was changed.
Ignored,
}
/// Status of a formation as derived from formation events.
///
/// Serialized by serde as the upper-cased variant name (for example
/// `"PENDING"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "UPPERCASE")]
pub enum FormationStatus {
/// Set by the `created` phase.
Pending,
/// Set by the `launching`, `running` and `degraded` phases.
Running,
/// Set by the `completed` phase.
Succeeded,
/// Set by the `failed` phase.
Failed,
/// Set by the `cancelled` phase.
Cancelled,
}
/// In-memory view of one formation, keyed by `id` in `AppState::formations`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FormationRecord {
/// Formation UUID parsed from the event.
pub id: Uuid,
/// Formation name; empty when no event has supplied one yet.
pub name: String,
/// Status derived from the most recently applied formation event.
pub status: FormationStatus,
/// The `data` object of the event that first created this record (JSON null
/// when that event had no `data`); later events do not replace it.
pub document: serde_json::Value,
}
/// Lifecycle state of a cell as derived from cell events.
///
/// Serialized by serde as the lower-cased variant name (for example
/// `"pending"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum CellState {
/// Initial state of a record created from any cell event.
Pending,
/// Set when a `started` lifecycle event is applied.
Running,
/// Set when a `destroyed` lifecycle event is applied.
Destroyed,
}
impl CellState {
pub fn as_str(self) -> &'static str {
match self {
CellState::Pending => "pending",
CellState::Running => "running",
CellState::Destroyed => "destroyed",
}
}
}
/// In-memory view of one cell, keyed by `id` in `AppState::cells`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CellRecord {
/// Cell identifier, taken from `data.cellId` of the event.
pub id: String,
/// Value of `data.specId`; empty until an event supplies a non-empty one.
pub spec_id: String,
/// Value of `data.runId`; `None` until an event supplies one.
pub run_id: Option<String>,
/// Current lifecycle state.
pub state: CellState,
/// Textual form of `state` (`CellState::as_str`), refreshed on every applied
/// cell event.
pub status: String,
/// Owning formation, if any.
/// NOTE(review): initialised to `None` and not assigned by the event code in
/// this file — confirm where it is populated.
pub formation_id: Option<Uuid>,
/// The `time` attribute of the applied `started` event, when present.
pub started_at: Option<String>,
/// The `time` attribute of the applied `destroyed` event, when present.
pub destroyed_at: Option<String>,
/// Value of `data.outcome` from a `destroyed` event, when it is a string.
pub outcome: Option<String>,
/// Value of `data.exitCode` from a command-completed event.
pub exit_code: Option<i32>,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Eight tasks each bump the cursor with an interleaved, increasing series
    /// of sequence numbers; afterwards the cursor must hold the global maximum.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn bump_cursor_monotonic_under_concurrent_calls() {
        const WORKERS: u64 = 8;
        const PER_WORKER: u64 = 500;

        let state = AppState::new(None, "test");

        // Worker `w` submits w, w + 8, w + 16, ... so the series interleave.
        let tasks: Vec<_> = (0..WORKERS)
            .map(|worker| {
                let shared = state.clone();
                tokio::spawn(async move {
                    for round in 0..PER_WORKER {
                        shared.bump_cursor(round * WORKERS + worker);
                    }
                })
            })
            .collect();
        for task in tasks {
            task.await.expect("worker task");
        }

        let expected = (PER_WORKER - 1) * WORKERS + (WORKERS - 1);
        assert_eq!(
            state.cursor(),
            expected,
            "cursor must equal max seq across all workers; got {} expected {}",
            state.cursor(),
            expected,
        );
    }

    /// Values at or below the current cursor are ignored; larger ones apply.
    #[test]
    fn bump_cursor_rejects_regression() {
        let state = AppState::new(None, "test");
        state.bump_cursor(100);

        for stale in [50, 99] {
            state.bump_cursor(stale);
        }
        assert_eq!(state.cursor(), 100);

        state.bump_cursor(101);
        assert_eq!(state.cursor(), 101);
    }
}