Skip to main content

cellos_server/
state.rs

//! Shared application state for the HTTP control plane.
//!
//! `AppState` is `Clone`-able (its shared handles are `Arc`s or `Option`s)
//! and is threaded through axum handlers via `with_state`. The in-memory
//! registry is a *projection cache* over JetStream — it is intentionally not
//! the source of truth (CHATROOM.md Session 16). It is populated by writes
//! from POST /v1/formations and by `AppState::apply_event_payload`, which is
//! shared by the boot-time replay path (`jetstream::replay_projection`) and
//! the live WebSocket bridge (`ws.rs`).
9
10use std::collections::BTreeMap;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13
14use async_nats::jetstream::context::Context as JsContext;
15use async_nats::Client as NatsClient;
16use serde::{Deserialize, Serialize};
17use tokio::sync::RwLock;
18use tracing::debug;
19use uuid::Uuid;
20
/// Required bearer token, validated against `Authorization: Bearer <token>`.
///
/// The alias itself has no "unset" state: if no token is configured the
/// server refuses to start (see `main.rs`), so every `AppState` carries one
/// (`AppState::new` takes it as a mandatory argument).
pub type ApiToken = Arc<String>;
24
/// Shared state handed to every axum handler via `with_state`.
///
/// Cloning is cheap: the projections, the API token and the snapshot cursor
/// sit behind `Arc`s, so every clone shares the same underlying data.
#[derive(Clone)]
pub struct AppState {
    /// NATS connection used both by the WebSocket bridge and by future
    /// projection replay. `Option` because tests can drive the router
    /// without a live broker.
    pub nats: Option<NatsClient>,
    /// JetStream context (ADR-0011, ADR-0015). Populated alongside
    /// `nats` once the server confirms the `CELLOS_EVENTS` stream
    /// exists. `Option` so router tests without a live broker still
    /// build the state cleanly; the WS bridge falls through to a
    /// "broker not configured" close if this is `None`.
    pub jetstream: Option<JsContext>,
    /// In-memory projection of formations. UUID-keyed for stable lookup.
    pub formations: Arc<RwLock<BTreeMap<Uuid, FormationRecord>>>,
    /// In-memory projection of cells.
    ///
    /// NOTE(review): `apply_event_payload` does not write to this map yet
    /// (cell events are ignored there); confirm which code path fills it.
    pub cells: Arc<RwLock<BTreeMap<String, CellRecord>>>,
    /// Bearer token required on every non-public route.
    pub api_token: ApiToken,
    /// Highest JetStream stream-sequence the projection has applied
    /// (ADR-0015 §D2). The snapshot endpoint reports this as `cursor`
    /// so clients can open `/ws/events?since=<cursor>` and know they
    /// will not miss any event after the snapshot.
    ///
    /// The WebSocket bridge bumps this monotonically as it forwards
    /// frames (see `ws.rs`). Until the bridge is migrated to a real
    /// JetStream consumer, this counter is per-process and resets on
    /// restart — that is acceptable for the MVP because every browser
    /// reconnect repeats the snapshot fetch.
    pub applied_cursor: Arc<AtomicU64>,
}
55
56impl AppState {
57    /// Constructor used by both `main.rs` and the test harness.
58    pub fn new(nats: Option<NatsClient>, api_token: impl Into<String>) -> Self {
59        Self {
60            nats,
61            jetstream: None,
62            formations: Arc::new(RwLock::new(BTreeMap::new())),
63            cells: Arc::new(RwLock::new(BTreeMap::new())),
64            api_token: Arc::new(api_token.into()),
65            applied_cursor: Arc::new(AtomicU64::new(0)),
66        }
67    }
68
69    /// Attach a JetStream context. Called from `main.rs` after
70    /// `ensure_stream` succeeds. Returns the same `AppState` for
71    /// builder-style chaining in tests.
72    pub fn with_jetstream(mut self, ctx: JsContext) -> Self {
73        self.jetstream = Some(ctx);
74        self
75    }
76
77    /// Current snapshot cursor — the highest seq the server has applied.
78    ///
79    /// Red-team wave 2 (LOW-W2A-1): `Acquire` is sufficient here. We need
80    /// to see writes that happen-before any successful `bump_cursor`
81    /// (i.e. the projection-apply write inside `apply_event_payload`);
82    /// the previous `SeqCst` enforced a global total order with every
83    /// other `SeqCst` operation in the process (counter `fetch_add`s in
84    /// `sni_proxy`, etc.) for no extra correctness gain on this read.
85    pub fn cursor(&self) -> u64 {
86        self.applied_cursor.load(Ordering::Acquire)
87    }
88
89    /// Bump the snapshot cursor to at least `seq`. Monotonic; out-of-order
90    /// values are ignored. Used by the WebSocket bridge as it forwards
91    /// frames.
92    ///
93    /// Red-team wave 2 (LOW-W2A-1): downgraded from `SeqCst` to
94    /// `AcqRel`/`Acquire`. The CAS-loop structure (re-load on conflict,
95    /// strict `seq > current` gate) guarantees monotonicity independent
96    /// of the memory ordering chosen. `AcqRel` on success publishes the
97    /// new cursor to every subsequent `cursor()` reader (snapshot GET
98    /// handlers, primarily); `Acquire` on the reload picks up the
99    /// observed value from the winner of the conflict.
100    pub fn bump_cursor(&self, seq: u64) {
101        // CAS loop keeps the cursor monotonic across concurrent ws workers.
102        let mut current = self.applied_cursor.load(Ordering::Acquire);
103        while seq > current {
104            match self.applied_cursor.compare_exchange(
105                current,
106                seq,
107                Ordering::AcqRel,
108                Ordering::Acquire,
109            ) {
110                Ok(_) => break,
111                Err(observed) => current = observed,
112            }
113        }
114    }
115
116    /// Apply a single CloudEvent payload (raw JSON bytes from
117    /// JetStream) to the in-memory projection.
118    ///
119    /// This is shared between the boot-time replay path
120    /// (`jetstream::replay_projection`) and the live WebSocket bridge
121    /// (`ws.rs`). Centralising the apply logic guarantees that what
122    /// the server reconstructs on startup is exactly what it
123    /// applies live — there is no "replay-only" code path that could
124    /// drift from the live one.
125    ///
126    /// CloudEvent `type` discriminates the transition; only the
127    /// `formation.v1.*` family currently mutates the projection. Cell
128    /// events arrive (and the cursor still advances over them) but
129    /// the `cells` map is populated by future work and we do not
130    /// invent shape here.
131    pub async fn apply_event_payload(&self, payload: &[u8]) -> anyhow::Result<ApplyOutcome> {
132        let event: serde_json::Value = serde_json::from_slice(payload)
133            .map_err(|e| anyhow::anyhow!("event payload not JSON: {e}"))?;
134
135        let ce_type = event
136            .get("type")
137            .and_then(|v| v.as_str())
138            .unwrap_or_default()
139            .to_string();
140        // Wave 2 red-team CRITICAL fix (CRIT-W2D-1): the canonical event-type
141        // URN family emitted by `cellos-core::events::cloud_event_v1_formation_*`
142        // is `dev.cellos.events.cell.formation.v1.*`. The legacy
143        // `io.cellos.formation.v1.*` prefix this reader used to require is
144        // not emitted by any production code path today — replayed formations
145        // would bail at this gate and the in-memory `status` projection would
146        // freeze at PENDING forever. We accept BOTH so older event archives
147        // still replay cleanly; new emitters MUST use the canonical family
148        // enumerated in `cellos-core/src/events.rs` and audited by
149        // `docs/audit-log-retention.md` (FC-74).
150        const CANONICAL_FORMATION_PREFIX: &str = "dev.cellos.events.cell.formation.v1.";
151        const LEGACY_FORMATION_PREFIX: &str = "io.cellos.formation.v1.";
152        let phase = if let Some(p) = ce_type.strip_prefix(CANONICAL_FORMATION_PREFIX) {
153            p
154        } else if let Some(p) = ce_type.strip_prefix(LEGACY_FORMATION_PREFIX) {
155            p
156        } else {
157            // Cell events / unknown types currently no-op the
158            // projection. The cursor still advances at the caller
159            // because the apply contract is independent of whether
160            // any state changed.
161            debug!(
162                ce_type,
163                "apply_event_payload: not a formation event; ignored"
164            );
165            return Ok(ApplyOutcome::Ignored);
166        };
167
168        // The supervisor packs the formation id into the CloudEvent's
169        // `data` payload. Wave-2 fix (CRIT-W2D-1): the canonical emitter in
170        // `cellos-core::events::formation_data_v1` uses camelCase keys
171        // (`formationId`, `formationName`); the legacy snake_case
172        // (`formation_id`, `name`) shape is accepted for archive replay
173        // only. `subject` is the last-resort fallback when neither field
174        // is populated.
175        let data = event
176            .get("data")
177            .cloned()
178            .unwrap_or(serde_json::Value::Null);
179        let id_str = data
180            .get("formationId")
181            .and_then(|v| v.as_str())
182            .or_else(|| data.get("formation_id").and_then(|v| v.as_str()))
183            .or_else(|| event.get("subject").and_then(|v| v.as_str()))
184            .ok_or_else(|| anyhow::anyhow!("event missing formationId"))?;
185        let id = Uuid::parse_str(id_str)
186            .map_err(|e| anyhow::anyhow!("event formationId not a UUID ({id_str}): {e}"))?;
187
188        let name = data
189            .get("formationName")
190            .and_then(|v| v.as_str())
191            .or_else(|| data.get("name").and_then(|v| v.as_str()))
192            .unwrap_or("")
193            .to_string();
194
195        let status = match phase {
196            "created" => FormationStatus::Pending,
197            "launching" | "running" | "degraded" => FormationStatus::Running,
198            "completed" => FormationStatus::Succeeded,
199            "failed" => FormationStatus::Failed,
200            // `cancelled` has no canonical emitter constructor in
201            // `cellos-core` today (see red-team-findings-A.md); the arm is
202            // retained so a future `cloud_event_v1_formation_cancelled`
203            // lands on a working reader. The legacy prefix can still carry
204            // this phase.
205            "cancelled" => FormationStatus::Cancelled,
206            _ => {
207                debug!(
208                    ce_type,
209                    "apply_event_payload: unknown formation event; ignored"
210                );
211                return Ok(ApplyOutcome::Ignored);
212            }
213        };
214
215        let mut map = self.formations.write().await;
216        let entry = map.entry(id).or_insert_with(|| FormationRecord {
217            id,
218            name: name.clone(),
219            status,
220            document: data.clone(),
221        });
222        if !name.is_empty() {
223            entry.name = name;
224        }
225        entry.status = status;
226        Ok(ApplyOutcome::Applied)
227    }
228}
229
/// Result of `AppState::apply_event_payload`: did the event change the
/// projection or not?
///
/// Keeping the two cases apart lets the replay path log separate
/// "applied" and "skipped" counts, and lets tests assert that events the
/// projection does not own are tolerated rather than rejected.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ApplyOutcome {
    /// A recognised event was applied: the matching projection entry was
    /// created or updated.
    Applied,
    /// The payload parsed, but its event type/discriminant belongs to
    /// someone else (cell events, future families); nothing changed.
    Ignored,
}
242
243/// Lifecycle status of a formation. Mirrors the projector's state machine
244/// (CHATROOM Session 16 §state-machine-table). Only `PENDING` is emitted
245/// directly on POST; all other transitions are driven by CloudEvents
246/// observed on JetStream.
247#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
248#[serde(rename_all = "UPPERCASE")]
249pub enum FormationStatus {
250    Pending,
251    Running,
252    Succeeded,
253    Failed,
254    Cancelled,
255}
256
257/// Projected view of a formation. The full submitted document is retained
258/// so GET /v1/formations/{id} can echo the original spec — clients build
259/// their own state machines from this plus the WebSocket event stream.
260#[derive(Debug, Clone, Serialize, Deserialize)]
261pub struct FormationRecord {
262    pub id: Uuid,
263    pub name: String,
264    pub status: FormationStatus,
265    pub document: serde_json::Value,
266}
267
268/// Projected view of an execution cell. Fields are kept minimal — the
269/// authoritative cell state is in JetStream; this is just a cache for
270/// `GET /v1/cells` list latency.
271#[derive(Debug, Clone, Serialize, Deserialize)]
272pub struct CellRecord {
273    pub id: String,
274    pub formation_id: Option<Uuid>,
275    pub status: String,
276}
277
#[cfg(test)]
mod tests {
    use super::*;

