cellos_server/state.rs
1//! Shared application state for the HTTP control plane.
2//!
3//! `AppState` is `Clone`-able (all inner handles are `Arc`/optional) and is
4//! threaded through axum handlers via `with_state`. The in-memory registry
5//! is a *projection cache* over JetStream — it is intentionally not the
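//! source of truth (CHATROOM.md Session 16). The map is populated by writes
//! from POST /v1/formations and by CloudEvents from `cellos.events.>` applied
//! through `AppState::apply_event_payload`, which that method documents as
//! shared by the boot-time replay path and the live WebSocket bridge.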
9
10use std::collections::BTreeMap;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13
14use async_nats::jetstream::context::Context as JsContext;
15use async_nats::Client as NatsClient;
16use serde::{Deserialize, Serialize};
17use tokio::sync::RwLock;
18use tracing::debug;
19use uuid::Uuid;
20
/// Required bearer token, validated against `Authorization: Bearer <token>`.
///
/// The alias itself is never optional: it is a reference-counted `String`,
/// so every clone of `AppState` shares the same token allocation. The
/// original note here states that the server refuses to start when no token
/// is configured (see `main.rs`); that check is not visible in this file.
pub type ApiToken = Arc<String>;
24
/// Shared state for the HTTP control plane.
///
/// The projection maps, the API token and the cursor all sit behind `Arc`,
/// so every clone of `AppState` reads and mutates the same underlying data.
#[derive(Clone)]
pub struct AppState {
    /// NATS connection used both by the WebSocket bridge and by future
    /// projection replay. `Option` because tests can drive the router
    /// without a live broker.
    pub nats: Option<NatsClient>,
    /// JetStream context (ADR-0011, ADR-0015). Populated alongside
    /// `nats` once the server confirms the `CELLOS_EVENTS` stream
    /// exists. `Option` so router tests without a live broker still
    /// build the state cleanly; the WS bridge falls through to a
    /// "broker not configured" close if this is `None`.
    ///
    /// `AppState::new` always leaves this `None`; it is set through
    /// `AppState::with_jetstream`.
    pub jetstream: Option<JsContext>,
    /// In-memory projection of formations. UUID-keyed for stable lookup;
    /// a `BTreeMap`, so iteration is ordered by UUID.
    pub formations: Arc<RwLock<BTreeMap<Uuid, FormationRecord>>>,
    /// In-memory projection of cells, keyed by a `String` (presumably the
    /// cell id — confirm against the code that populates it; nothing in
    /// this file writes to the map).
    pub cells: Arc<RwLock<BTreeMap<String, CellRecord>>>,
    /// Bearer token required on every non-public route.
    pub api_token: ApiToken,
    /// Highest JetStream stream-sequence the projection has applied
    /// (ADR-0015 §D2). The snapshot endpoint reports this as `cursor`
    /// so clients can open `/ws/events?since=<cursor>` and know they
    /// will not miss any event after the snapshot.
    ///
    /// The WebSocket bridge bumps this monotonically as it forwards
    /// frames (see `ws.rs`). Until the bridge is migrated to a real
    /// JetStream consumer, this counter is per-process and resets on
    /// restart — that is acceptable for the MVP because every browser
    /// reconnect repeats the snapshot fetch.
    ///
    /// Starts at 0; read with `AppState::cursor` and advanced with
    /// `AppState::bump_cursor`.
    pub applied_cursor: Arc<AtomicU64>,
}
55
56impl AppState {
57 /// Constructor used by both `main.rs` and the test harness.
58 pub fn new(nats: Option<NatsClient>, api_token: impl Into<String>) -> Self {
59 Self {
60 nats,
61 jetstream: None,
62 formations: Arc::new(RwLock::new(BTreeMap::new())),
63 cells: Arc::new(RwLock::new(BTreeMap::new())),
64 api_token: Arc::new(api_token.into()),
65 applied_cursor: Arc::new(AtomicU64::new(0)),
66 }
67 }
68
69 /// Attach a JetStream context. Called from `main.rs` after
70 /// `ensure_stream` succeeds. Returns the same `AppState` for
71 /// builder-style chaining in tests.
72 pub fn with_jetstream(mut self, ctx: JsContext) -> Self {
73 self.jetstream = Some(ctx);
74 self
75 }
76
77 /// Current snapshot cursor — the highest seq the server has applied.
78 ///
79 /// Red-team wave 2 (LOW-W2A-1): `Acquire` is sufficient here. We need
80 /// to see writes that happen-before any successful `bump_cursor`
81 /// (i.e. the projection-apply write inside `apply_event_payload`);
82 /// the previous `SeqCst` enforced a global total order with every
83 /// other `SeqCst` operation in the process (counter `fetch_add`s in
84 /// `sni_proxy`, etc.) for no extra correctness gain on this read.
85 pub fn cursor(&self) -> u64 {
86 self.applied_cursor.load(Ordering::Acquire)
87 }
88
89 /// Bump the snapshot cursor to at least `seq`. Monotonic; out-of-order
90 /// values are ignored. Used by the WebSocket bridge as it forwards
91 /// frames.
92 ///
93 /// Red-team wave 2 (LOW-W2A-1): downgraded from `SeqCst` to
94 /// `AcqRel`/`Acquire`. The CAS-loop structure (re-load on conflict,
95 /// strict `seq > current` gate) guarantees monotonicity independent
96 /// of the memory ordering chosen. `AcqRel` on success publishes the
97 /// new cursor to every subsequent `cursor()` reader (snapshot GET
98 /// handlers, primarily); `Acquire` on the reload picks up the
99 /// observed value from the winner of the conflict.
100 pub fn bump_cursor(&self, seq: u64) {
101 // CAS loop keeps the cursor monotonic across concurrent ws workers.
102 let mut current = self.applied_cursor.load(Ordering::Acquire);
103 while seq > current {
104 match self.applied_cursor.compare_exchange(
105 current,
106 seq,
107 Ordering::AcqRel,
108 Ordering::Acquire,
109 ) {
110 Ok(_) => break,
111 Err(observed) => current = observed,
112 }
113 }
114 }
115
116 /// Apply a single CloudEvent payload (raw JSON bytes from
117 /// JetStream) to the in-memory projection.
118 ///
119 /// This is shared between the boot-time replay path
120 /// (`jetstream::replay_projection`) and the live WebSocket bridge
121 /// (`ws.rs`). Centralising the apply logic guarantees that what
122 /// the server reconstructs on startup is exactly what it
123 /// applies live — there is no "replay-only" code path that could
124 /// drift from the live one.
125 ///
126 /// CloudEvent `type` discriminates the transition; only the
127 /// `formation.v1.*` family currently mutates the projection. Cell
128 /// events arrive (and the cursor still advances over them) but
129 /// the `cells` map is populated by future work and we do not
130 /// invent shape here.
131 pub async fn apply_event_payload(&self, payload: &[u8]) -> anyhow::Result<ApplyOutcome> {
132 let event: serde_json::Value = serde_json::from_slice(payload)
133 .map_err(|e| anyhow::anyhow!("event payload not JSON: {e}"))?;
134
135 let ce_type = event
136 .get("type")
137 .and_then(|v| v.as_str())
138 .unwrap_or_default()
139 .to_string();
140 // Wave 2 red-team CRITICAL fix (CRIT-W2D-1): the canonical event-type
141 // URN family emitted by `cellos-core::events::cloud_event_v1_formation_*`
142 // is `dev.cellos.events.cell.formation.v1.*`. The legacy
143 // `io.cellos.formation.v1.*` prefix this reader used to require is
144 // not emitted by any production code path today — replayed formations
145 // would bail at this gate and the in-memory `status` projection would
146 // freeze at PENDING forever. We accept BOTH so older event archives
147 // still replay cleanly; new emitters MUST use the canonical family
148 // enumerated in `cellos-core/src/events.rs` and audited by
149 // `docs/audit-log-retention.md` (FC-74).
150 const CANONICAL_FORMATION_PREFIX: &str = "dev.cellos.events.cell.formation.v1.";
151 const LEGACY_FORMATION_PREFIX: &str = "io.cellos.formation.v1.";
152 let phase = if let Some(p) = ce_type.strip_prefix(CANONICAL_FORMATION_PREFIX) {
153 p
154 } else if let Some(p) = ce_type.strip_prefix(LEGACY_FORMATION_PREFIX) {
155 p
156 } else {
157 // Cell events / unknown types currently no-op the
158 // projection. The cursor still advances at the caller
159 // because the apply contract is independent of whether
160 // any state changed.
161 debug!(
162 ce_type,
163 "apply_event_payload: not a formation event; ignored"
164 );
165 return Ok(ApplyOutcome::Ignored);
166 };
167
168 // The supervisor packs the formation id into the CloudEvent's
169 // `data` payload. Wave-2 fix (CRIT-W2D-1): the canonical emitter in
170 // `cellos-core::events::formation_data_v1` uses camelCase keys
171 // (`formationId`, `formationName`); the legacy snake_case
172 // (`formation_id`, `name`) shape is accepted for archive replay
173 // only. `subject` is the last-resort fallback when neither field
174 // is populated.
175 let data = event
176 .get("data")
177 .cloned()
178 .unwrap_or(serde_json::Value::Null);
179 let id_str = data
180 .get("formationId")
181 .and_then(|v| v.as_str())
182 .or_else(|| data.get("formation_id").and_then(|v| v.as_str()))
183 .or_else(|| event.get("subject").and_then(|v| v.as_str()))
184 .ok_or_else(|| anyhow::anyhow!("event missing formationId"))?;
185 let id = Uuid::parse_str(id_str)
186 .map_err(|e| anyhow::anyhow!("event formationId not a UUID ({id_str}): {e}"))?;
187
188 let name = data
189 .get("formationName")
190 .and_then(|v| v.as_str())
191 .or_else(|| data.get("name").and_then(|v| v.as_str()))
192 .unwrap_or("")
193 .to_string();
194
195 let status = match phase {
196 "created" => FormationStatus::Pending,
197 "launching" | "running" | "degraded" => FormationStatus::Running,
198 "completed" => FormationStatus::Succeeded,
199 "failed" => FormationStatus::Failed,
200 // `cancelled` has no canonical emitter constructor in
201 // `cellos-core` today (see red-team-findings-A.md); the arm is
202 // retained so a future `cloud_event_v1_formation_cancelled`
203 // lands on a working reader. The legacy prefix can still carry
204 // this phase.
205 "cancelled" => FormationStatus::Cancelled,
206 _ => {
207 debug!(
208 ce_type,
209 "apply_event_payload: unknown formation event; ignored"
210 );
211 return Ok(ApplyOutcome::Ignored);
212 }
213 };
214
215 let mut map = self.formations.write().await;
216 let entry = map.entry(id).or_insert_with(|| FormationRecord {
217 id,
218 name: name.clone(),
219 status,
220 document: data.clone(),
221 });
222 if !name.is_empty() {
223 entry.name = name;
224 }
225 entry.status = status;
226 Ok(ApplyOutcome::Applied)
227 }
228}
229
/// Whether `apply_event_payload` changed the projection. Distinguishing
/// the two lets the replay path log meaningful "applied vs skipped"
/// counts and lets tests assert that unknown events are tolerated.
///
/// Malformed input is not represented here: `apply_event_payload` reports
/// it through the `Err` side of its `anyhow::Result`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// Event matched a known type and the in-memory projection was
    /// updated (or created).
    Applied,
    /// Event was structurally valid but its type/discriminant is not
    /// owned by this projection (cell events, future families). Also
    /// returned for formation-family events whose phase suffix is not one
    /// of the phases `apply_event_payload` recognises.
    Ignored,
}
242
/// Lifecycle status of a formation. Mirrors the projector's state machine
/// (CHATROOM Session 16 §state-machine-table). Only `PENDING` is emitted
/// directly on POST; all other transitions are driven by CloudEvents
/// observed on JetStream.
///
/// Serialized in upper case (`"PENDING"`, `"RUNNING"`, …) because of
/// `rename_all = "UPPERCASE"`. The per-variant notes list the event-type
/// phase suffixes that `AppState::apply_event_payload` maps to each status.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "UPPERCASE")]
pub enum FormationStatus {
    /// Set by the `created` phase.
    Pending,
    /// Set by the `launching`, `running` and `degraded` phases.
    Running,
    /// Set by the `completed` phase.
    Succeeded,
    /// Set by the `failed` phase.
    Failed,
    /// Set by the `cancelled` phase.
    Cancelled,
}
256
/// Projected view of a formation. The full submitted document is retained
/// so GET /v1/formations/{id} can echo the original spec — clients build
/// their own state machines from this plus the WebSocket event stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FormationRecord {
    /// Formation id; also the key under which the record is stored in
    /// `AppState::formations`.
    pub id: Uuid,
    /// Formation name. May be empty: `apply_event_payload` stores `""` when
    /// the first event for a formation carries no name, and later events
    /// only overwrite it with a non-empty value.
    pub name: String,
    /// Latest status written for this formation.
    pub status: FormationStatus,
    /// Arbitrary JSON document kept with the record. When the record is
    /// first created by `apply_event_payload` this is the event's `data`
    /// value (JSON `null` if the event has none), and later events do not
    /// modify it. What POST /v1/formations stores here is not visible in
    /// this file.
    pub document: serde_json::Value,
}
267
/// Projected view of an execution cell. Fields are kept minimal — the
/// authoritative cell state is in JetStream; this is just a cache for
/// `GET /v1/cells` list latency.
///
/// Nothing in this file constructs a `CellRecord`; the field notes below
/// are read off the types and names and should be confirmed against the
/// code that populates `AppState::cells`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CellRecord {
    /// Cell identifier (free-form string, unlike the UUID used for
    /// formations).
    pub id: String,
    /// Owning formation, if any — presumably `None` for cells that do not
    /// belong to a formation; TODO confirm.
    pub formation_id: Option<Uuid>,
    /// Cell status as an untyped string; the set of valid values is not
    /// defined in this file.
    pub status: String,
}
277
#[cfg(test)]
mod tests {
    use super::*;

    /// Red-team wave 2 (LOW-W2A-1): the cursor must stay monotonic after
    /// the SeqCst→AcqRel downgrade. Several tokio tasks submit interleaved
    /// sequence numbers concurrently; once they have all finished, the
    /// cursor must equal the largest value any of them submitted. This
    /// pins the contract so that swapping the CAS loop for a plain `store`
    /// cannot regress silently.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn bump_cursor_monotonic_under_concurrent_calls() {
        const WORKERS: u64 = 8;
        const PER_WORKER: u64 = 500;

        let state = AppState::new(None, "test");

        // Worker `w` submits w, w + WORKERS, w + 2 * WORKERS, … so values
        // from different workers interleave and arrive out of order.
        let tasks: Vec<_> = (0..WORKERS)
            .map(|worker| {
                let shared = state.clone();
                tokio::spawn(async move {
                    for round in 0..PER_WORKER {
                        shared.bump_cursor(round * WORKERS + worker);
                    }
                })
            })
            .collect();

        for task in tasks {
            task.await.expect("worker task");
        }

        // Largest value submitted: last round of the last worker.
        let expected = (PER_WORKER - 1) * WORKERS + (WORKERS - 1);
        let observed = state.cursor();
        assert_eq!(
            observed, expected,
            "cursor must equal max seq across all workers; got {} expected {}",
            observed, expected,
        );
    }

    /// Pointed variant: stale, lower sequence numbers submitted AFTER a
    /// higher one must leave the cursor untouched.
    #[test]
    fn bump_cursor_rejects_regression() {
        // No runtime needed — `bump_cursor` is a plain synchronous method.
        let state = AppState::new(None, "test");

        // 100 lands first; 50 and 99 are out-of-order arrivals.
        for seq in [100, 50, 99] {
            state.bump_cursor(seq);
        }
        assert_eq!(state.cursor(), 100);

        state.bump_cursor(101);
        assert_eq!(state.cursor(), 101);
    }
}