// cellos_server/state.rs

//! Shared application state for the HTTP control plane.
//!
//! `AppState` is `Clone`-able (all inner handles are `Arc`s or `Option`s) and
//! is threaded through axum handlers via `with_state`. The in-memory registry
//! is a *projection cache* over JetStream — it is intentionally not the
//! source of truth (CHATROOM.md Session 16). It is fed by
//! `AppState::apply_event_payload`, which is shared by the boot-time replay
//! path (`jetstream::replay_projection`) and the live WebSocket bridge;
//! POST /v1/formations also writes into it.
9
10use std::collections::BTreeMap;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13
14use async_nats::jetstream::context::Context as JsContext;
15use async_nats::Client as NatsClient;
16use serde::{Deserialize, Serialize};
17use tokio::sync::RwLock;
18use tracing::debug;
19use uuid::Uuid;
20
/// Bearer token every non-public route requires, validated against the
/// `Authorization: Bearer <token>` header.
///
/// Held behind an `Arc` so cloning `AppState` per request does not copy the
/// secret. There is no "no token" state at this type: if no token is
/// configured the server refuses to start (see `main.rs`).
pub type ApiToken = Arc<String>;
24
25#[derive(Clone)]
26pub struct AppState {
27    /// NATS connection used both by the WebSocket bridge and by future
28    /// projection replay. `Option` because tests can drive the router
29    /// without a live broker.
30    pub nats: Option<NatsClient>,
31    /// JetStream context (ADR-0011, ADR-0015). Populated alongside
32    /// `nats` once the server confirms the `CELLOS_EVENTS` stream
33    /// exists. `Option` so router tests without a live broker still
34    /// build the state cleanly; the WS bridge falls through to a
35    /// "broker not configured" close if this is `None`.
36    pub jetstream: Option<JsContext>,
37    /// In-memory projection of formations. UUID-keyed for stable lookup.
38    pub formations: Arc<RwLock<BTreeMap<Uuid, FormationRecord>>>,
39    /// In-memory projection of cells.
40    pub cells: Arc<RwLock<BTreeMap<String, CellRecord>>>,
41    /// Bearer token required on every non-public route.
42    pub api_token: ApiToken,
43    /// Highest JetStream stream-sequence the projection has applied
44    /// (ADR-0015 §D2). The snapshot endpoint reports this as `cursor`
45    /// so clients can open `/ws/events?since=<cursor>` and know they
46    /// will not miss any event after the snapshot.
47    ///
48    /// The WebSocket bridge bumps this monotonically as it forwards
49    /// frames (see `ws.rs`). Until the bridge is migrated to a real
50    /// JetStream consumer, this counter is per-process and resets on
51    /// restart — that is acceptable for the MVP because every browser
52    /// reconnect repeats the snapshot fetch.
53    pub applied_cursor: Arc<AtomicU64>,
54}
55
56impl AppState {
57    /// Constructor used by both `main.rs` and the test harness.
58    pub fn new(nats: Option<NatsClient>, api_token: impl Into<String>) -> Self {
59        Self {
60            nats,
61            jetstream: None,
62            formations: Arc::new(RwLock::new(BTreeMap::new())),
63            cells: Arc::new(RwLock::new(BTreeMap::new())),
64            api_token: Arc::new(api_token.into()),
65            applied_cursor: Arc::new(AtomicU64::new(0)),
66        }
67    }
68
69    /// Attach a JetStream context. Called from `main.rs` after
70    /// `ensure_stream` succeeds. Returns the same `AppState` for
71    /// builder-style chaining in tests.
72    pub fn with_jetstream(mut self, ctx: JsContext) -> Self {
73        self.jetstream = Some(ctx);
74        self
75    }
76
77    /// Current snapshot cursor — the highest seq the server has applied.
78    ///
79    /// Red-team wave 2 (LOW-W2A-1): `Acquire` is sufficient here. We need
80    /// to see writes that happen-before any successful `bump_cursor`
81    /// (i.e. the projection-apply write inside `apply_event_payload`);
82    /// the previous `SeqCst` enforced a global total order with every
83    /// other `SeqCst` operation in the process (counter `fetch_add`s in
84    /// `sni_proxy`, etc.) for no extra correctness gain on this read.
85    pub fn cursor(&self) -> u64 {
86        self.applied_cursor.load(Ordering::Acquire)
87    }
88
89    /// Bump the snapshot cursor to at least `seq`. Monotonic; out-of-order
90    /// values are ignored. Used by the WebSocket bridge as it forwards
91    /// frames.
92    ///
93    /// Red-team wave 2 (LOW-W2A-1): downgraded from `SeqCst` to
94    /// `AcqRel`/`Acquire`. The CAS-loop structure (re-load on conflict,
95    /// strict `seq > current` gate) guarantees monotonicity independent
96    /// of the memory ordering chosen. `AcqRel` on success publishes the
97    /// new cursor to every subsequent `cursor()` reader (snapshot GET
98    /// handlers, primarily); `Acquire` on the reload picks up the
99    /// observed value from the winner of the conflict.
100    pub fn bump_cursor(&self, seq: u64) {
101        // CAS loop keeps the cursor monotonic across concurrent ws workers.
102        let mut current = self.applied_cursor.load(Ordering::Acquire);
103        while seq > current {
104            match self.applied_cursor.compare_exchange(
105                current,
106                seq,
107                Ordering::AcqRel,
108                Ordering::Acquire,
109            ) {
110                Ok(_) => break,
111                Err(observed) => current = observed,
112            }
113        }
114    }
115
116    /// Apply a single CloudEvent payload (raw JSON bytes from
117    /// JetStream) to the in-memory projection.
118    ///
119    /// This is shared between the boot-time replay path
120    /// (`jetstream::replay_projection`) and the live WebSocket bridge
121    /// (`ws.rs`). Centralising the apply logic guarantees that what
122    /// the server reconstructs on startup is exactly what it
123    /// applies live — there is no "replay-only" code path that could
124    /// drift from the live one.
125    ///
126    /// CloudEvent `type` discriminates the transition. Two event families
127    /// mutate state today:
128    ///
129    /// - `formation.v1.*` — server-emitted formation lifecycle, drives the
130    ///   `formations` projection (ADR-0010 admission gate output).
131    /// - `cell.lifecycle.v1.*` and `cell.command.v1.completed` —
132    ///   supervisor-emitted cell lifecycle (ARCH-001). Without this, the
133    ///   server replays cell events from JetStream but never updates the
134    ///   `cells` map and `GET /v1/cells` returns `[]`. The cell projector
135    ///   landed alongside the formation projector so `cellctl get cells`
136    ///   reflects what the supervisor actually ran.
137    ///
138    /// Unknown event types still advance the JetStream cursor at the
139    /// caller (the apply contract is independent of whether the
140    /// projection mutated), so audit gaps surface as `ApplyOutcome::Ignored`
141    /// in the replay log rather than silent drops.
142    pub async fn apply_event_payload(&self, payload: &[u8]) -> anyhow::Result<ApplyOutcome> {
143        let event: serde_json::Value = serde_json::from_slice(payload)
144            .map_err(|e| anyhow::anyhow!("event payload not JSON: {e}"))?;
145
146        let ce_type = event
147            .get("type")
148            .and_then(|v| v.as_str())
149            .unwrap_or_default()
150            .to_string();
151
152        // ── Cell projector (ARCH-001) ────────────────────────────────────
153        // The supervisor emits three cell-shaped event types we project on.
154        // Match these BEFORE the formation prefix gate; otherwise an
155        // unrelated change to the formation gate (e.g. a future
156        // canonical-prefix tweak) could swallow cell events.
157        if let Some(outcome) = self.apply_cell_event(&ce_type, &event).await? {
158            return Ok(outcome);
159        }
160
161        // Wave 2 red-team CRITICAL fix (CRIT-W2D-1): the canonical event-type
162        // URN family emitted by `cellos-core::events::cloud_event_v1_formation_*`
163        // is `dev.cellos.events.cell.formation.v1.*`. The legacy
164        // `io.cellos.formation.v1.*` prefix this reader used to require is
165        // not emitted by any production code path today — replayed formations
166        // would bail at this gate and the in-memory `status` projection would
167        // freeze at PENDING forever. We accept BOTH so older event archives
168        // still replay cleanly; new emitters MUST use the canonical family
169        // enumerated in `cellos-core/src/events.rs` and audited by
170        // `docs/audit-log-retention.md` (FC-74).
171        const CANONICAL_FORMATION_PREFIX: &str = "dev.cellos.events.cell.formation.v1.";
172        const LEGACY_FORMATION_PREFIX: &str = "io.cellos.formation.v1.";
173        let phase = if let Some(p) = ce_type.strip_prefix(CANONICAL_FORMATION_PREFIX) {
174            p
175        } else if let Some(p) = ce_type.strip_prefix(LEGACY_FORMATION_PREFIX) {
176            p
177        } else {
178            // Unknown event family currently no-ops the projection. The
179            // cursor still advances at the caller because the apply
180            // contract is independent of whether any state changed.
181            debug!(
182                ce_type,
183                "apply_event_payload: not a formation or cell event; ignored"
184            );
185            return Ok(ApplyOutcome::Ignored);
186        };
187
188        // The supervisor packs the formation id into the CloudEvent's
189        // `data` payload. Wave-2 fix (CRIT-W2D-1): the canonical emitter in
190        // `cellos-core::events::formation_data_v1` uses camelCase keys
191        // (`formationId`, `formationName`); the legacy snake_case
192        // (`formation_id`, `name`) shape is accepted for archive replay
193        // only. `subject` is the last-resort fallback when neither field
194        // is populated.
195        let data = event
196            .get("data")
197            .cloned()
198            .unwrap_or(serde_json::Value::Null);
199        let id_str = data
200            .get("formationId")
201            .and_then(|v| v.as_str())
202            .or_else(|| data.get("formation_id").and_then(|v| v.as_str()))
203            .or_else(|| event.get("subject").and_then(|v| v.as_str()))
204            .ok_or_else(|| anyhow::anyhow!("event missing formationId"))?;
205        let id = Uuid::parse_str(id_str)
206            .map_err(|e| anyhow::anyhow!("event formationId not a UUID ({id_str}): {e}"))?;
207
208        let name = data
209            .get("formationName")
210            .and_then(|v| v.as_str())
211            .or_else(|| data.get("name").and_then(|v| v.as_str()))
212            .unwrap_or("")
213            .to_string();
214
215        let status = match phase {
216            "created" => FormationStatus::Pending,
217            "launching" | "running" | "degraded" => FormationStatus::Running,
218            "completed" => FormationStatus::Succeeded,
219            "failed" => FormationStatus::Failed,
220            // `cancelled` has no canonical emitter constructor in
221            // `cellos-core` today (see red-team-findings-A.md); the arm is
222            // retained so a future `cloud_event_v1_formation_cancelled`
223            // lands on a working reader. The legacy prefix can still carry
224            // this phase.
225            "cancelled" => FormationStatus::Cancelled,
226            _ => {
227                debug!(
228                    ce_type,
229                    "apply_event_payload: unknown formation event; ignored"
230                );
231                return Ok(ApplyOutcome::Ignored);
232            }
233        };
234
235        let mut map = self.formations.write().await;
236        let entry = map.entry(id).or_insert_with(|| FormationRecord {
237            id,
238            name: name.clone(),
239            status,
240            document: data.clone(),
241        });
242        if !name.is_empty() {
243            entry.name = name;
244        }
245        entry.status = status;
246        Ok(ApplyOutcome::Applied)
247    }
248
249    /// Apply a single supervisor-emitted cell event to the `cells`
250    /// projection (ARCH-001).
251    ///
252    /// Returns:
253    /// - `Ok(Some(ApplyOutcome::Applied))` if the event matched one of the
254    ///   three supported cell types and the projection mutated.
255    /// - `Ok(Some(ApplyOutcome::Ignored))` if the event matched a cell type
256    ///   but lacked the data we need (e.g. missing `cellId`); the caller
257    ///   still advances the cursor.
258    /// - `Ok(None)` if the event is NOT a cell event — caller falls through
259    ///   to the formation projector.
260    /// - `Err(_)` for payloads that cannot be parsed at all.
261    ///
262    /// Three event types are consumed (see `cellos-core::events` and the
263    /// supervisor's emit sites in `crates/cellos-supervisor/src/supervisor.rs`):
264    /// - `dev.cellos.events.cell.lifecycle.v1.started`
265    /// - `dev.cellos.events.cell.command.v1.completed`
266    /// - `dev.cellos.events.cell.lifecycle.v1.destroyed`
267    ///
268    /// Identity / network / observability / policy cell events are not
269    /// projected here — they're audit-trail events the supervisor emits,
270    /// not cell-state transitions cellctl users care about for `get cells`.
271    async fn apply_cell_event(
272        &self,
273        ce_type: &str,
274        event: &serde_json::Value,
275    ) -> anyhow::Result<Option<ApplyOutcome>> {
276        const STARTED: &str = "dev.cellos.events.cell.lifecycle.v1.started";
277        const COMPLETED: &str = "dev.cellos.events.cell.command.v1.completed";
278        const DESTROYED: &str = "dev.cellos.events.cell.lifecycle.v1.destroyed";
279
280        let phase = match ce_type {
281            STARTED => CellPhase::Started,
282            COMPLETED => CellPhase::CommandCompleted,
283            DESTROYED => CellPhase::Destroyed,
284            _ => return Ok(None),
285        };
286
287        let data = event
288            .get("data")
289            .cloned()
290            .unwrap_or(serde_json::Value::Null);
291        let Some(cell_id) = data.get("cellId").and_then(|v| v.as_str()) else {
292            debug!(
293                ce_type,
294                "apply_cell_event: missing data.cellId; cell event ignored"
295            );
296            return Ok(Some(ApplyOutcome::Ignored));
297        };
298        let cell_id = cell_id.to_string();
299        // `time` lives on the CloudEvent envelope (RFC3339); the supervisor's
300        // `cloud_event()` helper populates it on every emit.
301        let event_time = event
302            .get("time")
303            .and_then(|v| v.as_str())
304            .map(str::to_string);
305        let spec_id = data
306            .get("specId")
307            .and_then(|v| v.as_str())
308            .unwrap_or("")
309            .to_string();
310        let run_id = data
311            .get("runId")
312            .and_then(|v| v.as_str())
313            .map(str::to_string);
314
315        let mut map = self.cells.write().await;
316        let entry = map.entry(cell_id.clone()).or_insert_with(|| CellRecord {
317            id: cell_id.clone(),
318            spec_id: spec_id.clone(),
319            run_id: run_id.clone(),
320            state: CellState::Pending,
321            status: String::new(),
322            formation_id: None,
323            started_at: None,
324            destroyed_at: None,
325            outcome: None,
326            exit_code: None,
327        });
328
329        // Fill in fields that arrive on later events but weren't known on
330        // the first event we saw for this cell.
331        if entry.spec_id.is_empty() && !spec_id.is_empty() {
332            entry.spec_id = spec_id;
333        }
334        if entry.run_id.is_none() {
335            entry.run_id = run_id;
336        }
337
338        match phase {
339            CellPhase::Started => {
340                entry.state = CellState::Running;
341                entry.started_at = event_time;
342            }
343            CellPhase::CommandCompleted => {
344                if let Some(code) = data.get("exitCode").and_then(|v| v.as_i64()) {
345                    entry.exit_code = Some(code as i32);
346                }
347            }
348            CellPhase::Destroyed => {
349                entry.state = CellState::Destroyed;
350                entry.destroyed_at = event_time;
351                if let Some(o) = data.get("outcome").and_then(|v| v.as_str()) {
352                    entry.outcome = Some(o.to_string());
353                }
354            }
355        }
356
357        // Mirror the typed state into the legacy `status: String` field so
358        // pre-ARCH-001 consumers of `CellRecord` (and the JSON response
359        // for `GET /v1/cells`) keep a single human-readable label.
360        entry.status = entry.state.as_str().to_string();
361
362        Ok(Some(ApplyOutcome::Applied))
363    }
364}
365
/// The closed set of cell event kinds `apply_cell_event` reacts to.
///
/// The CloudEvent `type` string is mapped onto this enum first, so the
/// per-phase handling is an exhaustive match rather than a chain of
/// string comparisons.
#[derive(Clone, Copy, Debug)]
enum CellPhase {
    /// `cell.lifecycle.v1.started`
    Started,
    /// `cell.command.v1.completed`
    CommandCompleted,
    /// `cell.lifecycle.v1.destroyed`
    Destroyed,
}
375
/// Whether `apply_event_payload` changed the projection.
///
/// Distinguishing the two lets the replay path log meaningful "applied vs
/// skipped" counts and lets tests assert that unknown events are tolerated.
/// The cursor advances at the caller in both cases.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The event matched a projected type (a formation or cell event) and
    /// the in-memory projection was updated (or a record was created).
    Applied,
    /// The event parsed as JSON but was not applied. Either its type or
    /// phase is not one this projection handles, or it lacked the data
    /// needed to apply it (e.g. a cell event without `data.cellId`).
    Ignored,
}
388
389/// Lifecycle status of a formation. Mirrors the projector's state machine
390/// (CHATROOM Session 16 §state-machine-table). Only `PENDING` is emitted
391/// directly on POST; all other transitions are driven by CloudEvents
392/// observed on JetStream.
393#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
394#[serde(rename_all = "UPPERCASE")]
395pub enum FormationStatus {
396    Pending,
397    Running,
398    Succeeded,
399    Failed,
400    Cancelled,
401}
402
403/// Projected view of a formation. The full submitted document is retained
404/// so GET /v1/formations/{id} can echo the original spec — clients build
405/// their own state machines from this plus the WebSocket event stream.
406#[derive(Debug, Clone, Serialize, Deserialize)]
407pub struct FormationRecord {
408    pub id: Uuid,
409    pub name: String,
410    pub status: FormationStatus,
411    pub document: serde_json::Value,
412}
413
414/// Lifecycle state of an execution cell, derived from the supervisor's
415/// `cell.lifecycle.v1.*` event stream.
416///
417/// State transitions:
418/// - `Pending` (initial; never emitted on the wire — assigned when we
419///   first observe an event for a cell we haven't seen before, in case
420///   we eventually project a pre-spawn event family)
421/// - `Running` ← `cell.lifecycle.v1.started`
422/// - `Destroyed` ← `cell.lifecycle.v1.destroyed`
423///
424/// `cell.command.v1.completed` does NOT itself transition the state — the
425/// run is still alive until `destroyed` arrives. It only updates
426/// `exit_code` on the projected record.
427#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
428#[serde(rename_all = "lowercase")]
429pub enum CellState {
430    Pending,
431    Running,
432    Destroyed,
433}
434
435impl CellState {
436    /// Lowercase wire string, matching the serde `rename_all = "lowercase"`
437    /// representation. Used to populate `CellRecord::status` for the
438    /// human-readable label clients hit via `GET /v1/cells`.
439    pub fn as_str(self) -> &'static str {
440        match self {
441            CellState::Pending => "pending",
442            CellState::Running => "running",
443            CellState::Destroyed => "destroyed",
444        }
445    }
446}
447
448/// Projected view of an execution cell.
449///
450/// Fields are populated as cell events arrive from JetStream
451/// (`cell.lifecycle.v1.started`, `cell.command.v1.completed`,
452/// `cell.lifecycle.v1.destroyed`). The authoritative cell state lives
453/// in the event log; this struct is a query-latency cache rebuilt on
454/// every server restart by `jetstream::replay_projection`.
455///
456/// `status` is kept as a duplicated lowercase view of `state` so pre-
457/// ARCH-001 clients (and the JSON response shape) continue to see a
458/// single human-readable label without a breaking API change.
459///
460/// `formation_id` is reserved for future correlation: today the
461/// supervisor's CloudEvents do not carry a `formationId` field on the
462/// `Correlation` block (see `cellos-core::Correlation`), so this stays
463/// `None` for supervisor-emitted cells until that gap is closed
464/// upstream. The field is preserved so the wire shape doesn't churn
465/// when correlation lands.
466#[derive(Debug, Clone, Serialize, Deserialize)]
467pub struct CellRecord {
468    /// `data.cellId` from the supervisor's cell events (the projection key).
469    pub id: String,
470    /// `data.specId` from the lifecycle events — the execution-cell-spec id
471    /// the operator declared (e.g. `"e2e-stub-echo"`).
472    pub spec_id: String,
473    /// Optional supervisor run id (`data.runId`); not always populated on
474    /// the started event so we accept `None`.
475    pub run_id: Option<String>,
476    /// Typed lifecycle state.
477    pub state: CellState,
478    /// Lowercase mirror of `state` for legacy/string consumers.
479    pub status: String,
480    /// Reserved for formation correlation once the supervisor emits a
481    /// `formationId` on `Correlation`. Today the field stays `None` for
482    /// supervisor-emitted cells.
483    pub formation_id: Option<Uuid>,
484    /// CloudEvent `time` of the `cell.lifecycle.v1.started` event
485    /// (RFC3339). `None` until the started event has arrived.
486    pub started_at: Option<String>,
487    /// CloudEvent `time` of the `cell.lifecycle.v1.destroyed` event
488    /// (RFC3339). `None` until the destroyed event has arrived.
489    pub destroyed_at: Option<String>,
490    /// `data.outcome` from the destroyed event (e.g. `"succeeded"`,
491    /// `"failed"`).
492    pub outcome: Option<String>,
493    /// `data.exitCode` from the `cell.command.v1.completed` event when
494    /// the supervisor read an authenticated exit code. Omitted on forced
495    /// terminations (see `cellos-core::LifecycleTerminalState`).
496    pub exit_code: Option<i32>,
497}
498
#[cfg(test)]
mod tests {
    use super::*;

