// cellos-server 0.5.0
//
// HTTP control plane for CellOS — admission, projection over JetStream, WebSocket fan-out of CloudEvents. Pure event-sourced architecture.
//! Shared application state for the HTTP control plane.
//!
//! `AppState` is `Clone`-able (all inner handles are `Arc`/optional) and is
//! threaded through axum handlers via `with_state`. The in-memory registry
//! is a *projection cache* over JetStream — it is intentionally not the
//! source of truth (CHATROOM.md Session 16). The map is populated by
//! writes from POST /v1/formations and by `AppState::apply_event_payload`,
//! which is shared by the boot-time replay path and the live WebSocket
//! bridge.

use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use async_nats::jetstream::context::Context as JsContext;
use async_nats::Client as NatsClient;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::debug;
use uuid::Uuid;

/// Required bearer token, validated against `Authorization: Bearer <token>`.
///
/// Held in an `Arc` so every clone of [`AppState`] shares one allocation.
/// The alias itself is never optional: a missing token is expected to be
/// rejected at startup, before any `AppState` is built (see `main.rs`).
pub type ApiToken = Arc<String>;

/// Shared state handed to the HTTP handlers.
///
/// Derives `Clone`; the projection maps, the API token and the cursor all
/// sit behind `Arc`, so clones observe and mutate the same underlying data.
#[derive(Clone)]
pub struct AppState {
    /// NATS connection used both by the WebSocket bridge and by
    /// projection replay. `Option` because tests can drive the router
    /// without a live broker.
    pub nats: Option<NatsClient>,
    /// JetStream context (ADR-0011, ADR-0015). Populated alongside
    /// `nats` once the server confirms the `CELLOS_EVENTS` stream
    /// exists. `Option` so router tests without a live broker still
    /// build the state cleanly; the WS bridge falls through to a
    /// "broker not configured" close if this is `None`.
    pub jetstream: Option<JsContext>,
    /// In-memory projection of formations. UUID-keyed for stable lookup.
    /// Written by `apply_event_payload`; it is a cache, not the source of
    /// truth.
    pub formations: Arc<RwLock<BTreeMap<Uuid, FormationRecord>>>,
    /// In-memory projection of cells, keyed by a `String` (presumably
    /// `CellRecord::id` — confirm against the writer). `apply_event_payload`
    /// does not populate this map.
    pub cells: Arc<RwLock<BTreeMap<String, CellRecord>>>,
    /// Bearer token required on every non-public route.
    pub api_token: ApiToken,
    /// Highest JetStream stream-sequence the projection has applied
    /// (ADR-0015 §D2). The snapshot endpoint reports this as `cursor`
    /// so clients can open `/ws/events?since=<cursor>` and know they
    /// will not miss any event after the snapshot.
    ///
    /// The WebSocket bridge bumps this monotonically as it forwards
    /// frames (see `ws.rs`). Until the bridge is migrated to a real
    /// JetStream consumer, this counter is per-process and resets on
    /// restart — that is acceptable for the MVP because every browser
    /// reconnect repeats the snapshot fetch.
    ///
    /// Read through `cursor()` and advanced through `bump_cursor()`.
    pub applied_cursor: Arc<AtomicU64>,
}