    /// Red-team wave 2 (LOW-W2A-1): after relaxing `bump_cursor` from
    /// `SeqCst` to `AcqRel`, the cursor must still only ever move forward.
    /// The CAS loop is what provides that guarantee; this test exists so
    /// that swapping the loop for a plain `store` fails loudly. Several
    /// tokio tasks race with interleaved seqs, and the final cursor must
    /// be the largest seq any of them submitted.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn bump_cursor_monotonic_under_concurrent_calls() {
        const WORKERS: u64 = 8;
        const PER_WORKER: u64 = 500;

        let state = AppState::new(None, "test");
        let tasks: Vec<_> = (0..WORKERS)
            .map(|worker| {
                let shared = state.clone();
                tokio::spawn(async move {
                    // Stride by WORKERS so each worker's seqs interleave
                    // with everyone else's, forcing out-of-order arrivals.
                    for round in 0..PER_WORKER {
                        shared.bump_cursor(round * WORKERS + worker);
                    }
                })
            })
            .collect();
        for task in tasks {
            task.await.expect("worker task");
        }

        // Largest seq submitted: last round of the last worker.
        let expected = WORKERS * PER_WORKER - 1;
        let observed = state.cursor();
        assert_eq!(
            observed,
            expected,
            "cursor must equal max seq across all workers; got {} expected {}",
            observed,
            expected,
        );
    }

    /// A stale, lower seq arriving after a higher one must leave the
    /// cursor where it was.
    #[test]
    fn bump_cursor_rejects_regression() {
        // Plain #[test]: `bump_cursor` and `cursor` are synchronous, so no
        // async runtime is needed.
        let state = AppState::new(None, "test");
        for &seq in &[100, 50, 99] {
            state.bump_cursor(seq);
        }
        assert_eq!(state.cursor(), 100);
        state.bump_cursor(101);
        assert_eq!(state.cursor(), 101);
    }
}