cellos_server/state.rs
//! Shared application state for the HTTP control plane.
//!
//! `AppState` is `Clone`-able (all inner handles are `Arc`/optional) and is
//! threaded through axum handlers via `with_state`. The in-memory registry
//! is a *projection cache* over JetStream — it is intentionally not the
//! source of truth (CHATROOM.md Session 16). On startup it is rebuilt by
//! replaying `cellos.events.>` through `AppState::apply_event_payload`
//! (see `jetstream::replay_projection`); afterwards the live WebSocket
//! bridge keeps it current, and POST /v1/formations also writes into it.
9
10use std::collections::BTreeMap;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13
14use async_nats::jetstream::context::Context as JsContext;
15use async_nats::Client as NatsClient;
16use serde::{Deserialize, Serialize};
17use tokio::sync::RwLock;
18use tracing::{debug, warn};
19use uuid::Uuid;
20
/// Bearer token that every non-public route validates against the
/// `Authorization: Bearer <token>` header.
///
/// The alias itself is never optional: an `AppState` always carries a token.
/// Refusing to start when no token is configured is the caller's job
/// (NOTE(review): that check is described as living in `main.rs` — confirm).
///
/// Wrapped in `Arc` so cloning `AppState` shares the string instead of
/// copying it.
pub type ApiToken = Arc<String>;
24
/// State shared by every axum handler.
///
/// Clones share the same projection maps, token and cursor, because those
/// fields are `Arc`s.
#[derive(Clone)]
pub struct AppState {
    /// NATS connection used by the WebSocket bridge. `Option` because tests
    /// can drive the router without a live broker.
    /// NOTE(review): this was originally also earmarked for projection
    /// replay; confirm whether `jetstream::replay_projection` uses this
    /// client or the `jetstream` context below.
    pub nats: Option<NatsClient>,
    /// JetStream context (ADR-0011, ADR-0015). Populated alongside
    /// `nats` once the server confirms the `CELLOS_EVENTS` stream
    /// exists. `Option` so router tests without a live broker still
    /// build the state cleanly; the WS bridge falls through to a
    /// "broker not configured" close if this is `None`.
    pub jetstream: Option<JsContext>,
    /// In-memory projection of formations. UUID-keyed for stable lookup.
    pub formations: Arc<RwLock<BTreeMap<Uuid, FormationRecord>>>,
    /// In-memory projection of cells, keyed by the supervisor's
    /// `data.cellId`.
    pub cells: Arc<RwLock<BTreeMap<String, CellRecord>>>,
    /// Bearer token required on every non-public route.
    pub api_token: ApiToken,
    /// Highest JetStream stream-sequence the projection has applied
    /// (ADR-0015 §D2). The snapshot endpoint reports this as `cursor`
    /// so clients can open `/ws/events?since=<cursor>` and know they
    /// will not miss any event after the snapshot.
    ///
    /// The WebSocket bridge bumps this monotonically (via `bump_cursor`)
    /// as it forwards frames (see `ws.rs`). Until the bridge is migrated
    /// to a real JetStream consumer, this counter is per-process and
    /// resets on restart — that is acceptable for the MVP because every
    /// browser reconnect repeats the snapshot fetch.
    pub applied_cursor: Arc<AtomicU64>,
}
55
56impl AppState {
57 /// Constructor used by both `main.rs` and the test harness.
58 pub fn new(nats: Option<NatsClient>, api_token: impl Into<String>) -> Self {
59 Self {
60 nats,
61 jetstream: None,
62 formations: Arc::new(RwLock::new(BTreeMap::new())),
63 cells: Arc::new(RwLock::new(BTreeMap::new())),
64 api_token: Arc::new(api_token.into()),
65 applied_cursor: Arc::new(AtomicU64::new(0)),
66 }
67 }
68
69 /// Attach a JetStream context. Called from `main.rs` after
70 /// `ensure_stream` succeeds. Returns the same `AppState` for
71 /// builder-style chaining in tests.
72 pub fn with_jetstream(mut self, ctx: JsContext) -> Self {
73 self.jetstream = Some(ctx);
74 self
75 }
76
77 /// Current snapshot cursor — the highest seq the server has applied.
78 ///
79 /// Red-team wave 2 (LOW-W2A-1): `Acquire` is sufficient here. We need
80 /// to see writes that happen-before any successful `bump_cursor`
81 /// (i.e. the projection-apply write inside `apply_event_payload`);
82 /// the previous `SeqCst` enforced a global total order with every
83 /// other `SeqCst` operation in the process (counter `fetch_add`s in
84 /// `sni_proxy`, etc.) for no extra correctness gain on this read.
85 pub fn cursor(&self) -> u64 {
86 self.applied_cursor.load(Ordering::Acquire)
87 }
88
89 /// Bump the snapshot cursor to at least `seq`. Monotonic; out-of-order
90 /// values are ignored. Used by the WebSocket bridge as it forwards
91 /// frames.
92 ///
93 /// Red-team wave 2 (LOW-W2A-1): downgraded from `SeqCst` to
94 /// `AcqRel`/`Acquire`. The CAS-loop structure (re-load on conflict,
95 /// strict `seq > current` gate) guarantees monotonicity independent
96 /// of the memory ordering chosen. `AcqRel` on success publishes the
97 /// new cursor to every subsequent `cursor()` reader (snapshot GET
98 /// handlers, primarily); `Acquire` on the reload picks up the
99 /// observed value from the winner of the conflict.
100 pub fn bump_cursor(&self, seq: u64) {
101 // CAS loop keeps the cursor monotonic across concurrent ws workers.
102 let mut current = self.applied_cursor.load(Ordering::Acquire);
103 while seq > current {
104 match self.applied_cursor.compare_exchange(
105 current,
106 seq,
107 Ordering::AcqRel,
108 Ordering::Acquire,
109 ) {
110 Ok(_) => break,
111 Err(observed) => current = observed,
112 }
113 }
114 }
115
116 /// Apply a single CloudEvent payload (raw JSON bytes from
117 /// JetStream) to the in-memory projection.
118 ///
119 /// This is shared between the boot-time replay path
120 /// (`jetstream::replay_projection`) and the live WebSocket bridge
121 /// (`ws.rs`). Centralising the apply logic guarantees that what
122 /// the server reconstructs on startup is exactly what it
123 /// applies live — there is no "replay-only" code path that could
124 /// drift from the live one.
125 ///
126 /// CloudEvent `type` discriminates the transition. Two event families
127 /// mutate state today:
128 ///
129 /// - `formation.v1.*` — server-emitted formation lifecycle, drives the
130 /// `formations` projection (ADR-0010 admission gate output).
131 /// - `cell.lifecycle.v1.*` and `cell.command.v1.completed` —
132 /// supervisor-emitted cell lifecycle (ARCH-001). Without this, the
133 /// server replays cell events from JetStream but never updates the
134 /// `cells` map and `GET /v1/cells` returns `[]`. The cell projector
135 /// landed alongside the formation projector so `cellctl get cells`
136 /// reflects what the supervisor actually ran.
137 ///
138 /// Unknown event types still advance the JetStream cursor at the
139 /// caller (the apply contract is independent of whether the
140 /// projection mutated), so audit gaps surface as `ApplyOutcome::Ignored`
141 /// in the replay log rather than silent drops.
142 pub async fn apply_event_payload(&self, payload: &[u8]) -> anyhow::Result<ApplyOutcome> {
143 let event: serde_json::Value = serde_json::from_slice(payload)
144 .map_err(|e| anyhow::anyhow!("event payload not JSON: {e}"))?;
145
146 let ce_type = event
147 .get("type")
148 .and_then(|v| v.as_str())
149 .unwrap_or_default()
150 .to_string();
151
152 // ── Cell projector (ARCH-001) ────────────────────────────────────
153 // The supervisor emits three cell-shaped event types we project on.
154 // Match these BEFORE the formation prefix gate; otherwise an
155 // unrelated change to the formation gate (e.g. a future
156 // canonical-prefix tweak) could swallow cell events.
157 if let Some(outcome) = self.apply_cell_event(&ce_type, &event).await? {
158 return Ok(outcome);
159 }
160
161 // Wave 2 red-team CRITICAL fix (CRIT-W2D-1): the canonical event-type
162 // URN family emitted by `cellos-core::events::cloud_event_v1_formation_*`
163 // is `dev.cellos.events.cell.formation.v1.*`. The legacy
164 // `io.cellos.formation.v1.*` prefix this reader used to require is
165 // not emitted by any production code path today — replayed formations
166 // would bail at this gate and the in-memory `status` projection would
167 // freeze at PENDING forever. We accept BOTH so older event archives
168 // still replay cleanly; new emitters MUST use the canonical family
169 // enumerated in `cellos-core/src/events.rs` and audited by
170 // `docs/audit-log-retention.md` (FC-74).
171 const CANONICAL_FORMATION_PREFIX: &str = "dev.cellos.events.cell.formation.v1.";
172 const LEGACY_FORMATION_PREFIX: &str = "io.cellos.formation.v1.";
173 let phase = if let Some(p) = ce_type.strip_prefix(CANONICAL_FORMATION_PREFIX) {
174 p
175 } else if let Some(p) = ce_type.strip_prefix(LEGACY_FORMATION_PREFIX) {
176 p
177 } else {
178 // Unknown event family currently no-ops the projection. The
179 // cursor still advances at the caller because the apply
180 // contract is independent of whether any state changed.
181 debug!(
182 ce_type,
183 "apply_event_payload: not a formation or cell event; ignored"
184 );
185 return Ok(ApplyOutcome::Ignored);
186 };
187
188 // The supervisor packs the formation id into the CloudEvent's
189 // `data` payload. Wave-2 fix (CRIT-W2D-1): the canonical emitter in
190 // `cellos-core::events::formation_data_v1` uses camelCase keys
191 // (`formationId`, `formationName`); the legacy snake_case
192 // (`formation_id`, `name`) shape is accepted for archive replay
193 // only. `subject` is the last-resort fallback when neither field
194 // is populated.
195 let data = event
196 .get("data")
197 .cloned()
198 .unwrap_or(serde_json::Value::Null);
199 let id_str = data
200 .get("formationId")
201 .and_then(|v| v.as_str())
202 .or_else(|| data.get("formation_id").and_then(|v| v.as_str()))
203 .or_else(|| event.get("subject").and_then(|v| v.as_str()))
204 .ok_or_else(|| anyhow::anyhow!("event missing formationId"))?;
205 let id = Uuid::parse_str(id_str)
206 .map_err(|e| anyhow::anyhow!("event formationId not a UUID ({id_str}): {e}"))?;
207
208 let name = data
209 .get("formationName")
210 .and_then(|v| v.as_str())
211 .or_else(|| data.get("name").and_then(|v| v.as_str()))
212 .unwrap_or("")
213 .to_string();
214
215 let status = match phase {
216 "created" => FormationStatus::Pending,
217 "launching" | "running" | "degraded" => FormationStatus::Running,
218 "completed" => FormationStatus::Succeeded,
219 "failed" => FormationStatus::Failed,
220 // `cancelled` has no canonical emitter constructor in
221 // `cellos-core` today (see red-team-findings-A.md); the arm is
222 // retained so a future `cloud_event_v1_formation_cancelled`
223 // lands on a working reader. The legacy prefix can still carry
224 // this phase.
225 "cancelled" => FormationStatus::Cancelled,
226 _ => {
227 debug!(
228 ce_type,
229 "apply_event_payload: unknown formation event; ignored"
230 );
231 return Ok(ApplyOutcome::Ignored);
232 }
233 };
234
235 let mut map = self.formations.write().await;
236 let entry = map.entry(id).or_insert_with(|| FormationRecord {
237 id,
238 name: name.clone(),
239 status,
240 document: data.clone(),
241 });
242 if !name.is_empty() {
243 entry.name = name;
244 }
245 entry.status = status;
246 Ok(ApplyOutcome::Applied)
247 }
248
249 /// Apply a single supervisor-emitted cell event to the `cells`
250 /// projection (ARCH-001).
251 ///
252 /// Returns:
253 /// - `Ok(Some(ApplyOutcome::Applied))` if the event matched one of the
254 /// three supported cell types and the projection mutated.
255 /// - `Ok(Some(ApplyOutcome::Ignored))` if the event matched a cell type
256 /// but lacked the data we need (e.g. missing `cellId`); the caller
257 /// still advances the cursor.
258 /// - `Ok(None)` if the event is NOT a cell event — caller falls through
259 /// to the formation projector.
260 /// - `Err(_)` for payloads that cannot be parsed at all.
261 ///
262 /// Three event types are consumed (see `cellos-core::events` and the
263 /// supervisor's emit sites in `crates/cellos-supervisor/src/supervisor.rs`):
264 /// - `dev.cellos.events.cell.lifecycle.v1.started`
265 /// - `dev.cellos.events.cell.command.v1.completed`
266 /// - `dev.cellos.events.cell.lifecycle.v1.destroyed`
267 ///
268 /// Identity / network / observability / policy cell events are not
269 /// projected here — they're audit-trail events the supervisor emits,
270 /// not cell-state transitions cellctl users care about for `get cells`.
271 async fn apply_cell_event(
272 &self,
273 ce_type: &str,
274 event: &serde_json::Value,
275 ) -> anyhow::Result<Option<ApplyOutcome>> {
276 const STARTED: &str = "dev.cellos.events.cell.lifecycle.v1.started";
277 const COMPLETED: &str = "dev.cellos.events.cell.command.v1.completed";
278 const DESTROYED: &str = "dev.cellos.events.cell.lifecycle.v1.destroyed";
279
280 let phase = match ce_type {
281 STARTED => CellPhase::Started,
282 COMPLETED => CellPhase::CommandCompleted,
283 DESTROYED => CellPhase::Destroyed,
284 _ => return Ok(None),
285 };
286
287 let data = event
288 .get("data")
289 .cloned()
290 .unwrap_or(serde_json::Value::Null);
291 let Some(cell_id) = data.get("cellId").and_then(|v| v.as_str()) else {
292 debug!(
293 ce_type,
294 "apply_cell_event: missing data.cellId; cell event ignored"
295 );
296 return Ok(Some(ApplyOutcome::Ignored));
297 };
298 let cell_id = cell_id.to_string();
299 // `time` lives on the CloudEvent envelope (RFC3339); the supervisor's
300 // `cloud_event()` helper populates it on every emit.
301 let event_time = event
302 .get("time")
303 .and_then(|v| v.as_str())
304 .map(str::to_string);
305 // RT3-MED ARCH-001-B: strict admission on `specId`. The canonical
306 // supervisor emits a non-empty `data.specId` on every cell event
307 // (`cellos-core::events::cell_*`). A cell event with the field
308 // missing or non-string is malformed; projecting it would either
309 // poison the entry's `spec_id` with `""` on first-event-wins or
310 // leave a phantom cell in the projection. Refuse to project it
311 // and surface the gap as a WARN so the audit trail keeps a
312 // record of the skipped event. Note: a cell event whose
313 // `specId` is structurally present and non-empty is accepted
314 // normally; the back-fill below still tolerates events that
315 // happen to omit fields populated on a prior event.
316 let Some(spec_id) = data.get("specId").and_then(|v| v.as_str()) else {
317 warn!(
318 ce_type,
319 cell_id = %cell_id,
320 "apply_cell_event: missing data.specId; cell event skipped (strict admission)"
321 );
322 return Ok(Some(ApplyOutcome::Ignored));
323 };
324 let spec_id = spec_id.to_string();
325 let run_id = data
326 .get("runId")
327 .and_then(|v| v.as_str())
328 .map(str::to_string);
329
330 let mut map = self.cells.write().await;
331 let entry = map.entry(cell_id.clone()).or_insert_with(|| CellRecord {
332 id: cell_id.clone(),
333 spec_id: spec_id.clone(),
334 run_id: run_id.clone(),
335 state: CellState::Pending,
336 status: String::new(),
337 formation_id: None,
338 started_at: None,
339 destroyed_at: None,
340 outcome: None,
341 exit_code: None,
342 });
343
344 // RT3-HIGH-3 (ARCH-003): once a cell is `Destroyed`, no later
345 // event may mutate its state, exit_code, destroyed_at, or
346 // outcome. JetStream replay + the live subscription can
347 // out-of-order a `started` after a `destroyed` (CHATROOM
348 // Session 17 §ordering), and without this guard the projection
349 // regressed Destroyed → Running. Terminal means terminal: log
350 // a WARN so the unexpected delivery surfaces in ops, then drop
351 // the event for projection purposes.
352 if matches!(entry.state, CellState::Destroyed) {
353 warn!(
354 ce_type = match phase {
355 CellPhase::Started => "dev.cellos.events.cell.lifecycle.v1.started",
356 CellPhase::CommandCompleted =>
357 "dev.cellos.events.cell.command.v1.completed",
358 CellPhase::Destroyed => "dev.cellos.events.cell.lifecycle.v1.destroyed",
359 },
360 cell_id = %cell_id,
361 event_time = event_time.as_deref().unwrap_or(""),
362 "apply_cell_event: post-terminal event ignored; cell already Destroyed \
363 (likely out-of-order delivery or replay reordering)"
364 );
365 // Skip projection mutation but still tell the caller we
366 // recognised the event family, so cursor advance stays in
367 // lockstep with the on-wire seq.
368 return Ok(Some(ApplyOutcome::Applied));
369 }
370
371 // Fill in fields that arrive on later events but weren't known on
372 // the first event we saw for this cell.
373 if entry.spec_id.is_empty() && !spec_id.is_empty() {
374 entry.spec_id = spec_id;
375 }
376 if entry.run_id.is_none() {
377 entry.run_id = run_id;
378 }
379
380 match phase {
381 CellPhase::Started => {
382 entry.state = CellState::Running;
383 entry.started_at = event_time;
384 }
385 CellPhase::CommandCompleted => {
386 // RT3-MED ARCH-001-A is handled by the generic terminal-state
387 // guard above (post-Destroyed events return Applied without
388 // mutating the projection). Reaching this arm means the cell
389 // is NOT yet Destroyed, so the exit_code assignment is safe.
390 if let Some(code) = data.get("exitCode").and_then(|v| v.as_i64()) {
391 entry.exit_code = Some(code as i32);
392 }
393 }
394 CellPhase::Destroyed => {
395 entry.state = CellState::Destroyed;
396 entry.destroyed_at = event_time;
397 if let Some(o) = data.get("outcome").and_then(|v| v.as_str()) {
398 entry.outcome = Some(o.to_string());
399 }
400 }
401 }
402
403 // Mirror the typed state into the legacy `status: String` field so
404 // pre-ARCH-001 consumers of `CellRecord` (and the JSON response
405 // for `GET /v1/cells`) keep a single human-readable label.
406 entry.status = entry.state.as_str().to_string();
407
408 Ok(Some(ApplyOutcome::Applied))
409 }
410}
411
/// Internal discriminator for the three cell event types the server
/// projects on.
///
/// `apply_cell_event` converts the CloudEvent `type` string into this
/// closed enum once, so its later `match` is exhaustive rather than
/// stringly-typed.
#[derive(Debug, Clone, Copy)]
enum CellPhase {
    /// `dev.cellos.events.cell.lifecycle.v1.started` — moves the cell to `Running`.
    Started,
    /// `dev.cellos.events.cell.command.v1.completed` — only records `exit_code`.
    CommandCompleted,
    /// `dev.cellos.events.cell.lifecycle.v1.destroyed` — terminal.
    Destroyed,
}
421
/// Result of feeding one event to `AppState::apply_event_payload`.
///
/// The replay path uses it to log "applied vs skipped" counts, and tests
/// use it to assert that unknown or malformed events are tolerated. With
/// either variant, the caller still advances its cursor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The event belonged to a projected family (formation or cell
    /// lifecycle) and was accepted. Usually the in-memory projection was
    /// created or updated. This is also returned when a recognised event
    /// was deliberately not applied (e.g. a cell event arriving after the
    /// cell is already `Destroyed`), so the cursor stays in step with the
    /// stream.
    Applied,
    /// The event did not change the projection for one of these reasons:
    /// - its type belongs to no projected family;
    /// - it carried an unknown formation phase;
    /// - it was a cell event missing a required field (`cellId`, `specId`).
    Ignored,
}
434
/// Lifecycle status of a formation. Mirrors the projector's state machine
/// (CHATROOM Session 16 §state-machine-table). Only `PENDING` is emitted
/// directly on POST; all other transitions are driven by CloudEvents
/// observed on JetStream.
///
/// Event phase → status mapping applied by `AppState::apply_event_payload`:
/// - `created` → `Pending`
/// - `launching` / `running` / `degraded` → `Running`
/// - `completed` → `Succeeded`
/// - `failed` → `Failed`
/// - `cancelled` → `Cancelled`
///
/// Serialized in upper case (`"PENDING"`, `"RUNNING"`, ...).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "UPPERCASE")]
pub enum FormationStatus {
    Pending,
    Running,
    Succeeded,
    Failed,
    Cancelled,
}
448
/// Projected view of a formation.
///
/// `document` holds the JSON the record was first created from. For records
/// written by POST /v1/formations this is the submitted document, so
/// GET /v1/formations/{id} can echo the original spec. For records first
/// created by `AppState::apply_event_payload` (replay or live), it is that
/// event's `data` payload instead. Later events update `name` (only when
/// non-empty) and `status`, but never replace `document`.
///
/// Clients build their own state machines from this plus the WebSocket
/// event stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FormationRecord {
    /// Formation id (projection key).
    pub id: Uuid,
    /// Human-readable formation name; may be empty if no event carried one.
    pub name: String,
    /// Current lifecycle status.
    pub status: FormationStatus,
    /// Submitted spec or first-seen event `data` (see type docs).
    pub document: serde_json::Value,
}
459
/// Lifecycle state of an execution cell, derived from the supervisor's
/// `cell.lifecycle.v1.*` event stream.
///
/// State transitions:
/// - `Pending` — initial state. No event maps to it directly; it is
///   assigned when we first observe an event for a cell we haven't seen
///   before. It stays visible if the first event observed for a cell is
///   `cell.command.v1.completed`, because that event does not change state.
/// - `Running` ← `cell.lifecycle.v1.started`
/// - `Destroyed` ← `cell.lifecycle.v1.destroyed` (terminal: later events
///   for the cell are not applied)
///
/// `cell.command.v1.completed` does NOT itself transition the state — the
/// run is still alive until `destroyed` arrives. It only updates
/// `exit_code` on the projected record.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum CellState {
    Pending,
    Running,
    Destroyed,
}
480
481impl CellState {
482 /// Lowercase wire string, matching the serde `rename_all = "lowercase"`
483 /// representation. Used to populate `CellRecord::status` for the
484 /// human-readable label clients hit via `GET /v1/cells`.
485 pub fn as_str(self) -> &'static str {
486 match self {
487 CellState::Pending => "pending",
488 CellState::Running => "running",
489 CellState::Destroyed => "destroyed",
490 }
491 }
492}
493
/// Projected view of an execution cell.
///
/// Fields are populated as cell events arrive from JetStream
/// (`cell.lifecycle.v1.started`, `cell.command.v1.completed`,
/// `cell.lifecycle.v1.destroyed`). The authoritative cell state lives
/// in the event log; this struct is a query-latency cache rebuilt on
/// every server restart by `jetstream::replay_projection`.
///
/// `status` is kept as a duplicated lowercase view of `state` so pre-
/// ARCH-001 clients (and the JSON response shape) continue to see a
/// single human-readable label without a breaking API change.
///
/// `formation_id` is reserved for future correlation. Today the
/// supervisor's CloudEvents do not carry a `formationId` field on the
/// `Correlation` block (see `cellos-core::Correlation`), so this stays
/// `None` for supervisor-emitted cells until that gap is closed
/// upstream. The field is preserved so the wire shape doesn't churn
/// when correlation lands.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CellRecord {
    /// `data.cellId` from the supervisor's cell events (the projection key).
    pub id: String,
    /// `data.specId` from the cell events — the execution-cell-spec id
    /// the operator declared (e.g. `"e2e-stub-echo"`). Taken from the first
    /// accepted event; only back-filled later if it is still empty.
    pub spec_id: String,
    /// Optional supervisor run id (`data.runId`). Not always populated on
    /// the started event, so `None` is accepted; back-filled from a later
    /// event while still `None`.
    pub run_id: Option<String>,
    /// Typed lifecycle state.
    pub state: CellState,
    /// Lowercase mirror of `state` for legacy/string consumers; the cell
    /// projector resets it from `state.as_str()` after every applied event.
    pub status: String,
    /// Reserved for formation correlation once the supervisor emits a
    /// `formationId` on `Correlation`. Today the field stays `None` for
    /// supervisor-emitted cells.
    pub formation_id: Option<Uuid>,
    /// CloudEvent `time` of the `cell.lifecycle.v1.started` event
    /// (RFC3339). `None` until the started event has arrived.
    pub started_at: Option<String>,
    /// CloudEvent `time` of the `cell.lifecycle.v1.destroyed` event
    /// (RFC3339). `None` until the destroyed event has arrived.
    pub destroyed_at: Option<String>,
    /// `data.outcome` from the destroyed event (e.g. `"succeeded"`,
    /// `"failed"`).
    pub outcome: Option<String>,
    /// `data.exitCode` from the `cell.command.v1.completed` event when
    /// the supervisor read an authenticated exit code. Omitted on forced
    /// terminations (see `cellos-core::LifecycleTerminalState`). A
    /// `completed` event arriving after the cell is `Destroyed` does not
    /// update it.
    pub exit_code: Option<i32>,
}
544
#[cfg(test)]
mod tests {
    use super::*;