impl AppState {
    /// Constructor used by both `main.rs` and the test harness.
    pub fn new(nats: Option<NatsClient>, api_token: impl Into<String>) -> Self {
        Self {
            nats,
            jetstream: None,
            formations: Arc::new(RwLock::new(BTreeMap::new())),
            cells: Arc::new(RwLock::new(BTreeMap::new())),
            api_token: Arc::new(api_token.into()),
            applied_cursor: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Attach a JetStream context. Called from `main.rs` after
    /// `ensure_stream` succeeds. Returns the same `AppState` for
    /// builder-style chaining in tests.
    pub fn with_jetstream(mut self, ctx: JsContext) -> Self {
        self.jetstream = Some(ctx);
        self
    }

    /// Current snapshot cursor — the highest seq the server has applied.
    ///
    /// Performs a single `Acquire` load of `applied_cursor`; the value is
    /// zero until the first successful `bump_cursor`.
    ///
    /// Red-team wave 2 (LOW-W2A-1): `Acquire` is sufficient here. We need
    /// to see writes that happen-before any successful `bump_cursor`
    /// (i.e. the projection-apply write inside `apply_event_payload`);
    /// the previous `SeqCst` enforced a global total order with every
    /// other `SeqCst` operation in the process (counter `fetch_add`s in
    /// `sni_proxy`, etc.) for no extra correctness gain on this read.
    pub fn cursor(&self) -> u64 {
        self.applied_cursor.load(Ordering::Acquire)
    }

    /// Raise the snapshot cursor to `seq` if `seq` is strictly greater than
    /// the current value; otherwise leave it untouched. The cursor therefore
    /// never moves backwards, whatever order callers arrive in. Used by the
    /// WebSocket bridge as it forwards frames.
    ///
    /// Red-team wave 2 (LOW-W2A-1): orderings are `AcqRel` on a successful
    /// exchange and `Acquire` on the initial load and on every reload after
    /// a lost race (downgraded from `SeqCst`). Monotonicity comes from the
    /// compare-and-swap loop plus the strict `seq > current` gate, not from
    /// the ordering. `AcqRel` on success publishes the new cursor to later
    /// `cursor()` readers.
    pub fn bump_cursor(&self, seq: u64) {
        // `fetch_update` is the std-provided CAS loop: it re-runs the
        // closure against the freshly observed value whenever the exchange
        // loses a race, and performs no write once the closure yields `None`.
        // The returned `Result` only says whether a write happened, which no
        // caller needs.
        let _ = self.applied_cursor.fetch_update(
            Ordering::AcqRel,
            Ordering::Acquire,
            |current| if seq > current { Some(seq) } else { None },
        );
    }

    /// Apply a single CloudEvent payload (raw JSON bytes from
    /// JetStream) to the in-memory projection.
    ///
    /// This is shared between the boot-time replay path
    /// (`jetstream::replay_projection`) and the live WebSocket bridge
    /// (`ws.rs`). Centralising the apply logic guarantees that what
    /// the server reconstructs on startup is exactly what it
    /// applies live — there is no "replay-only" code path that could
    /// drift from the live one.
    ///
    /// CloudEvent `type` discriminates the transition; only the
    /// `formation.v1.*` family currently mutates the projection. Cell
    /// events arrive (and the cursor still advances over them) but
    /// the `cells` map is populated by future work and we do not
    /// invent shape here.
    pub async fn apply_event_payload(&self, payload: &[u8]) -> anyhow::Result<ApplyOutcome> {
        let event: serde_json::Value = serde_json::from_slice(payload)
            .map_err(|e| anyhow::anyhow!("event payload not JSON: {e}"))?;

        let ce_type = event
            .get("type")
            .and_then(|v| v.as_str())
            .unwrap_or_default()
            .to_string();
        // Wave 2 red-team CRITICAL fix (CRIT-W2D-1): the canonical event-type
        // URN family emitted by `cellos-core::events::cloud_event_v1_formation_*`
        // is `dev.cellos.events.cell.formation.v1.*`. The legacy
        // `io.cellos.formation.v1.*` prefix this reader used to require is
        // not emitted by any production code path today — replayed formations
        // would bail at this gate and the in-memory `status` projection would
        // freeze at PENDING forever. We accept BOTH so older event archives
        // still replay cleanly; new emitters MUST use the canonical family
        // enumerated in `cellos-core/src/events.rs` and audited by
        // `docs/audit-log-retention.md` (FC-74).
        const CANONICAL_FORMATION_PREFIX: &str = "dev.cellos.events.cell.formation.v1.";
        const LEGACY_FORMATION_PREFIX: &str = "io.cellos.formation.v1.";
        let phase = if let Some(p) = ce_type.strip_prefix(CANONICAL_FORMATION_PREFIX) {
            p
        } else if let Some(p) = ce_type.strip_prefix(LEGACY_FORMATION_PREFIX) {
            p
        } else {
            // Cell events / unknown types currently no-op the
            // projection. The cursor still advances at the caller
            // because the apply contract is independent of whether
            // any state changed.
            debug!(
                ce_type,
                "apply_event_payload: not a formation event; ignored"
            );
            return Ok(ApplyOutcome::Ignored);
        };

        // The supervisor packs the formation id into the CloudEvent's
        // `data` payload. Wave-2 fix (CRIT-W2D-1): the canonical emitter in
        // `cellos-core::events::formation_data_v1` uses camelCase keys
        // (`formationId`, `formationName`); the legacy snake_case
        // (`formation_id`, `name`) shape is accepted for archive replay
        // only. `subject` is the last-resort fallback when neither field
        // is populated.
        let data = event
            .get("data")
            .cloned()
            .unwrap_or(serde_json::Value::Null);
        let id_str = data
            .get("formationId")
            .and_then(|v| v.as_str())
            .or_else(|| data.get("formation_id").and_then(|v| v.as_str()))
            .or_else(|| event.get("subject").and_then(|v| v.as_str()))
            .ok_or_else(|| anyhow::anyhow!("event missing formationId"))?;
        let id = Uuid::parse_str(id_str)
            .map_err(|e| anyhow::anyhow!("event formationId not a UUID ({id_str}): {e}"))?;

        let name = data
            .get("formationName")
            .and_then(|v| v.as_str())
            .or_else(|| data.get("name").and_then(|v| v.as_str()))
            .unwrap_or("")
            .to_string();

        let status = match phase {
            "created" => FormationStatus::Pending,
            "launching" | "running" | "degraded" => FormationStatus::Running,
            "completed" => FormationStatus::Succeeded,
            "failed" => FormationStatus::Failed,
            // `cancelled` has no canonical emitter constructor in
            // `cellos-core` today (see red-team-findings-A.md); the arm is
            // retained so a future `cloud_event_v1_formation_cancelled`
            // lands on a working reader. The legacy prefix can still carry
            // this phase.
            "cancelled" => FormationStatus::Cancelled,
            _ => {
                debug!(
                    ce_type,
                    "apply_event_payload: unknown formation event; ignored"
                );
                return Ok(ApplyOutcome::Ignored);
            }
        };

        let mut map = self.formations.write().await;
        let entry = map.entry(id).or_insert_with(|| FormationRecord {
            id,
            name: name.clone(),
            status,
            document: data.clone(),
        });
        if !name.is_empty() {
            entry.name = name;
        }
        entry.status = status;
        Ok(ApplyOutcome::Applied)
    }
}

/// Result of a successful `apply_event_payload` call: whether the event
/// was written into the projection or skipped. Distinguishing the two
/// lets the replay path log meaningful "applied vs skipped" counts and
/// lets tests assert that unknown events are tolerated.
///
/// Malformed events are not represented here; they surface as `Err`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// Event matched a known formation phase and the in-memory projection
    /// was updated (or a new record was created). Returned even when the
    /// written values equal what was already stored.
    Applied,
    /// Event was structurally valid but its type/discriminant is not
    /// owned by this projection: cell events, future families, or an
    /// unrecognised phase within the formation family.
    Ignored,
}

/// Lifecycle status of a formation. Mirrors the projector's state machine
/// (CHATROOM Session 16 §state-machine-table). Only `PENDING` is emitted
/// directly on POST; all other transitions are driven by CloudEvents
/// observed on JetStream.
///
/// Serialised in upper case (`"PENDING"`, `"RUNNING"`, …) via
/// `rename_all = "UPPERCASE"`. The phase names below are the event-type
/// suffixes matched in `AppState::apply_event_payload`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "UPPERCASE")]
pub enum FormationStatus {
    /// Set by the `created` phase.
    Pending,
    /// Set by the `launching`, `running` and `degraded` phases, which the
    /// projection does not distinguish.
    Running,
    /// Set by the `completed` phase.
    Succeeded,
    /// Set by the `failed` phase.
    Failed,
    /// Set by the `cancelled` phase.
    Cancelled,
}

/// Projected view of a formation. The full submitted document is retained
/// so GET /v1/formations/{id} can echo the original spec — clients build
/// their own state machines from this plus the WebSocket event stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FormationRecord {
    /// Formation id; also the key under which the record is stored in
    /// `AppState::formations`.
    pub id: Uuid,
    /// Formation name. May be empty when the record was first created from
    /// an event that carried no name; later events only overwrite it with
    /// a non-empty value.
    pub name: String,
    /// Most recently applied lifecycle status.
    pub status: FormationStatus,
    /// Retained JSON document. When the record is first created by
    /// `AppState::apply_event_payload` this is the CloudEvent's `data`
    /// payload (or `null` if absent), not a submitted spec, and later
    /// events never replace it.
    pub document: serde_json::Value,
}