    /// Encode a JSON event into the raw bytes `apply_event_payload` takes.
    fn payload(event: serde_json::Value) -> Vec<u8> {
        serde_json::to_vec(&event).expect("serialise test event")
    }

    /// Red-team wave 2 (LOW-W2A-1): `bump_cursor` must stay monotonic after
    /// the SeqCst→AcqRel downgrade, whatever the implementation. Many
    /// concurrent tokio tasks submit interleaved seq values; the final
    /// cursor must equal the highest seq ever submitted. This pins the
    /// contract so a drive-by edit that swaps in a naive `store` cannot
    /// regress silently.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn bump_cursor_monotonic_under_concurrent_calls() {
        let state = AppState::new(None, "test");
        let workers = 8;
        let per_worker = 500usize;
        let mut handles = Vec::with_capacity(workers);
        for w in 0..workers {
            let s = state.clone();
            handles.push(tokio::spawn(async move {
                // Interleaved seqs guarantee out-of-order arrivals across
                // workers.
                for i in 0..per_worker {
                    let seq = (i as u64) * (workers as u64) + (w as u64);
                    s.bump_cursor(seq);
                }
            }));
        }
        for h in handles {
            h.await.expect("worker task");
        }
        let max_seq = (per_worker as u64 - 1) * (workers as u64) + (workers as u64 - 1);
        assert_eq!(
            state.cursor(),
            max_seq,
            "cursor must equal max seq across all workers; got {} expected {}",
            state.cursor(),
            max_seq,
        );
    }

    /// A stale low seq submitted AFTER a higher one must NOT cause a
    /// regression. This is synchronous because `bump_cursor` is not async.
    #[test]
    fn bump_cursor_rejects_regression() {
        let state = AppState::new(None, "test");
        state.bump_cursor(100);
        state.bump_cursor(50); // out-of-order arrival
        state.bump_cursor(99);
        assert_eq!(state.cursor(), 100);
        state.bump_cursor(101);
        assert_eq!(state.cursor(), 101);
    }

    /// CRIT-W2D-1: both the canonical and the legacy formation prefixes
    /// must drive the formation projection. A later event without a name
    /// must not blank out the name learned earlier.
    #[tokio::test]
    async fn formation_events_apply_under_canonical_and_legacy_prefixes() {
        let state = AppState::new(None, "test");
        let id = Uuid::from_u128(0x1234);

        let created = payload(serde_json::json!({
            "type": "dev.cellos.events.cell.formation.v1.created",
            "data": { "formationId": id.to_string(), "formationName": "demo" },
        }));
        assert_eq!(
            state.apply_event_payload(&created).await.expect("apply created"),
            ApplyOutcome::Applied
        );
        {
            let map = state.formations.read().await;
            let rec = map.get(&id).expect("formation projected");
            assert_eq!(rec.name, "demo");
            assert_eq!(rec.status, FormationStatus::Pending);
        }

        let completed = payload(serde_json::json!({
            "type": "io.cellos.formation.v1.completed",
            "data": { "formation_id": id.to_string() },
        }));
        assert_eq!(
            state.apply_event_payload(&completed).await.expect("apply completed"),
            ApplyOutcome::Applied
        );
        let map = state.formations.read().await;
        let rec = map.get(&id).expect("formation still projected");
        assert_eq!(rec.name, "demo");
        assert_eq!(rec.status, FormationStatus::Succeeded);
    }

    /// Unknown families and phases are tolerated (`Ignored`). Payloads that
    /// are not JSON, or formation events with no id, are errors.
    #[tokio::test]
    async fn unknown_events_ignored_and_malformed_events_rejected() {
        let state = AppState::new(None, "test");

        let other_family = payload(serde_json::json!({
            "type": "dev.cellos.events.cell.identity.v1.issued",
            "data": {},
        }));
        assert_eq!(
            state.apply_event_payload(&other_family).await.expect("apply"),
            ApplyOutcome::Ignored
        );

        let unknown_phase = payload(serde_json::json!({
            "type": "dev.cellos.events.cell.formation.v1.exploded",
            "data": { "formationId": Uuid::from_u128(1).to_string() },
        }));
        assert_eq!(
            state.apply_event_payload(&unknown_phase).await.expect("apply"),
            ApplyOutcome::Ignored
        );
        assert!(state.formations.read().await.is_empty());

        assert!(state.apply_event_payload(b"not json").await.is_err());

        let no_id = payload(serde_json::json!({
            "type": "dev.cellos.events.cell.formation.v1.created",
            "data": {},
        }));
        assert!(state.apply_event_payload(&no_id).await.is_err());
    }

    /// ARCH-001: started → command completed → destroyed fills in the cell
    /// record; a cell event without `data.cellId` is ignored.
    #[tokio::test]
    async fn cell_lifecycle_events_project_into_cells_map() {
        let state = AppState::new(None, "test");

        let anonymous = payload(serde_json::json!({
            "type": "dev.cellos.events.cell.lifecycle.v1.started",
            "data": {},
        }));
        assert_eq!(
            state.apply_event_payload(&anonymous).await.expect("apply"),
            ApplyOutcome::Ignored
        );

        let started = payload(serde_json::json!({
            "type": "dev.cellos.events.cell.lifecycle.v1.started",
            "time": "2024-01-01T00:00:00Z",
            "data": { "cellId": "cell-1", "specId": "e2e-stub-echo", "runId": "run-1" },
        }));
        let completed = payload(serde_json::json!({
            "type": "dev.cellos.events.cell.command.v1.completed",
            "data": { "cellId": "cell-1", "exitCode": 3 },
        }));
        let destroyed = payload(serde_json::json!({
            "type": "dev.cellos.events.cell.lifecycle.v1.destroyed",
            "time": "2024-01-01T00:00:05Z",
            "data": { "cellId": "cell-1", "outcome": "failed" },
        }));
        for event in [&started, &completed, &destroyed] {
            assert_eq!(
                state.apply_event_payload(event).await.expect("apply cell event"),
                ApplyOutcome::Applied
            );
        }

        let cells = state.cells.read().await;
        assert_eq!(cells.len(), 1);
        let cell = cells.get("cell-1").expect("cell projected");
        assert_eq!(cell.state, CellState::Destroyed);
        assert_eq!(cell.status, "destroyed");
        assert_eq!(cell.spec_id, "e2e-stub-echo");
        assert_eq!(cell.run_id.as_deref(), Some("run-1"));
        assert_eq!(cell.started_at.as_deref(), Some("2024-01-01T00:00:00Z"));
        assert_eq!(cell.destroyed_at.as_deref(), Some("2024-01-01T00:00:05Z"));
        assert_eq!(cell.outcome.as_deref(), Some("failed"));
        assert_eq!(cell.exit_code, Some(3));
    }
}