    /// Red-team wave 2 (LOW-W2A-1): `bump_cursor` must remain monotonic
    /// after the SeqCst→AcqRel downgrade. The CAS-loop structure
    /// guarantees this property independent of memory-ordering choice.
    /// This test pins the contract so a future drive-by edit that
    /// replaces the loop with a naive `store` cannot regress silently.
    /// We hammer the state from many concurrent tokio tasks with
    /// interleaved seq values and assert the final cursor equals the
    /// highest seq ever submitted.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn bump_cursor_monotonic_under_concurrent_calls() {
        let state = AppState::new(None, "test");
        let workers = 8;
        let per_worker = 500usize;
        let mut handles = Vec::with_capacity(workers);
        for w in 0..workers {
            let s = state.clone();
            handles.push(tokio::spawn(async move {
                // Each worker submits seqs in an interleaved pattern so
                // out-of-order arrivals are guaranteed across workers.
                for i in 0..per_worker {
                    let seq = (i as u64) * (workers as u64) + (w as u64);
                    s.bump_cursor(seq);
                }
            }));
        }
        for h in handles {
            h.await.expect("worker task");
        }
        let max_seq = (per_worker as u64 - 1) * (workers as u64) + (workers as u64 - 1);
        assert_eq!(
            state.cursor(),
            max_seq,
            "cursor must equal max seq across all workers; got {} expected {}",
            state.cursor(),
            max_seq,
        );
    }

    /// Smaller-but-pointed test: a single stale low seq submitted AFTER
    /// a higher one must NOT cause regression.
    #[test]
    fn bump_cursor_rejects_regression() {
        // Synchronous variant — runtime is not required since the
        // method itself is not async.
        let state = AppState::new(None, "test");
        state.bump_cursor(100);
        state.bump_cursor(50); // out-of-order arrival
        state.bump_cursor(99);
        assert_eq!(state.cursor(), 100);
        state.bump_cursor(101);
        assert_eq!(state.cursor(), 101);
    }

    /// Build a minimal CloudEvent payload for a cell event with the
    /// supervisor's canonical `data` shape. Helper for the cell-projector
    /// tests below.
    fn cell_event(ce_type: &str, cell_id: &str, spec_id: Option<&str>) -> serde_json::Value {
        let mut data = serde_json::Map::new();
        data.insert("cellId".to_string(), serde_json::json!(cell_id));
        if let Some(s) = spec_id {
            data.insert("specId".to_string(), serde_json::json!(s));
        }
        serde_json::json!({
            "type": ce_type,
            "time": "2026-05-17T12:00:00Z",
            "data": serde_json::Value::Object(data),
        })
    }

    /// RT3-MED ARCH-001-A: a `cell.command.v1.completed` event arriving
    /// AFTER `cell.lifecycle.v1.destroyed` for the same cell (replay,
    /// out-of-order JetStream delivery, late supervisor flush) must NOT
    /// silently overwrite the projection's exit_code. The destroyed
    /// event reported a final state, and the projection must not
    /// contradict that report. We assert the post-destruction exit_code
    /// stays `None` (destroyed arrived before completed in this replay
    /// order). The apply outcome is `Applied`: the generic terminal-state
    /// guard recognises the event family so the cursor advances, but it
    /// refuses the mutation and logs a WARN instead.
    #[tokio::test]
    async fn cell_destroyed_then_command_completed_does_not_mutate() {
        let state = AppState::new(None, "test");

        // 1. Apply started → cell exists in Running state.
        let started = cell_event(
            "dev.cellos.events.cell.lifecycle.v1.started",
            "cell-arch001a",
            Some("e2e-stub-echo"),
        );
        let out = state
            .apply_event_payload(&serde_json::to_vec(&started).unwrap())
            .await
            .expect("started apply");
        assert_eq!(out, ApplyOutcome::Applied);

        // 2. Apply destroyed → cell terminal.
        let mut destroyed = cell_event(
            "dev.cellos.events.cell.lifecycle.v1.destroyed",
            "cell-arch001a",
            Some("e2e-stub-echo"),
        );
        destroyed["data"]["outcome"] = serde_json::json!("succeeded");
        let out = state
            .apply_event_payload(&serde_json::to_vec(&destroyed).unwrap())
            .await
            .expect("destroyed apply");
        assert_eq!(out, ApplyOutcome::Applied);

        // Sanity: cell is now Destroyed, exit_code still None (no
        // completed event has been applied yet).
        {
            let cells = state.cells.read().await;
            let entry = cells.get("cell-arch001a").expect("cell present");
            assert_eq!(entry.state, CellState::Destroyed);
            assert_eq!(entry.exit_code, None);
        }

        // 3. Apply a LATE completed event with exitCode=42. The
        //    terminal-state guard MUST refuse to mutate exit_code.
        let mut completed = cell_event(
            "dev.cellos.events.cell.command.v1.completed",
            "cell-arch001a",
            Some("e2e-stub-echo"),
        );
        completed["data"]["exitCode"] = serde_json::json!(42);
        let out = state
            .apply_event_payload(&serde_json::to_vec(&completed).unwrap())
            .await
            .expect("late completed apply");
        assert_eq!(
            out,
            ApplyOutcome::Applied,
            "late completed-after-destroyed: generic terminal-state guard (RT3-HIGH-3) recognizes the event family and advances cursor, but refuses mutation"
        );

        // Final assertion: exit_code unchanged (still None), state
        // still Destroyed.
        let cells = state.cells.read().await;
        let entry = cells.get("cell-arch001a").expect("cell present");
        assert_eq!(
            entry.exit_code, None,
            "exit_code must NOT be mutated after destroyed (terminal-state guard)"
        );
        assert_eq!(entry.state, CellState::Destroyed);
    }

    /// RT3-MED ARCH-001-B: a cell event with `specId` missing from
    /// `data` is malformed (the supervisor's canonical emitter always
    /// populates the field). Strict admission: refuse to project the
    /// event so we never create a phantom cell with `spec_id: ""`,
    /// and surface the gap as `ApplyOutcome::Ignored` (the caller still
    /// advances the cursor, and the audit trail records the skip).
    #[tokio::test]
    async fn cell_event_missing_spec_id_is_skipped() {
        let state = AppState::new(None, "test");

        // Started event with NO specId field at all.
        let started_no_spec = cell_event(
            "dev.cellos.events.cell.lifecycle.v1.started",
            "cell-arch001b",
            None,
        );
        let out = state
            .apply_event_payload(&serde_json::to_vec(&started_no_spec).unwrap())
            .await
            .expect("apply must not error on malformed event");
        assert_eq!(
            out,
            ApplyOutcome::Ignored,
            "missing specId must result in Ignored (strict admission)"
        );

        // The projection must NOT contain a phantom cell for this id.
        let cells = state.cells.read().await;
        assert!(
            !cells.contains_key("cell-arch001b"),
            "projection must not gain a phantom cell from an event missing specId"
        );
        assert!(
            cells.is_empty(),
            "projection must remain empty after a single malformed event"
        );
    }

    /// A `started` event projects a Running cell and carries the spec id,
    /// run id, and envelope time into the record. The legacy `status`
    /// mirror must match the typed state.
    #[tokio::test]
    async fn cell_started_projects_running() {
        let state = AppState::new(None, "test");
        let mut started = cell_event(
            "dev.cellos.events.cell.lifecycle.v1.started",
            "cell-started",
            Some("e2e-stub-echo"),
        );
        started["data"]["runId"] = serde_json::json!("run-1");
        let out = state
            .apply_event_payload(&serde_json::to_vec(&started).unwrap())
            .await
            .expect("started apply");
        assert_eq!(out, ApplyOutcome::Applied);

        let cells = state.cells.read().await;
        let entry = cells.get("cell-started").expect("cell present");
        assert_eq!(entry.state, CellState::Running);
        assert_eq!(entry.status, "running");
        assert_eq!(entry.spec_id, "e2e-stub-echo");
        assert_eq!(entry.run_id.as_deref(), Some("run-1"));
        assert_eq!(entry.started_at.as_deref(), Some("2026-05-17T12:00:00Z"));
    }

    /// `command.v1.completed` does not transition state. If it is the
    /// first event seen for a cell, the record stays `Pending` but records
    /// the exit code.
    #[tokio::test]
    async fn cell_command_completed_first_leaves_pending_with_exit_code() {
        let state = AppState::new(None, "test");
        let mut completed = cell_event(
            "dev.cellos.events.cell.command.v1.completed",
            "cell-completed-first",
            Some("e2e-stub-echo"),
        );
        completed["data"]["exitCode"] = serde_json::json!(3);
        let out = state
            .apply_event_payload(&serde_json::to_vec(&completed).unwrap())
            .await
            .expect("completed apply");
        assert_eq!(out, ApplyOutcome::Applied);

        let cells = state.cells.read().await;
        let entry = cells.get("cell-completed-first").expect("cell present");
        assert_eq!(entry.state, CellState::Pending);
        assert_eq!(entry.status, "pending");
        assert_eq!(entry.exit_code, Some(3));
    }

    /// CRIT-W2D-1: formation events in the canonical
    /// `dev.cellos.events.cell.formation.v1.*` family, with camelCase
    /// `formationId`/`formationName`, must drive the formation projection.
    #[tokio::test]
    async fn formation_canonical_event_projects_status_and_name() {
        let state = AppState::new(None, "test");
        let id = Uuid::parse_str("3f2c5a4e-8b1d-4c7a-9e2f-1a2b3c4d5e6f").unwrap();
        let event = serde_json::json!({
            "type": "dev.cellos.events.cell.formation.v1.running",
            "data": { "formationId": id.to_string(), "formationName": "demo" },
        });
        let out = state
            .apply_event_payload(&serde_json::to_vec(&event).unwrap())
            .await
            .expect("canonical formation apply");
        assert_eq!(out, ApplyOutcome::Applied);

        let formations = state.formations.read().await;
        let record = formations.get(&id).expect("formation present");
        assert_eq!(record.status, FormationStatus::Running);
        assert_eq!(record.name, "demo");
    }

    /// Archive replay: the legacy `io.cellos.formation.v1.*` family with
    /// snake_case `formation_id`/`name` keys is still accepted.
    #[tokio::test]
    async fn formation_legacy_event_projects_status() {
        let state = AppState::new(None, "test");
        let id = Uuid::parse_str("0b7e6c1a-2d3f-4e5a-8b9c-0d1e2f3a4b5c").unwrap();
        let event = serde_json::json!({
            "type": "io.cellos.formation.v1.completed",
            "data": { "formation_id": id.to_string(), "name": "legacy" },
        });
        let out = state
            .apply_event_payload(&serde_json::to_vec(&event).unwrap())
            .await
            .expect("legacy formation apply");
        assert_eq!(out, ApplyOutcome::Applied);

        let formations = state.formations.read().await;
        let record = formations.get(&id).expect("formation present");
        assert_eq!(record.status, FormationStatus::Succeeded);
        assert_eq!(record.name, "legacy");
    }

    /// Events outside the projected families are tolerated: `Ignored`,
    /// with no projection change.
    #[tokio::test]
    async fn unknown_event_type_is_ignored() {
        let state = AppState::new(None, "test");
        let event = serde_json::json!({
            "type": "dev.cellos.events.cell.identity.v1.issued",
            "data": {},
        });
        let out = state
            .apply_event_payload(&serde_json::to_vec(&event).unwrap())
            .await
            .expect("unknown type apply");
        assert_eq!(out, ApplyOutcome::Ignored);
        assert!(state.formations.read().await.is_empty());
        assert!(state.cells.read().await.is_empty());
    }

    /// Non-JSON payloads and formation events with a non-UUID id are
    /// errors, and must not touch the projection.
    #[tokio::test]
    async fn malformed_payloads_are_errors() {
        let state = AppState::new(None, "test");
        assert!(state.apply_event_payload(b"not json").await.is_err());

        let bad_id = serde_json::json!({
            "type": "dev.cellos.events.cell.formation.v1.created",
            "data": { "formationId": "not-a-uuid" },
        });
        assert!(state
            .apply_event_payload(&serde_json::to_vec(&bad_id).unwrap())
            .await
            .is_err());
        assert!(state.formations.read().await.is_empty());
    }
}