/// Projected view of an execution cell. Fields are kept minimal — the
/// authoritative cell state is in JetStream; this is just a cache for
/// `GET /v1/cells` list latency.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CellRecord {
    /// Cell identifier, kept as an opaque string.
    pub id: String,
    /// Formation this cell belongs to, if any — presumably the key of the
    /// matching `FormationRecord`; confirm against the code that fills
    /// this map.
    pub formation_id: Option<Uuid>,
    /// Free-form status string. No fixed vocabulary is defined in this
    /// file — TODO confirm the allowed values with the writer.
    pub status: String,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Red-team wave 2 (LOW-W2A-1): pins the contract that `bump_cursor`
    /// stays monotonic after the SeqCst→AcqRel downgrade. The property
    /// comes from the compare-and-swap structure, not from the ordering,
    /// so a drive-by edit that swaps it for a naive `store` must fail
    /// here. Many concurrent tokio tasks submit interleaved seq values and
    /// the final cursor must equal the highest seq ever submitted.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn bump_cursor_monotonic_under_concurrent_calls() {
        const WORKERS: u64 = 8;
        const PER_WORKER: u64 = 500;

        let state = AppState::new(None, "test");

        // `collect` drives the iterator, so every task is spawned before
        // the first one is awaited.
        let tasks: Vec<_> = (0..WORKERS)
            .map(|worker| {
                let shared = state.clone();
                tokio::spawn(async move {
                    // Worker `w` submits w, w + WORKERS, w + 2*WORKERS, …
                    // so arrivals across workers are out of order.
                    for round in 0..PER_WORKER {
                        shared.bump_cursor(round * WORKERS + worker);
                    }
                })
            })
            .collect();
        for task in tasks {
            task.await.expect("worker task");
        }

        let expected = (PER_WORKER - 1) * WORKERS + (WORKERS - 1);
        let observed = state.cursor();
        assert_eq!(
            observed, expected,
            "cursor must equal max seq across all workers; got {} expected {}",
            observed, expected,
        );
    }

    /// Focused regression check: stale, lower seqs submitted AFTER a
    /// higher one must leave the cursor where it is, while a genuinely
    /// newer seq still advances it.
    #[test]
    fn bump_cursor_rejects_regression() {
        // Plain `#[test]`: `bump_cursor` is synchronous, so no runtime is
        // required.
        let state = AppState::new(None, "test");

        // The high-water mark first, then two out-of-order arrivals.
        for seq in [100, 50, 99] {
            state.bump_cursor(seq);
        }
        assert_eq!(state.cursor(), 100);

        state.bump_cursor(101);
        assert_eq!(state.cursor(), 101);
    }
}