cellos_server/state.rs
//! Shared application state for the HTTP control plane.
//!
//! `AppState` is `Clone`-able (all inner handles are `Arc`/optional) and is
//! threaded through axum handlers via `with_state`. The in-memory registry
//! is a *projection cache* over JetStream — it is intentionally not the
//! source of truth (CHATROOM.md Session 16). On startup
//! `jetstream::replay_projection` feeds the stored events through
//! `AppState::apply_event_payload` to rebuild the formation and cell maps.
//! After that, the maps are kept current by writes from POST /v1/formations
//! and by the live WebSocket bridge, which applies events through the same
//! function.
9
10use std::collections::BTreeMap;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13
14use async_nats::jetstream::context::Context as JsContext;
15use async_nats::Client as NatsClient;
16use serde::{Deserialize, Serialize};
17use tokio::sync::RwLock;
18use tracing::debug;
19use uuid::Uuid;
20
/// Bearer token that every non-public route validates against the
/// `Authorization: Bearer <token>` request header.
///
/// The alias itself is never optional: a token string must be supplied to
/// build an `AppState`. Refusing to start when no token is configured is
/// `main.rs`'s responsibility. It is wrapped in `Arc` so every clone of the
/// state shares a single allocation.
pub type ApiToken = Arc<String>;
24
25#[derive(Clone)]
26pub struct AppState {
27 /// NATS connection used both by the WebSocket bridge and by future
28 /// projection replay. `Option` because tests can drive the router
29 /// without a live broker.
30 pub nats: Option<NatsClient>,
31 /// JetStream context (ADR-0011, ADR-0015). Populated alongside
32 /// `nats` once the server confirms the `CELLOS_EVENTS` stream
33 /// exists. `Option` so router tests without a live broker still
34 /// build the state cleanly; the WS bridge falls through to a
35 /// "broker not configured" close if this is `None`.
36 pub jetstream: Option<JsContext>,
37 /// In-memory projection of formations. UUID-keyed for stable lookup.
38 pub formations: Arc<RwLock<BTreeMap<Uuid, FormationRecord>>>,
39 /// In-memory projection of cells.
40 pub cells: Arc<RwLock<BTreeMap<String, CellRecord>>>,
41 /// Bearer token required on every non-public route.
42 pub api_token: ApiToken,
43 /// Highest JetStream stream-sequence the projection has applied
44 /// (ADR-0015 §D2). The snapshot endpoint reports this as `cursor`
45 /// so clients can open `/ws/events?since=<cursor>` and know they
46 /// will not miss any event after the snapshot.
47 ///
48 /// The WebSocket bridge bumps this monotonically as it forwards
49 /// frames (see `ws.rs`). Until the bridge is migrated to a real
50 /// JetStream consumer, this counter is per-process and resets on
51 /// restart — that is acceptable for the MVP because every browser
52 /// reconnect repeats the snapshot fetch.
53 pub applied_cursor: Arc<AtomicU64>,
54}
55
56impl AppState {
57 /// Constructor used by both `main.rs` and the test harness.
58 pub fn new(nats: Option<NatsClient>, api_token: impl Into<String>) -> Self {
59 Self {
60 nats,
61 jetstream: None,
62 formations: Arc::new(RwLock::new(BTreeMap::new())),
63 cells: Arc::new(RwLock::new(BTreeMap::new())),
64 api_token: Arc::new(api_token.into()),
65 applied_cursor: Arc::new(AtomicU64::new(0)),
66 }
67 }
68
69 /// Attach a JetStream context. Called from `main.rs` after
70 /// `ensure_stream` succeeds. Returns the same `AppState` for
71 /// builder-style chaining in tests.
72 pub fn with_jetstream(mut self, ctx: JsContext) -> Self {
73 self.jetstream = Some(ctx);
74 self
75 }
76
77 /// Current snapshot cursor — the highest seq the server has applied.
78 ///
79 /// Red-team wave 2 (LOW-W2A-1): `Acquire` is sufficient here. We need
80 /// to see writes that happen-before any successful `bump_cursor`
81 /// (i.e. the projection-apply write inside `apply_event_payload`);
82 /// the previous `SeqCst` enforced a global total order with every
83 /// other `SeqCst` operation in the process (counter `fetch_add`s in
84 /// `sni_proxy`, etc.) for no extra correctness gain on this read.
85 pub fn cursor(&self) -> u64 {
86 self.applied_cursor.load(Ordering::Acquire)
87 }
88
89 /// Bump the snapshot cursor to at least `seq`. Monotonic; out-of-order
90 /// values are ignored. Used by the WebSocket bridge as it forwards
91 /// frames.
92 ///
93 /// Red-team wave 2 (LOW-W2A-1): downgraded from `SeqCst` to
94 /// `AcqRel`/`Acquire`. The CAS-loop structure (re-load on conflict,
95 /// strict `seq > current` gate) guarantees monotonicity independent
96 /// of the memory ordering chosen. `AcqRel` on success publishes the
97 /// new cursor to every subsequent `cursor()` reader (snapshot GET
98 /// handlers, primarily); `Acquire` on the reload picks up the
99 /// observed value from the winner of the conflict.
100 pub fn bump_cursor(&self, seq: u64) {
101 // CAS loop keeps the cursor monotonic across concurrent ws workers.
102 let mut current = self.applied_cursor.load(Ordering::Acquire);
103 while seq > current {
104 match self.applied_cursor.compare_exchange(
105 current,
106 seq,
107 Ordering::AcqRel,
108 Ordering::Acquire,
109 ) {
110 Ok(_) => break,
111 Err(observed) => current = observed,
112 }
113 }
114 }
115
116 /// Apply a single CloudEvent payload (raw JSON bytes from
117 /// JetStream) to the in-memory projection.
118 ///
119 /// This is shared between the boot-time replay path
120 /// (`jetstream::replay_projection`) and the live WebSocket bridge
121 /// (`ws.rs`). Centralising the apply logic guarantees that what
122 /// the server reconstructs on startup is exactly what it
123 /// applies live — there is no "replay-only" code path that could
124 /// drift from the live one.
125 ///
126 /// CloudEvent `type` discriminates the transition. Two event families
127 /// mutate state today:
128 ///
129 /// - `formation.v1.*` — server-emitted formation lifecycle, drives the
130 /// `formations` projection (ADR-0010 admission gate output).
131 /// - `cell.lifecycle.v1.*` and `cell.command.v1.completed` —
132 /// supervisor-emitted cell lifecycle (ARCH-001). Without this, the
133 /// server replays cell events from JetStream but never updates the
134 /// `cells` map and `GET /v1/cells` returns `[]`. The cell projector
135 /// landed alongside the formation projector so `cellctl get cells`
136 /// reflects what the supervisor actually ran.
137 ///
138 /// Unknown event types still advance the JetStream cursor at the
139 /// caller (the apply contract is independent of whether the
140 /// projection mutated), so audit gaps surface as `ApplyOutcome::Ignored`
141 /// in the replay log rather than silent drops.
142 pub async fn apply_event_payload(&self, payload: &[u8]) -> anyhow::Result<ApplyOutcome> {
143 let event: serde_json::Value = serde_json::from_slice(payload)
144 .map_err(|e| anyhow::anyhow!("event payload not JSON: {e}"))?;
145
146 let ce_type = event
147 .get("type")
148 .and_then(|v| v.as_str())
149 .unwrap_or_default()
150 .to_string();
151
152 // ── Cell projector (ARCH-001) ────────────────────────────────────
153 // The supervisor emits three cell-shaped event types we project on.
154 // Match these BEFORE the formation prefix gate; otherwise an
155 // unrelated change to the formation gate (e.g. a future
156 // canonical-prefix tweak) could swallow cell events.
157 if let Some(outcome) = self.apply_cell_event(&ce_type, &event).await? {
158 return Ok(outcome);
159 }
160
161 // Wave 2 red-team CRITICAL fix (CRIT-W2D-1): the canonical event-type
162 // URN family emitted by `cellos-core::events::cloud_event_v1_formation_*`
163 // is `dev.cellos.events.cell.formation.v1.*`. The legacy
164 // `io.cellos.formation.v1.*` prefix this reader used to require is
165 // not emitted by any production code path today — replayed formations
166 // would bail at this gate and the in-memory `status` projection would
167 // freeze at PENDING forever. We accept BOTH so older event archives
168 // still replay cleanly; new emitters MUST use the canonical family
169 // enumerated in `cellos-core/src/events.rs` and audited by
170 // `docs/audit-log-retention.md` (FC-74).
171 const CANONICAL_FORMATION_PREFIX: &str = "dev.cellos.events.cell.formation.v1.";
172 const LEGACY_FORMATION_PREFIX: &str = "io.cellos.formation.v1.";
173 let phase = if let Some(p) = ce_type.strip_prefix(CANONICAL_FORMATION_PREFIX) {
174 p
175 } else if let Some(p) = ce_type.strip_prefix(LEGACY_FORMATION_PREFIX) {
176 p
177 } else {
178 // Unknown event family currently no-ops the projection. The
179 // cursor still advances at the caller because the apply
180 // contract is independent of whether any state changed.
181 debug!(
182 ce_type,
183 "apply_event_payload: not a formation or cell event; ignored"
184 );
185 return Ok(ApplyOutcome::Ignored);
186 };
187
188 // The supervisor packs the formation id into the CloudEvent's
189 // `data` payload. Wave-2 fix (CRIT-W2D-1): the canonical emitter in
190 // `cellos-core::events::formation_data_v1` uses camelCase keys
191 // (`formationId`, `formationName`); the legacy snake_case
192 // (`formation_id`, `name`) shape is accepted for archive replay
193 // only. `subject` is the last-resort fallback when neither field
194 // is populated.
195 let data = event
196 .get("data")
197 .cloned()
198 .unwrap_or(serde_json::Value::Null);
199 let id_str = data
200 .get("formationId")
201 .and_then(|v| v.as_str())
202 .or_else(|| data.get("formation_id").and_then(|v| v.as_str()))
203 .or_else(|| event.get("subject").and_then(|v| v.as_str()))
204 .ok_or_else(|| anyhow::anyhow!("event missing formationId"))?;
205 let id = Uuid::parse_str(id_str)
206 .map_err(|e| anyhow::anyhow!("event formationId not a UUID ({id_str}): {e}"))?;
207
208 let name = data
209 .get("formationName")
210 .and_then(|v| v.as_str())
211 .or_else(|| data.get("name").and_then(|v| v.as_str()))
212 .unwrap_or("")
213 .to_string();
214
215 let status = match phase {
216 "created" => FormationStatus::Pending,
217 "launching" | "running" | "degraded" => FormationStatus::Running,
218 "completed" => FormationStatus::Succeeded,
219 "failed" => FormationStatus::Failed,
220 // `cancelled` has no canonical emitter constructor in
221 // `cellos-core` today (see red-team-findings-A.md); the arm is
222 // retained so a future `cloud_event_v1_formation_cancelled`
223 // lands on a working reader. The legacy prefix can still carry
224 // this phase.
225 "cancelled" => FormationStatus::Cancelled,
226 _ => {
227 debug!(
228 ce_type,
229 "apply_event_payload: unknown formation event; ignored"
230 );
231 return Ok(ApplyOutcome::Ignored);
232 }
233 };
234
235 let mut map = self.formations.write().await;
236 let entry = map.entry(id).or_insert_with(|| FormationRecord {
237 id,
238 name: name.clone(),
239 status,
240 document: data.clone(),
241 });
242 if !name.is_empty() {
243 entry.name = name;
244 }
245 entry.status = status;
246 Ok(ApplyOutcome::Applied)
247 }
248
249 /// Apply a single supervisor-emitted cell event to the `cells`
250 /// projection (ARCH-001).
251 ///
252 /// Returns:
253 /// - `Ok(Some(ApplyOutcome::Applied))` if the event matched one of the
254 /// three supported cell types and the projection mutated.
255 /// - `Ok(Some(ApplyOutcome::Ignored))` if the event matched a cell type
256 /// but lacked the data we need (e.g. missing `cellId`); the caller
257 /// still advances the cursor.
258 /// - `Ok(None)` if the event is NOT a cell event — caller falls through
259 /// to the formation projector.
260 /// - `Err(_)` for payloads that cannot be parsed at all.
261 ///
262 /// Three event types are consumed (see `cellos-core::events` and the
263 /// supervisor's emit sites in `crates/cellos-supervisor/src/supervisor.rs`):
264 /// - `dev.cellos.events.cell.lifecycle.v1.started`
265 /// - `dev.cellos.events.cell.command.v1.completed`
266 /// - `dev.cellos.events.cell.lifecycle.v1.destroyed`
267 ///
268 /// Identity / network / observability / policy cell events are not
269 /// projected here — they're audit-trail events the supervisor emits,
270 /// not cell-state transitions cellctl users care about for `get cells`.
271 async fn apply_cell_event(
272 &self,
273 ce_type: &str,
274 event: &serde_json::Value,
275 ) -> anyhow::Result<Option<ApplyOutcome>> {
276 const STARTED: &str = "dev.cellos.events.cell.lifecycle.v1.started";
277 const COMPLETED: &str = "dev.cellos.events.cell.command.v1.completed";
278 const DESTROYED: &str = "dev.cellos.events.cell.lifecycle.v1.destroyed";
279
280 let phase = match ce_type {
281 STARTED => CellPhase::Started,
282 COMPLETED => CellPhase::CommandCompleted,
283 DESTROYED => CellPhase::Destroyed,
284 _ => return Ok(None),
285 };
286
287 let data = event
288 .get("data")
289 .cloned()
290 .unwrap_or(serde_json::Value::Null);
291 let Some(cell_id) = data.get("cellId").and_then(|v| v.as_str()) else {
292 debug!(
293 ce_type,
294 "apply_cell_event: missing data.cellId; cell event ignored"
295 );
296 return Ok(Some(ApplyOutcome::Ignored));
297 };
298 let cell_id = cell_id.to_string();
299 // `time` lives on the CloudEvent envelope (RFC3339); the supervisor's
300 // `cloud_event()` helper populates it on every emit.
301 let event_time = event
302 .get("time")
303 .and_then(|v| v.as_str())
304 .map(str::to_string);
305 let spec_id = data
306 .get("specId")
307 .and_then(|v| v.as_str())
308 .unwrap_or("")
309 .to_string();
310 let run_id = data
311 .get("runId")
312 .and_then(|v| v.as_str())
313 .map(str::to_string);
314
315 let mut map = self.cells.write().await;
316 let entry = map.entry(cell_id.clone()).or_insert_with(|| CellRecord {
317 id: cell_id.clone(),
318 spec_id: spec_id.clone(),
319 run_id: run_id.clone(),
320 state: CellState::Pending,
321 status: String::new(),
322 formation_id: None,
323 started_at: None,
324 destroyed_at: None,
325 outcome: None,
326 exit_code: None,
327 });
328
329 // Fill in fields that arrive on later events but weren't known on
330 // the first event we saw for this cell.
331 if entry.spec_id.is_empty() && !spec_id.is_empty() {
332 entry.spec_id = spec_id;
333 }
334 if entry.run_id.is_none() {
335 entry.run_id = run_id;
336 }
337
338 match phase {
339 CellPhase::Started => {
340 entry.state = CellState::Running;
341 entry.started_at = event_time;
342 }
343 CellPhase::CommandCompleted => {
344 if let Some(code) = data.get("exitCode").and_then(|v| v.as_i64()) {
345 entry.exit_code = Some(code as i32);
346 }
347 }
348 CellPhase::Destroyed => {
349 entry.state = CellState::Destroyed;
350 entry.destroyed_at = event_time;
351 if let Some(o) = data.get("outcome").and_then(|v| v.as_str()) {
352 entry.outcome = Some(o.to_string());
353 }
354 }
355 }
356
357 // Mirror the typed state into the legacy `status: String` field so
358 // pre-ARCH-001 consumers of `CellRecord` (and the JSON response
359 // for `GET /v1/cells`) keep a single human-readable label.
360 entry.status = entry.state.as_str().to_string();
361
362 Ok(Some(ApplyOutcome::Applied))
363 }
364}
365
/// Closed set of cell event kinds that `AppState::apply_cell_event` folds
/// into the `cells` projection.
///
/// The raw CloudEvent `type` string is mapped onto this enum once, so the
/// projector's `match` is checked for exhaustiveness by the compiler instead
/// of comparing strings in every arm.
#[derive(Clone, Copy, Debug)]
enum CellPhase {
    /// `dev.cellos.events.cell.lifecycle.v1.started`
    Started,
    /// `dev.cellos.events.cell.command.v1.completed`
    CommandCompleted,
    /// `dev.cellos.events.cell.lifecycle.v1.destroyed`
    Destroyed,
}
375
/// Whether `apply_event_payload` changed the projection.
///
/// Distinguishing the two lets the replay path log meaningful
/// "applied vs skipped" counts and lets tests assert that unknown events are
/// tolerated. Either way, the caller still advances its cursor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The event matched a projected type and the in-memory projection was
    /// updated (or a new record was created).
    Applied,
    /// The event parsed but was not projected. Either its `type` belongs to
    /// no projected family (e.g. identity/network/policy cell events, future
    /// families), it is an unrecognised formation phase, or it is one of the
    /// projected cell events but is missing `data.cellId`.
    Ignored,
}
388
389/// Lifecycle status of a formation. Mirrors the projector's state machine
390/// (CHATROOM Session 16 §state-machine-table). Only `PENDING` is emitted
391/// directly on POST; all other transitions are driven by CloudEvents
392/// observed on JetStream.
393#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
394#[serde(rename_all = "UPPERCASE")]
395pub enum FormationStatus {
396 Pending,
397 Running,
398 Succeeded,
399 Failed,
400 Cancelled,
401}
402
403/// Projected view of a formation. The full submitted document is retained
404/// so GET /v1/formations/{id} can echo the original spec — clients build
405/// their own state machines from this plus the WebSocket event stream.
406#[derive(Debug, Clone, Serialize, Deserialize)]
407pub struct FormationRecord {
408 pub id: Uuid,
409 pub name: String,
410 pub status: FormationStatus,
411 pub document: serde_json::Value,
412}
413
414/// Lifecycle state of an execution cell, derived from the supervisor's
415/// `cell.lifecycle.v1.*` event stream.
416///
417/// State transitions:
418/// - `Pending` (initial; never emitted on the wire — assigned when we
419/// first observe an event for a cell we haven't seen before, in case
420/// we eventually project a pre-spawn event family)
421/// - `Running` ← `cell.lifecycle.v1.started`
422/// - `Destroyed` ← `cell.lifecycle.v1.destroyed`
423///
424/// `cell.command.v1.completed` does NOT itself transition the state — the
425/// run is still alive until `destroyed` arrives. It only updates
426/// `exit_code` on the projected record.
427#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
428#[serde(rename_all = "lowercase")]
429pub enum CellState {
430 Pending,
431 Running,
432 Destroyed,
433}
434
435impl CellState {
436 /// Lowercase wire string, matching the serde `rename_all = "lowercase"`
437 /// representation. Used to populate `CellRecord::status` for the
438 /// human-readable label clients hit via `GET /v1/cells`.
439 pub fn as_str(self) -> &'static str {
440 match self {
441 CellState::Pending => "pending",
442 CellState::Running => "running",
443 CellState::Destroyed => "destroyed",
444 }
445 }
446}
447
448/// Projected view of an execution cell.
449///
450/// Fields are populated as cell events arrive from JetStream
451/// (`cell.lifecycle.v1.started`, `cell.command.v1.completed`,
452/// `cell.lifecycle.v1.destroyed`). The authoritative cell state lives
453/// in the event log; this struct is a query-latency cache rebuilt on
454/// every server restart by `jetstream::replay_projection`.
455///
456/// `status` is kept as a duplicated lowercase view of `state` so pre-
457/// ARCH-001 clients (and the JSON response shape) continue to see a
458/// single human-readable label without a breaking API change.
459///
460/// `formation_id` is reserved for future correlation: today the
461/// supervisor's CloudEvents do not carry a `formationId` field on the
462/// `Correlation` block (see `cellos-core::Correlation`), so this stays
463/// `None` for supervisor-emitted cells until that gap is closed
464/// upstream. The field is preserved so the wire shape doesn't churn
465/// when correlation lands.
466#[derive(Debug, Clone, Serialize, Deserialize)]
467pub struct CellRecord {
468 /// `data.cellId` from the supervisor's cell events (the projection key).
469 pub id: String,
470 /// `data.specId` from the lifecycle events — the execution-cell-spec id
471 /// the operator declared (e.g. `"e2e-stub-echo"`).
472 pub spec_id: String,
473 /// Optional supervisor run id (`data.runId`); not always populated on
474 /// the started event so we accept `None`.
475 pub run_id: Option<String>,
476 /// Typed lifecycle state.
477 pub state: CellState,
478 /// Lowercase mirror of `state` for legacy/string consumers.
479 pub status: String,
480 /// Reserved for formation correlation once the supervisor emits a
481 /// `formationId` on `Correlation`. Today the field stays `None` for
482 /// supervisor-emitted cells.
483 pub formation_id: Option<Uuid>,
484 /// CloudEvent `time` of the `cell.lifecycle.v1.started` event
485 /// (RFC3339). `None` until the started event has arrived.
486 pub started_at: Option<String>,
487 /// CloudEvent `time` of the `cell.lifecycle.v1.destroyed` event
488 /// (RFC3339). `None` until the destroyed event has arrived.
489 pub destroyed_at: Option<String>,
490 /// `data.outcome` from the destroyed event (e.g. `"succeeded"`,
491 /// `"failed"`).
492 pub outcome: Option<String>,
493 /// `data.exitCode` from the `cell.command.v1.completed` event when
494 /// the supervisor read an authenticated exit code. Omitted on forced
495 /// terminations (see `cellos-core::LifecycleTerminalState`).
496 pub exit_code: Option<i32>,
497}
498
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialise `event` and push it through the shared projector, panicking
    /// on `Err` so tests can focus on the returned outcome.
    async fn apply(state: &AppState, event: serde_json::Value) -> ApplyOutcome {
        state
            .apply_event_payload(event.to_string().as_bytes())
            .await
            .expect("apply_event_payload")
    }

    /// Red-team wave 2 (LOW-W2A-1): `bump_cursor` must remain monotonic
    /// after the SeqCst→AcqRel downgrade. The CAS-loop structure
    /// guarantees this property independent of memory-ordering choice;
    /// this test pins the contract so a future drive-by edit that
    /// replaces the loop with a naive `store` cannot regress
    /// silently. We hammer the state from many concurrent tokio tasks
    /// with interleaved seq values and assert the final cursor equals
    /// the highest seq ever submitted.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn bump_cursor_monotonic_under_concurrent_calls() {
        let state = AppState::new(None, "test");
        let workers = 8;
        let per_worker = 500usize;
        let mut handles = Vec::with_capacity(workers);
        for w in 0..workers {
            let s = state.clone();
            handles.push(tokio::spawn(async move {
                // Each worker submits seqs in an interleaved pattern so
                // out-of-order arrivals are guaranteed across workers.
                for i in 0..per_worker {
                    let seq = (i as u64) * (workers as u64) + (w as u64);
                    s.bump_cursor(seq);
                }
            }));
        }
        for h in handles {
            h.await.expect("worker task");
        }
        let max_seq = (per_worker as u64 - 1) * (workers as u64) + (workers as u64 - 1);
        assert_eq!(
            state.cursor(),
            max_seq,
            "cursor must equal max seq across all workers; got {} expected {}",
            state.cursor(),
            max_seq,
        );
    }

    /// Smaller-but-pointed test: a single stale low seq submitted AFTER
    /// a higher one must NOT cause regression.
    #[test]
    fn bump_cursor_rejects_regression() {
        // Synchronous variant — runtime is not required since the
        // method itself is not async.
        let state = AppState::new(None, "test");
        state.bump_cursor(100);
        state.bump_cursor(50); // out-of-order arrival
        state.bump_cursor(99);
        assert_eq!(state.cursor(), 100);
        state.bump_cursor(101);
        assert_eq!(state.cursor(), 101);
    }

    /// ARCH-001: started → command completed → destroyed must walk a cell
    /// through Running to Destroyed, recording timestamps, exit code and
    /// outcome, and keep `status` mirroring `state`.
    #[tokio::test]
    async fn cell_lifecycle_events_project_into_cells_map() {
        let state = AppState::new(None, "test");

        let outcome = apply(
            &state,
            serde_json::json!({
                "type": "dev.cellos.events.cell.lifecycle.v1.started",
                "time": "2024-01-01T00:00:00Z",
                "data": { "cellId": "cell-1", "specId": "spec-a", "runId": "run-1" }
            }),
        )
        .await;
        assert_eq!(outcome, ApplyOutcome::Applied);
        {
            let cells = state.cells.read().await;
            let cell = cells.get("cell-1").expect("cell projected");
            assert_eq!(cell.state, CellState::Running);
            assert_eq!(cell.status, "running");
            assert_eq!(cell.spec_id, "spec-a");
            assert_eq!(cell.run_id.as_deref(), Some("run-1"));
            assert_eq!(cell.started_at.as_deref(), Some("2024-01-01T00:00:00Z"));
        }

        let outcome = apply(
            &state,
            serde_json::json!({
                "type": "dev.cellos.events.cell.command.v1.completed",
                "data": { "cellId": "cell-1", "exitCode": 0 }
            }),
        )
        .await;
        assert_eq!(outcome, ApplyOutcome::Applied);
        {
            let cells = state.cells.read().await;
            let cell = cells.get("cell-1").expect("cell projected");
            // command completion does not end the cell
            assert_eq!(cell.state, CellState::Running);
            assert_eq!(cell.exit_code, Some(0));
            // identity fields are not clobbered by events that omit them
            assert_eq!(cell.spec_id, "spec-a");
        }

        let outcome = apply(
            &state,
            serde_json::json!({
                "type": "dev.cellos.events.cell.lifecycle.v1.destroyed",
                "time": "2024-01-01T00:00:05Z",
                "data": { "cellId": "cell-1", "outcome": "succeeded" }
            }),
        )
        .await;
        assert_eq!(outcome, ApplyOutcome::Applied);
        let cells = state.cells.read().await;
        let cell = cells.get("cell-1").expect("cell projected");
        assert_eq!(cell.state, CellState::Destroyed);
        assert_eq!(cell.status, "destroyed");
        assert_eq!(cell.destroyed_at.as_deref(), Some("2024-01-01T00:00:05Z"));
        assert_eq!(cell.outcome.as_deref(), Some("succeeded"));
        assert_eq!(cell.exit_code, Some(0));
    }

    /// Cell events without `data.cellId`, unknown event types and non-JSON
    /// payloads must not mutate the projection.
    #[tokio::test]
    async fn unprojectable_events_are_ignored_or_rejected() {
        let state = AppState::new(None, "test");

        let outcome = apply(
            &state,
            serde_json::json!({
                "type": "dev.cellos.events.cell.lifecycle.v1.started",
                "data": { "specId": "spec-a" }
            }),
        )
        .await;
        assert_eq!(outcome, ApplyOutcome::Ignored);

        let outcome = apply(
            &state,
            serde_json::json!({ "type": "dev.cellos.events.cell.identity.v1.issued" }),
        )
        .await;
        assert_eq!(outcome, ApplyOutcome::Ignored);

        assert!(state.apply_event_payload(b"not json").await.is_err());
        assert!(state.cells.read().await.is_empty());
        assert!(state.formations.read().await.is_empty());
    }

    /// CRIT-W2D-1: both the canonical and the legacy formation families
    /// (with camelCase and snake_case data keys respectively) update the
    /// formation projection; an unrecognised phase is ignored.
    #[tokio::test]
    async fn canonical_and_legacy_formation_events_update_status() {
        let state = AppState::new(None, "test");
        let id = Uuid::parse_str("3f2504e0-4f89-11d3-9a0c-0305e82c3301").expect("valid uuid");

        let outcome = apply(
            &state,
            serde_json::json!({
                "type": "dev.cellos.events.cell.formation.v1.running",
                "data": { "formationId": id.to_string(), "formationName": "demo" }
            }),
        )
        .await;
        assert_eq!(outcome, ApplyOutcome::Applied);
        {
            let formations = state.formations.read().await;
            let rec = formations.get(&id).expect("formation projected");
            assert_eq!(rec.status, FormationStatus::Running);
            assert_eq!(rec.name, "demo");
        }

        let outcome = apply(
            &state,
            serde_json::json!({
                "type": "io.cellos.formation.v1.completed",
                "data": { "formation_id": id.to_string() }
            }),
        )
        .await;
        assert_eq!(outcome, ApplyOutcome::Applied);
        {
            let formations = state.formations.read().await;
            let rec = formations.get(&id).expect("formation projected");
            assert_eq!(rec.status, FormationStatus::Succeeded);
            // an event without a name must not blank the known name
            assert_eq!(rec.name, "demo");
        }

        let outcome = apply(
            &state,
            serde_json::json!({
                "type": "dev.cellos.events.cell.formation.v1.scaled",
                "data": { "formationId": id.to_string() }
            }),
        )
        .await;
        assert_eq!(outcome, ApplyOutcome::Ignored);
        let formations = state.formations.read().await;
        assert_eq!(
            formations.get(&id).expect("formation projected").status,
            FormationStatus::Succeeded
        );
    }
}