Skip to main content

cellos_server/
state.rs

//! Shared application state for the HTTP control plane.
//!
//! `AppState` is `Clone`-able (all inner handles are `Arc`/optional) and is
//! threaded through axum handlers via `with_state`. The in-memory registry
//! is a *projection cache* over JetStream — it is intentionally not the
//! source of truth (CHATROOM.md Session 16). The maps are filled by applying
//! CloudEvents through `AppState::apply_event_payload`, which is shared by
//! the boot-time replay path (`jetstream::replay_projection`) and the live
//! WebSocket bridge, and by writes from POST /v1/formations.
9
10use std::collections::BTreeMap;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13
14use async_nats::jetstream::context::Context as JsContext;
15use async_nats::Client as NatsClient;
16use serde::{Deserialize, Serialize};
17use tokio::sync::RwLock;
18use tracing::{debug, warn};
19use uuid::Uuid;
20
/// Required bearer token, validated against `Authorization: Bearer <token>`.
///
/// The alias itself is never optional. Per the note on `main.rs`, the server
/// refuses to start when no token is configured, so every `AppState` carries
/// one. The `Arc` lets all clones of the state share a single copy.
pub type ApiToken = Arc<String>;
24
/// Shared state handed to every HTTP / WebSocket handler.
///
/// Cloning is cheap: the projections, token and cursor sit behind `Arc`s, so
/// every clone observes the same maps and the same cursor.
#[derive(Clone)]
pub struct AppState {
    /// NATS connection used both by the WebSocket bridge and by future
    /// projection replay. `Option` because tests can drive the router
    /// without a live broker.
    pub nats: Option<NatsClient>,
    /// JetStream context (ADR-0011, ADR-0015). Populated alongside
    /// `nats` once the server confirms the `CELLOS_EVENTS` stream
    /// exists. `Option` so router tests without a live broker still
    /// build the state cleanly; the WS bridge falls through to a
    /// "broker not configured" close if this is `None`.
    pub jetstream: Option<JsContext>,
    /// In-memory projection of formations. UUID-keyed for stable lookup.
    pub formations: Arc<RwLock<BTreeMap<Uuid, FormationRecord>>>,
    /// In-memory projection of cells, keyed by the cell id string
    /// (`data.cellId` on the supervisor's cell events).
    pub cells: Arc<RwLock<BTreeMap<String, CellRecord>>>,
    /// Bearer token required on every non-public route.
    pub api_token: ApiToken,
    /// Highest JetStream stream-sequence the projection has applied
    /// (ADR-0015 §D2). The snapshot endpoint reports this as `cursor`
    /// so clients can open `/ws/events?since=<cursor>` and know they
    /// will not miss any event after the snapshot.
    ///
    /// The WebSocket bridge bumps this monotonically as it forwards
    /// frames (see `ws.rs`). Until the bridge is migrated to a real
    /// JetStream consumer, this counter is per-process and resets on
    /// restart — that is acceptable for the MVP because every browser
    /// reconnect repeats the snapshot fetch.
    pub applied_cursor: Arc<AtomicU64>,
}
55
56impl AppState {
57    /// Constructor used by both `main.rs` and the test harness.
58    pub fn new(nats: Option<NatsClient>, api_token: impl Into<String>) -> Self {
59        Self {
60            nats,
61            jetstream: None,
62            formations: Arc::new(RwLock::new(BTreeMap::new())),
63            cells: Arc::new(RwLock::new(BTreeMap::new())),
64            api_token: Arc::new(api_token.into()),
65            applied_cursor: Arc::new(AtomicU64::new(0)),
66        }
67    }
68
69    /// Attach a JetStream context. Called from `main.rs` after
70    /// `ensure_stream` succeeds. Returns the same `AppState` for
71    /// builder-style chaining in tests.
72    pub fn with_jetstream(mut self, ctx: JsContext) -> Self {
73        self.jetstream = Some(ctx);
74        self
75    }
76
77    /// Current snapshot cursor — the highest seq the server has applied.
78    ///
79    /// Red-team wave 2 (LOW-W2A-1): `Acquire` is sufficient here. We need
80    /// to see writes that happen-before any successful `bump_cursor`
81    /// (i.e. the projection-apply write inside `apply_event_payload`);
82    /// the previous `SeqCst` enforced a global total order with every
83    /// other `SeqCst` operation in the process (counter `fetch_add`s in
84    /// `sni_proxy`, etc.) for no extra correctness gain on this read.
85    pub fn cursor(&self) -> u64 {
86        self.applied_cursor.load(Ordering::Acquire)
87    }
88
89    /// Bump the snapshot cursor to at least `seq`. Monotonic; out-of-order
90    /// values are ignored. Used by the WebSocket bridge as it forwards
91    /// frames.
92    ///
93    /// Red-team wave 2 (LOW-W2A-1): downgraded from `SeqCst` to
94    /// `AcqRel`/`Acquire`. The CAS-loop structure (re-load on conflict,
95    /// strict `seq > current` gate) guarantees monotonicity independent
96    /// of the memory ordering chosen. `AcqRel` on success publishes the
97    /// new cursor to every subsequent `cursor()` reader (snapshot GET
98    /// handlers, primarily); `Acquire` on the reload picks up the
99    /// observed value from the winner of the conflict.
100    pub fn bump_cursor(&self, seq: u64) {
101        // CAS loop keeps the cursor monotonic across concurrent ws workers.
102        let mut current = self.applied_cursor.load(Ordering::Acquire);
103        while seq > current {
104            match self.applied_cursor.compare_exchange(
105                current,
106                seq,
107                Ordering::AcqRel,
108                Ordering::Acquire,
109            ) {
110                Ok(_) => break,
111                Err(observed) => current = observed,
112            }
113        }
114    }
115
116    /// Apply a single CloudEvent payload (raw JSON bytes from
117    /// JetStream) to the in-memory projection.
118    ///
119    /// This is shared between the boot-time replay path
120    /// (`jetstream::replay_projection`) and the live WebSocket bridge
121    /// (`ws.rs`). Centralising the apply logic guarantees that what
122    /// the server reconstructs on startup is exactly what it
123    /// applies live — there is no "replay-only" code path that could
124    /// drift from the live one.
125    ///
126    /// CloudEvent `type` discriminates the transition. Two event families
127    /// mutate state today:
128    ///
129    /// - `formation.v1.*` — server-emitted formation lifecycle, drives the
130    ///   `formations` projection (ADR-0010 admission gate output).
131    /// - `cell.lifecycle.v1.*` and `cell.command.v1.completed` —
132    ///   supervisor-emitted cell lifecycle (ARCH-001). Without this, the
133    ///   server replays cell events from JetStream but never updates the
134    ///   `cells` map and `GET /v1/cells` returns `[]`. The cell projector
135    ///   landed alongside the formation projector so `cellctl get cells`
136    ///   reflects what the supervisor actually ran.
137    ///
138    /// Unknown event types still advance the JetStream cursor at the
139    /// caller (the apply contract is independent of whether the
140    /// projection mutated), so audit gaps surface as `ApplyOutcome::Ignored`
141    /// in the replay log rather than silent drops.
142    pub async fn apply_event_payload(&self, payload: &[u8]) -> anyhow::Result<ApplyOutcome> {
143        let event: serde_json::Value = serde_json::from_slice(payload)
144            .map_err(|e| anyhow::anyhow!("event payload not JSON: {e}"))?;
145
146        let ce_type = event
147            .get("type")
148            .and_then(|v| v.as_str())
149            .unwrap_or_default()
150            .to_string();
151
152        // ── Cell projector (ARCH-001) ────────────────────────────────────
153        // The supervisor emits three cell-shaped event types we project on.
154        // Match these BEFORE the formation prefix gate; otherwise an
155        // unrelated change to the formation gate (e.g. a future
156        // canonical-prefix tweak) could swallow cell events.
157        if let Some(outcome) = self.apply_cell_event(&ce_type, &event).await? {
158            return Ok(outcome);
159        }
160
161        // Wave 2 red-team CRITICAL fix (CRIT-W2D-1): the canonical event-type
162        // URN family emitted by `cellos-core::events::cloud_event_v1_formation_*`
163        // is `dev.cellos.events.cell.formation.v1.*`. The legacy
164        // `io.cellos.formation.v1.*` prefix this reader used to require is
165        // not emitted by any production code path today — replayed formations
166        // would bail at this gate and the in-memory `status` projection would
167        // freeze at PENDING forever. We accept BOTH so older event archives
168        // still replay cleanly; new emitters MUST use the canonical family
169        // enumerated in `cellos-core/src/events.rs` and audited by
170        // `docs/audit-log-retention.md` (FC-74).
171        const CANONICAL_FORMATION_PREFIX: &str = "dev.cellos.events.cell.formation.v1.";
172        const LEGACY_FORMATION_PREFIX: &str = "io.cellos.formation.v1.";
173        let phase = if let Some(p) = ce_type.strip_prefix(CANONICAL_FORMATION_PREFIX) {
174            p
175        } else if let Some(p) = ce_type.strip_prefix(LEGACY_FORMATION_PREFIX) {
176            p
177        } else {
178            // Unknown event family currently no-ops the projection. The
179            // cursor still advances at the caller because the apply
180            // contract is independent of whether any state changed.
181            debug!(
182                ce_type,
183                "apply_event_payload: not a formation or cell event; ignored"
184            );
185            return Ok(ApplyOutcome::Ignored);
186        };
187
188        // The supervisor packs the formation id into the CloudEvent's
189        // `data` payload. Wave-2 fix (CRIT-W2D-1): the canonical emitter in
190        // `cellos-core::events::formation_data_v1` uses camelCase keys
191        // (`formationId`, `formationName`); the legacy snake_case
192        // (`formation_id`, `name`) shape is accepted for archive replay
193        // only. `subject` is the last-resort fallback when neither field
194        // is populated.
195        let data = event
196            .get("data")
197            .cloned()
198            .unwrap_or(serde_json::Value::Null);
199        let id_str = data
200            .get("formationId")
201            .and_then(|v| v.as_str())
202            .or_else(|| data.get("formation_id").and_then(|v| v.as_str()))
203            .or_else(|| event.get("subject").and_then(|v| v.as_str()))
204            .ok_or_else(|| anyhow::anyhow!("event missing formationId"))?;
205        let id = Uuid::parse_str(id_str)
206            .map_err(|e| anyhow::anyhow!("event formationId not a UUID ({id_str}): {e}"))?;
207
208        let name = data
209            .get("formationName")
210            .and_then(|v| v.as_str())
211            .or_else(|| data.get("name").and_then(|v| v.as_str()))
212            .unwrap_or("")
213            .to_string();
214
215        let status = match phase {
216            "created" => FormationStatus::Pending,
217            "launching" | "running" | "degraded" => FormationStatus::Running,
218            "completed" => FormationStatus::Succeeded,
219            "failed" => FormationStatus::Failed,
220            // `cancelled` has no canonical emitter constructor in
221            // `cellos-core` today (see red-team-findings-A.md); the arm is
222            // retained so a future `cloud_event_v1_formation_cancelled`
223            // lands on a working reader. The legacy prefix can still carry
224            // this phase.
225            "cancelled" => FormationStatus::Cancelled,
226            _ => {
227                debug!(
228                    ce_type,
229                    "apply_event_payload: unknown formation event; ignored"
230                );
231                return Ok(ApplyOutcome::Ignored);
232            }
233        };
234
235        let mut map = self.formations.write().await;
236        let entry = map.entry(id).or_insert_with(|| FormationRecord {
237            id,
238            name: name.clone(),
239            status,
240            document: data.clone(),
241        });
242        if !name.is_empty() {
243            entry.name = name;
244        }
245        entry.status = status;
246        Ok(ApplyOutcome::Applied)
247    }
248
249    /// Apply a single supervisor-emitted cell event to the `cells`
250    /// projection (ARCH-001).
251    ///
252    /// Returns:
253    /// - `Ok(Some(ApplyOutcome::Applied))` if the event matched one of the
254    ///   three supported cell types and the projection mutated.
255    /// - `Ok(Some(ApplyOutcome::Ignored))` if the event matched a cell type
256    ///   but lacked the data we need (e.g. missing `cellId`); the caller
257    ///   still advances the cursor.
258    /// - `Ok(None)` if the event is NOT a cell event — caller falls through
259    ///   to the formation projector.
260    /// - `Err(_)` for payloads that cannot be parsed at all.
261    ///
262    /// Three event types are consumed (see `cellos-core::events` and the
263    /// supervisor's emit sites in `crates/cellos-supervisor/src/supervisor.rs`):
264    /// - `dev.cellos.events.cell.lifecycle.v1.started`
265    /// - `dev.cellos.events.cell.command.v1.completed`
266    /// - `dev.cellos.events.cell.lifecycle.v1.destroyed`
267    ///
268    /// Identity / network / observability / policy cell events are not
269    /// projected here — they're audit-trail events the supervisor emits,
270    /// not cell-state transitions cellctl users care about for `get cells`.
271    async fn apply_cell_event(
272        &self,
273        ce_type: &str,
274        event: &serde_json::Value,
275    ) -> anyhow::Result<Option<ApplyOutcome>> {
276        const STARTED: &str = "dev.cellos.events.cell.lifecycle.v1.started";
277        const COMPLETED: &str = "dev.cellos.events.cell.command.v1.completed";
278        const DESTROYED: &str = "dev.cellos.events.cell.lifecycle.v1.destroyed";
279
280        let phase = match ce_type {
281            STARTED => CellPhase::Started,
282            COMPLETED => CellPhase::CommandCompleted,
283            DESTROYED => CellPhase::Destroyed,
284            _ => return Ok(None),
285        };
286
287        let data = event
288            .get("data")
289            .cloned()
290            .unwrap_or(serde_json::Value::Null);
291        let Some(cell_id) = data.get("cellId").and_then(|v| v.as_str()) else {
292            debug!(
293                ce_type,
294                "apply_cell_event: missing data.cellId; cell event ignored"
295            );
296            return Ok(Some(ApplyOutcome::Ignored));
297        };
298        let cell_id = cell_id.to_string();
299        // `time` lives on the CloudEvent envelope (RFC3339); the supervisor's
300        // `cloud_event()` helper populates it on every emit.
301        let event_time = event
302            .get("time")
303            .and_then(|v| v.as_str())
304            .map(str::to_string);
305        // RT3-MED ARCH-001-B: strict admission on `specId`. The canonical
306        // supervisor emits a non-empty `data.specId` on every cell event
307        // (`cellos-core::events::cell_*`). A cell event with the field
308        // missing or non-string is malformed; projecting it would either
309        // poison the entry's `spec_id` with `""` on first-event-wins or
310        // leave a phantom cell in the projection. Refuse to project it
311        // and surface the gap as a WARN so the audit trail keeps a
312        // record of the skipped event. Note: a cell event whose
313        // `specId` is structurally present and non-empty is accepted
314        // normally; the back-fill below still tolerates events that
315        // happen to omit fields populated on a prior event.
316        let Some(spec_id) = data.get("specId").and_then(|v| v.as_str()) else {
317            warn!(
318                ce_type,
319                cell_id = %cell_id,
320                "apply_cell_event: missing data.specId; cell event skipped (strict admission)"
321            );
322            return Ok(Some(ApplyOutcome::Ignored));
323        };
324        let spec_id = spec_id.to_string();
325        let run_id = data
326            .get("runId")
327            .and_then(|v| v.as_str())
328            .map(str::to_string);
329
330        let mut map = self.cells.write().await;
331        let entry = map.entry(cell_id.clone()).or_insert_with(|| CellRecord {
332            id: cell_id.clone(),
333            spec_id: spec_id.clone(),
334            run_id: run_id.clone(),
335            state: CellState::Pending,
336            status: String::new(),
337            formation_id: None,
338            started_at: None,
339            destroyed_at: None,
340            outcome: None,
341            exit_code: None,
342        });
343
344        // RT3-HIGH-3 (ARCH-003): once a cell is `Destroyed`, no later
345        // event may mutate its state, exit_code, destroyed_at, or
346        // outcome. JetStream replay + the live subscription can
347        // out-of-order a `started` after a `destroyed` (CHATROOM
348        // Session 17 §ordering), and without this guard the projection
349        // regressed Destroyed → Running. Terminal means terminal: log
350        // a WARN so the unexpected delivery surfaces in ops, then drop
351        // the event for projection purposes.
352        if matches!(entry.state, CellState::Destroyed) {
353            warn!(
354                ce_type = match phase {
355                    CellPhase::Started => "dev.cellos.events.cell.lifecycle.v1.started",
356                    CellPhase::CommandCompleted =>
357                        "dev.cellos.events.cell.command.v1.completed",
358                    CellPhase::Destroyed => "dev.cellos.events.cell.lifecycle.v1.destroyed",
359                },
360                cell_id = %cell_id,
361                event_time = event_time.as_deref().unwrap_or(""),
362                "apply_cell_event: post-terminal event ignored; cell already Destroyed \
363                 (likely out-of-order delivery or replay reordering)"
364            );
365            // Skip projection mutation but still tell the caller we
366            // recognised the event family, so cursor advance stays in
367            // lockstep with the on-wire seq.
368            return Ok(Some(ApplyOutcome::Applied));
369        }
370
371        // Fill in fields that arrive on later events but weren't known on
372        // the first event we saw for this cell.
373        if entry.spec_id.is_empty() && !spec_id.is_empty() {
374            entry.spec_id = spec_id;
375        }
376        if entry.run_id.is_none() {
377            entry.run_id = run_id;
378        }
379
380        match phase {
381            CellPhase::Started => {
382                entry.state = CellState::Running;
383                entry.started_at = event_time;
384            }
385            CellPhase::CommandCompleted => {
386                // RT3-MED ARCH-001-A is handled by the generic terminal-state
387                // guard above (post-Destroyed events return Applied without
388                // mutating the projection). Reaching this arm means the cell
389                // is NOT yet Destroyed, so the exit_code assignment is safe.
390                if let Some(code) = data.get("exitCode").and_then(|v| v.as_i64()) {
391                    entry.exit_code = Some(code as i32);
392                }
393            }
394            CellPhase::Destroyed => {
395                entry.state = CellState::Destroyed;
396                entry.destroyed_at = event_time;
397                if let Some(o) = data.get("outcome").and_then(|v| v.as_str()) {
398                    entry.outcome = Some(o.to_string());
399                }
400            }
401        }
402
403        // Mirror the typed state into the legacy `status: String` field so
404        // pre-ARCH-001 consumers of `CellRecord` (and the JSON response
405        // for `GET /v1/cells`) keep a single human-readable label.
406        entry.status = entry.state.as_str().to_string();
407
408        Ok(Some(ApplyOutcome::Applied))
409    }
410}
411
/// Internal discriminator for the three cell event types the server
/// projects on. Pulled out so the match in `apply_cell_event` is
/// exhaustive over a closed set, not stringly-typed.
#[derive(Debug, Clone, Copy)]
enum CellPhase {
    /// `dev.cellos.events.cell.lifecycle.v1.started`
    Started,
    /// `dev.cellos.events.cell.command.v1.completed`
    CommandCompleted,
    /// `dev.cellos.events.cell.lifecycle.v1.destroyed`
    Destroyed,
}
421
/// Result of `apply_event_payload` for one event. Distinguishing the two
/// lets the replay path log meaningful "applied vs skipped" counts and lets
/// tests assert that unknown events are tolerated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// Event matched a known type and was accepted. Normally the in-memory
    /// projection was updated (or a record created). The cell projector
    /// also returns this for an event that arrives after its cell is
    /// already `Destroyed`; such an event is dropped without mutating the
    /// record.
    Applied,
    /// Event was structurally valid JSON but nothing was projected: its
    /// type belongs to no projected family, its formation phase is
    /// unknown, or it is a cell event missing `data.cellId` /
    /// `data.specId`.
    Ignored,
}
434
/// Lifecycle status of a formation. Mirrors the projector's state machine
/// (CHATROOM Session 16 §state-machine-table). Only `PENDING` is emitted
/// directly on POST; all other transitions are driven by CloudEvents
/// observed on JetStream.
///
/// Serialized in upper case (`"PENDING"`, `"RUNNING"`, …).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "UPPERCASE")]
pub enum FormationStatus {
    /// Set by the `created` phase.
    Pending,
    /// Set by the `launching`, `running` and `degraded` phases.
    Running,
    /// Set by the `completed` phase.
    Succeeded,
    /// Set by the `failed` phase.
    Failed,
    /// Set by the `cancelled` phase.
    Cancelled,
}
448
/// Projected view of a formation. The full submitted document is retained
/// so GET /v1/formations/{id} can echo the original spec — clients build
/// their own state machines from this plus the WebSocket event stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FormationRecord {
    /// Formation id; also the key of the `formations` projection map.
    pub id: Uuid,
    /// Human-readable formation name. May be empty when no event has
    /// supplied one yet.
    pub name: String,
    /// Latest lifecycle status applied to this formation.
    pub status: FormationStatus,
    /// Retained JSON document. When the record is first created by
    /// `apply_event_payload` this is a copy of that event's `data` member
    /// (or JSON `null` when the event has none).
    pub document: serde_json::Value,
}
459
/// Lifecycle state of an execution cell, derived from the supervisor's
/// `cell.lifecycle.v1.*` event stream.
///
/// State transitions:
/// - `Pending` (initial; never emitted on the wire — assigned when we
///   first observe an event for a cell we haven't seen before, in case
///   we eventually project a pre-spawn event family)
/// - `Running` ← `cell.lifecycle.v1.started`
/// - `Destroyed` ← `cell.lifecycle.v1.destroyed`
///
/// `cell.command.v1.completed` does NOT itself transition the state — the
/// run is still alive until `destroyed` arrives. It only updates
/// `exit_code` on the projected record.
///
/// Serialized in lower case (`"pending"`, `"running"`, `"destroyed"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum CellState {
    /// Placeholder state of a freshly created record.
    Pending,
    /// The `started` lifecycle event has been applied.
    Running,
    /// Terminal: the `destroyed` lifecycle event has been applied.
    Destroyed,
}
480
481impl CellState {
482    /// Lowercase wire string, matching the serde `rename_all = "lowercase"`
483    /// representation. Used to populate `CellRecord::status` for the
484    /// human-readable label clients hit via `GET /v1/cells`.
485    pub fn as_str(self) -> &'static str {
486        match self {
487            CellState::Pending => "pending",
488            CellState::Running => "running",
489            CellState::Destroyed => "destroyed",
490        }
491    }
492}
493
/// Projected view of an execution cell.
///
/// Fields are populated as cell events arrive from JetStream
/// (`cell.lifecycle.v1.started`, `cell.command.v1.completed`,
/// `cell.lifecycle.v1.destroyed`). The authoritative cell state lives
/// in the event log; this struct is a query-latency cache rebuilt on
/// every server restart by `jetstream::replay_projection`.
///
/// `status` is kept as a duplicated lowercase view of `state` so pre-
/// ARCH-001 clients (and the JSON response shape) continue to see a
/// single human-readable label without a breaking API change.
///
/// `formation_id` is reserved for future correlation: today the
/// supervisor's CloudEvents do not carry a `formationId` field on the
/// `Correlation` block (see `cellos-core::Correlation`), so this stays
/// `None` for supervisor-emitted cells until that gap is closed
/// upstream. The field is preserved so the wire shape doesn't churn
/// when correlation lands.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CellRecord {
    /// `data.cellId` from the supervisor's cell events (the projection key).
    pub id: String,
    /// `data.specId` from the lifecycle events — the execution-cell-spec id
    /// the operator declared (e.g. `"e2e-stub-echo"`).
    pub spec_id: String,
    /// Optional supervisor run id (`data.runId`); not always populated on
    /// the started event so we accept `None`. Back-filled from a later
    /// event while it is still `None`.
    pub run_id: Option<String>,
    /// Typed lifecycle state.
    pub state: CellState,
    /// Lowercase mirror of `state` for legacy/string consumers.
    pub status: String,
    /// Reserved for formation correlation once the supervisor emits a
    /// `formationId` on `Correlation`. Today the field stays `None` for
    /// supervisor-emitted cells.
    pub formation_id: Option<Uuid>,
    /// CloudEvent `time` of the `cell.lifecycle.v1.started` event
    /// (RFC3339). `None` until the started event has arrived.
    pub started_at: Option<String>,
    /// CloudEvent `time` of the `cell.lifecycle.v1.destroyed` event
    /// (RFC3339). `None` until the destroyed event has arrived.
    pub destroyed_at: Option<String>,
    /// `data.outcome` from the destroyed event (e.g. `"succeeded"`,
    /// `"failed"`).
    pub outcome: Option<String>,
    /// `data.exitCode` from the `cell.command.v1.completed` event when
    /// the supervisor read an authenticated exit code. Omitted on forced
    /// terminations (see `cellos-core::LifecycleTerminalState`).
    pub exit_code: Option<i32>,
}
544
545#[cfg(test)]
546mod tests {
547    use super::*;
548
549    /// Red-team wave 2 (LOW-W2A-1): `bump_cursor` must remain monotonic
550    /// after the SeqCst→AcqRel downgrade. The CAS-loop structure
551    /// guarantees this property independent of memory-ordering choice;
552    /// this test pins the contract so a future drive-by edit that
553    /// replaces the loop with a naive `store` cannot regress
554    /// silently. We hammer the state from many concurrent tokio tasks
555    /// with interleaved seq values and assert the final cursor equals
556    /// the highest seq ever submitted.
557    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
558    async fn bump_cursor_monotonic_under_concurrent_calls() {
559        let state = AppState::new(None, "test");
560        let workers = 8;
561        let per_worker = 500usize;
562        let mut handles = Vec::with_capacity(workers);
563        for w in 0..workers {
564            let s = state.clone();
565            handles.push(tokio::spawn(async move {
566                // Each worker submits seqs in an interleaved pattern so
567                // out-of-order arrivals are guaranteed across workers.
568                for i in 0..per_worker {
569                    let seq = (i as u64) * (workers as u64) + (w as u64);
570                    s.bump_cursor(seq);
571                }
572            }));
573        }
574        for h in handles {
575            h.await.expect("worker task");
576        }
577        let max_seq = (per_worker as u64 - 1) * (workers as u64) + (workers as u64 - 1);
578        assert_eq!(
579            state.cursor(),
580            max_seq,
581            "cursor must equal max seq across all workers; got {} expected {}",
582            state.cursor(),
583            max_seq,
584        );
585    }
586
587    /// Smaller-but-pointed test: a single stale low seq submitted AFTER
588    /// a higher one must NOT cause regression.
589    #[test]
590    fn bump_cursor_rejects_regression() {
591        // Synchronous variant — runtime is not required since the
592        // method itself is not async.
593        let state = AppState::new(None, "test");
594        state.bump_cursor(100);
595        state.bump_cursor(50); // out-of-order arrival
596        state.bump_cursor(99);
597        assert_eq!(state.cursor(), 100);
598        state.bump_cursor(101);
599        assert_eq!(state.cursor(), 101);
600    }
601
602    /// Build a minimal CloudEvent payload for a cell event with the
603    /// supervisor's canonical `data` shape. Helper for the cell-projector
604    /// guard tests below.
605    fn cell_event(ce_type: &str, cell_id: &str, spec_id: Option<&str>) -> serde_json::Value {
606        let mut data = serde_json::Map::new();
607        data.insert("cellId".to_string(), serde_json::json!(cell_id));
608        if let Some(s) = spec_id {
609            data.insert("specId".to_string(), serde_json::json!(s));
610        }
611        serde_json::json!({
612            "type": ce_type,
613            "time": "2026-05-17T12:00:00Z",
614            "data": serde_json::Value::Object(data),
615        })
616    }
617
    /// RT3-MED ARCH-001-A: a `cell.command.v1.completed` event arriving
    /// AFTER `cell.lifecycle.v1.destroyed` for the same cell (replay,
    /// out-of-order JetStream delivery, late supervisor flush) must NOT
    /// silently overwrite the projection's exit_code. The destroyed
    /// event reported a final state — the projection must not contradict
    /// that report. We assert the post-destruction exit_code stays
    /// `None` (since destroyed arrived before completed in this
    /// replay order). The apply outcome is `Applied`, not `Ignored`: the
    /// generic terminal-state guard (RT3-HIGH-3) recognises the event
    /// family so the caller's cursor still advances, while the mutation
    /// itself is refused.
    #[tokio::test]
    async fn cell_destroyed_then_command_completed_does_not_mutate() {
        let state = AppState::new(None, "test");

        // 1. Apply started → cell exists in Running state.
        let started = cell_event(
            "dev.cellos.events.cell.lifecycle.v1.started",
            "cell-arch001a",
            Some("e2e-stub-echo"),
        );
        let out = state
            .apply_event_payload(&serde_json::to_vec(&started).unwrap())
            .await
            .expect("started apply");
        assert_eq!(out, ApplyOutcome::Applied);

        // 2. Apply destroyed → cell terminal.
        let mut destroyed = cell_event(
            "dev.cellos.events.cell.lifecycle.v1.destroyed",
            "cell-arch001a",
            Some("e2e-stub-echo"),
        );
        destroyed["data"]["outcome"] = serde_json::json!("succeeded");
        let out = state
            .apply_event_payload(&serde_json::to_vec(&destroyed).unwrap())
            .await
            .expect("destroyed apply");
        assert_eq!(out, ApplyOutcome::Applied);

        // Sanity: cell is now Destroyed, exit_code still None (no
        // completed event has been applied yet). The read guard is scoped
        // so it is released before the next apply takes the write lock.
        {
            let cells = state.cells.read().await;
            let entry = cells.get("cell-arch001a").expect("cell present");
            assert_eq!(entry.state, CellState::Destroyed);
            assert_eq!(entry.exit_code, None);
        }

        // 3. Apply a LATE completed event with exitCode=42. The
        //    terminal-state guard MUST refuse to mutate exit_code.
        let mut completed = cell_event(
            "dev.cellos.events.cell.command.v1.completed",
            "cell-arch001a",
            Some("e2e-stub-echo"),
        );
        completed["data"]["exitCode"] = serde_json::json!(42);
        let out = state
            .apply_event_payload(&serde_json::to_vec(&completed).unwrap())
            .await
            .expect("late completed apply");
        assert_eq!(
            out,
            ApplyOutcome::Applied,
            "late completed-after-destroyed: generic terminal-state guard (RT3-HIGH-3) recognizes the event family and advances cursor, but refuses mutation"
        );

        // Final assertion: exit_code unchanged (still None), state
        // still Destroyed.
        let cells = state.cells.read().await;
        let entry = cells.get("cell-arch001a").expect("cell present");
        assert_eq!(
            entry.exit_code, None,
            "exit_code must NOT be mutated after destroyed (terminal-state guard)"
        );
        assert_eq!(entry.state, CellState::Destroyed);
    }
693
694    /// RT3-MED ARCH-001-B: a cell event with `specId` missing from
695    /// `data` is malformed (the supervisor's canonical emitter always
696    /// populates the field). Strict admission: refuse to project the
697    /// event so we never create a phantom cell with `spec_id: ""`,
698    /// and surface the gap as `ApplyOutcome::Ignored` (caller still
699    /// advances the cursor, audit trail records the skip).
700    #[tokio::test]
701    async fn cell_event_missing_spec_id_is_skipped() {
702        let state = AppState::new(None, "test");
703
704        // Started event with NO specId field at all.
705        let started_no_spec = cell_event(
706            "dev.cellos.events.cell.lifecycle.v1.started",
707            "cell-arch001b",
708            None,
709        );
710        let out = state
711            .apply_event_payload(&serde_json::to_vec(&started_no_spec).unwrap())
712            .await
713            .expect("apply must not error on malformed event");
714        assert_eq!(
715            out,
716            ApplyOutcome::Ignored,
717            "missing specId must result in Ignored (strict admission)"
718        );
719
720        // The projection must NOT contain a phantom cell for this id.
721        let cells = state.cells.read().await;
722        assert!(
723            !cells.contains_key("cell-arch001b"),
724            "projection must not gain a phantom cell from an event missing specId"
725        );
726        assert!(
727            cells.is_empty(),
728            "projection must remain empty after a single malformed event"
729        );
730    }
731}