cellos-server 0.5.2

HTTP control plane for CellOS — admission, projection over JetStream, WebSocket fan-out of CloudEvents. Pure event-sourced architecture.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
//! Shared application state for the HTTP control plane.
//!
//! `AppState` is `Clone`-able (all inner handles are `Arc`/optional) and is
//! threaded through axum handlers via `with_state`. The in-memory registry
//! is a *projection cache* over JetStream — it is intentionally not the
//! source of truth (CHATROOM.md Session 16). It is rebuilt on startup by
//! replaying the event log (`cellos.events.>`) through
//! `AppState::apply_event_payload` (the boot-time
//! `jetstream::replay_projection` path), kept current by the live WebSocket
//! bridge, and additionally receives writes from POST /v1/formations.

use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use async_nats::jetstream::context::Context as JsContext;
use async_nats::Client as NatsClient;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::{debug, warn};
use uuid::Uuid;

/// Required bearer token, validated against `Authorization: Bearer <token>`
/// on every non-public route.
///
/// The alias itself is not an `Option`, so every `AppState` carries a
/// token. The server is documented to refuse to start when no token is
/// configured (see `main.rs` — NOTE(review): confirm where that startup
/// check lives; it is not enforced by this type).
pub type ApiToken = Arc<String>;

/// Shared state handed to every axum handler.
///
/// Cloning is cheap in the sense that the projection maps, the API token
/// and the snapshot cursor are all behind `Arc`, so every clone observes
/// (and mutates) the same projection and cursor.
#[derive(Clone)]
pub struct AppState {
    /// NATS connection used by the WebSocket bridge (NOTE(review): confirm
    /// whether the boot-time replay path uses this or `jetstream`).
    /// `Option` because tests can drive the router without a live broker.
    pub nats: Option<NatsClient>,
    /// JetStream context (ADR-0011, ADR-0015). Populated alongside
    /// `nats` once the server confirms the `CELLOS_EVENTS` stream
    /// exists. `Option` so router tests without a live broker still
    /// build the state cleanly; the WS bridge falls through to a
    /// "broker not configured" close if this is `None`.
    pub jetstream: Option<JsContext>,
    /// In-memory projection of formations. UUID-keyed for stable lookup.
    pub formations: Arc<RwLock<BTreeMap<Uuid, FormationRecord>>>,
    /// In-memory projection of cells, keyed by the supervisor's `cellId`.
    pub cells: Arc<RwLock<BTreeMap<String, CellRecord>>>,
    /// Bearer token required on every non-public route.
    pub api_token: ApiToken,
    /// Highest JetStream stream-sequence the projection has applied
    /// (ADR-0015 §D2). The snapshot endpoint reports this as `cursor`
    /// so clients can open `/ws/events?since=<cursor>` and know they
    /// will not miss any event after the snapshot.
    ///
    /// The WebSocket bridge bumps this monotonically as it forwards
    /// frames (see `ws.rs`). Until the bridge is migrated to a real
    /// JetStream consumer, this counter is per-process and resets on
    /// restart — that is acceptable for the MVP because every browser
    /// reconnect repeats the snapshot fetch.
    pub applied_cursor: Arc<AtomicU64>,
}

impl AppState {
    /// Constructor used by both `main.rs` and the test harness.
    ///
    /// The returned state starts with empty `formations` / `cells`
    /// projections, no JetStream context (attach one with
    /// `with_jetstream`) and a snapshot cursor of `0`.
    pub fn new(nats: Option<NatsClient>, api_token: impl Into<String>) -> Self {
        Self {
            nats,
            jetstream: None,
            formations: Arc::new(RwLock::new(BTreeMap::new())),
            cells: Arc::new(RwLock::new(BTreeMap::new())),
            api_token: Arc::new(api_token.into()),
            applied_cursor: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Attach a JetStream context. Called from `main.rs` after
    /// `ensure_stream` succeeds. Consumes and returns the `AppState` for
    /// builder-style chaining in tests.
    pub fn with_jetstream(mut self, ctx: JsContext) -> Self {
        self.jetstream = Some(ctx);
        self
    }

    /// Current snapshot cursor — the highest seq the server has applied.
    ///
    /// Red-team wave 2 (LOW-W2A-1): `Acquire` is sufficient here. We need
    /// to see writes that happen-before any successful `bump_cursor`
    /// (i.e. the projection-apply write inside `apply_event_payload`);
    /// the previous `SeqCst` enforced a global total order with every
    /// other `SeqCst` operation in the process (counter `fetch_add`s in
    /// `sni_proxy`, etc.) for no extra correctness gain on this read.
    pub fn cursor(&self) -> u64 {
        self.applied_cursor.load(Ordering::Acquire)
    }

    /// Bump the snapshot cursor to at least `seq`. Monotonic; out-of-order
    /// values are ignored. Used by the WebSocket bridge as it forwards
    /// frames.
    ///
    /// Red-team wave 2 (LOW-W2A-1): downgraded from `SeqCst` to
    /// `AcqRel`/`Acquire`. The CAS-loop structure (re-load on conflict,
    /// strict `seq > current` gate) guarantees monotonicity independent
    /// of the memory ordering chosen. `AcqRel` on success publishes the
    /// new cursor to every subsequent `cursor()` reader (snapshot GET
    /// handlers, primarily); `Acquire` on the reload picks up the
    /// observed value from the winner of the conflict.
    pub fn bump_cursor(&self, seq: u64) {
        // CAS loop keeps the cursor monotonic across concurrent ws workers.
        // A stale `seq` (<= current) exits after a single load, without a
        // write to the shared cache line.
        let mut current = self.applied_cursor.load(Ordering::Acquire);
        while seq > current {
            match self.applied_cursor.compare_exchange(
                current,
                seq,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => break,
                Err(observed) => current = observed,
            }
        }
    }

    /// Apply a single CloudEvent payload (raw JSON bytes from
    /// JetStream) to the in-memory projection.
    ///
    /// This is shared between the boot-time replay path
    /// (`jetstream::replay_projection`) and the live WebSocket bridge
    /// (`ws.rs`). Centralising the apply logic guarantees that what
    /// the server reconstructs on startup is exactly what it
    /// applies live — there is no "replay-only" code path that could
    /// drift from the live one.
    ///
    /// CloudEvent `type` discriminates the transition. Two event families
    /// mutate state today:
    ///
    /// - `formation.v1.*` — server-emitted formation lifecycle, drives the
    ///   `formations` projection (ADR-0010 admission gate output); see
    ///   `apply_formation_event`.
    /// - `cell.lifecycle.v1.*` and `cell.command.v1.completed` —
    ///   supervisor-emitted cell lifecycle (ARCH-001); see
    ///   `apply_cell_event`. Without this, the server would replay cell
    ///   events from JetStream but never update the `cells` map, and
    ///   `GET /v1/cells` would return `[]`.
    ///
    /// Unknown event types still advance the JetStream cursor at the
    /// caller (the apply contract is independent of whether the
    /// projection mutated), so audit gaps surface as `ApplyOutcome::Ignored`
    /// in the replay log rather than silent drops.
    ///
    /// # Errors
    ///
    /// Returns `Err` when the payload is not JSON, or when a formation event
    /// with a known phase carries no usable UUID formation id
    /// (`data.formationId`, `data.formation_id`, or the envelope `subject`).
    pub async fn apply_event_payload(&self, payload: &[u8]) -> anyhow::Result<ApplyOutcome> {
        let event: serde_json::Value = serde_json::from_slice(payload)
            .map_err(|e| anyhow::anyhow!("event payload not JSON: {e}"))?;

        // A missing or non-string `type` degrades to "", which matches no
        // projected family and therefore ends up as `ApplyOutcome::Ignored`.
        let ce_type = event
            .get("type")
            .and_then(serde_json::Value::as_str)
            .unwrap_or_default();

        // ── Cell projector (ARCH-001) ────────────────────────────────────
        // Matched BEFORE the formation prefix gate; otherwise an unrelated
        // change to the formation gate (e.g. a future canonical-prefix
        // tweak) could swallow cell events.
        if let Some(outcome) = self.apply_cell_event(ce_type, &event).await? {
            return Ok(outcome);
        }

        self.apply_formation_event(ce_type, &event).await
    }

    /// Apply a formation lifecycle event to the `formations` projection.
    ///
    /// Returns `Ok(ApplyOutcome::Ignored)` when `ce_type` carries neither
    /// formation prefix or names an unknown phase. The phase is classified
    /// BEFORE the payload is validated, so an unknown (e.g. future) phase
    /// whose `data` has a different shape is ignored instead of failing
    /// with "event missing formationId".
    ///
    /// NOTE(review): unlike the cell projector there is no terminal-state
    /// guard here — a late `launching`/`running` event delivered after
    /// `completed` moves the status back to `RUNNING`. Confirm whether
    /// formations can legitimately re-enter a non-terminal phase before
    /// adding one.
    ///
    /// # Errors
    ///
    /// Returns `Err` when no formation id can be found or it is not a UUID.
    async fn apply_formation_event(
        &self,
        ce_type: &str,
        event: &serde_json::Value,
    ) -> anyhow::Result<ApplyOutcome> {
        // Wave 2 red-team CRITICAL fix (CRIT-W2D-1): the canonical event-type
        // URN family emitted by `cellos-core::events::cloud_event_v1_formation_*`
        // is `dev.cellos.events.cell.formation.v1.*`. The legacy
        // `io.cellos.formation.v1.*` prefix is not emitted by any production
        // code path today, but is still accepted so older event archives
        // replay cleanly; new emitters MUST use the canonical family
        // enumerated in `cellos-core/src/events.rs` and audited by
        // `docs/audit-log-retention.md` (FC-74).
        const CANONICAL_FORMATION_PREFIX: &str = "dev.cellos.events.cell.formation.v1.";
        const LEGACY_FORMATION_PREFIX: &str = "io.cellos.formation.v1.";

        let Some(phase) = ce_type
            .strip_prefix(CANONICAL_FORMATION_PREFIX)
            .or_else(|| ce_type.strip_prefix(LEGACY_FORMATION_PREFIX))
        else {
            // Unknown event family currently no-ops the projection. The
            // cursor still advances at the caller because the apply
            // contract is independent of whether any state changed.
            debug!(
                ce_type,
                "apply_event_payload: not a formation or cell event; ignored"
            );
            return Ok(ApplyOutcome::Ignored);
        };

        let status = match phase {
            "created" => FormationStatus::Pending,
            "launching" | "running" | "degraded" => FormationStatus::Running,
            "completed" => FormationStatus::Succeeded,
            "failed" => FormationStatus::Failed,
            // `cancelled` has no canonical emitter constructor in
            // `cellos-core` today (see red-team-findings-A.md); the arm is
            // retained so a future `cloud_event_v1_formation_cancelled`
            // lands on a working reader. The legacy prefix can still carry
            // this phase.
            "cancelled" => FormationStatus::Cancelled,
            _ => {
                debug!(
                    ce_type,
                    "apply_event_payload: unknown formation event; ignored"
                );
                return Ok(ApplyOutcome::Ignored);
            }
        };

        // The formation id lives in the CloudEvent's `data` payload. The
        // canonical emitter (`cellos-core::events::formation_data_v1`) uses
        // camelCase keys (`formationId`, `formationName`); the legacy
        // snake_case shape (`formation_id`, `name`) is accepted for archive
        // replay only. `subject` is the last-resort fallback. Indexing a
        // missing `data` yields `Value::Null`, so every lookup below is None.
        let data = &event["data"];
        let id_str = data
            .get("formationId")
            .and_then(serde_json::Value::as_str)
            .or_else(|| data.get("formation_id").and_then(serde_json::Value::as_str))
            .or_else(|| event.get("subject").and_then(serde_json::Value::as_str))
            .ok_or_else(|| anyhow::anyhow!("event missing formationId"))?;
        let id = Uuid::parse_str(id_str)
            .map_err(|e| anyhow::anyhow!("event formationId not a UUID ({id_str}): {e}"))?;

        let name = data
            .get("formationName")
            .and_then(serde_json::Value::as_str)
            .or_else(|| data.get("name").and_then(serde_json::Value::as_str))
            .unwrap_or("")
            .to_string();

        let mut map = self.formations.write().await;
        let entry = map.entry(id).or_insert_with(|| FormationRecord {
            id,
            name: name.clone(),
            status,
            // Only cloned when the event is the first sighting of this id.
            document: data.clone(),
        });
        // An event without a name never blanks out a known one.
        if !name.is_empty() {
            entry.name = name;
        }
        entry.status = status;
        Ok(ApplyOutcome::Applied)
    }

    /// Apply a single supervisor-emitted cell event to the `cells`
    /// projection (ARCH-001).
    ///
    /// Returns:
    /// - `Ok(Some(ApplyOutcome::Applied))` if the event matched one of the
    ///   three supported cell types and was accepted. This includes events
    ///   that arrive after the cell is already `Destroyed`; those are
    ///   logged at WARN and leave the projection untouched.
    /// - `Ok(Some(ApplyOutcome::Ignored))` if the event matched a cell type
    ///   but lacked the data we need (missing `cellId`, or missing/empty
    ///   `specId`); the caller still advances the cursor.
    /// - `Ok(None)` if the event is NOT a cell event — caller falls through
    ///   to the formation projector.
    ///
    /// Three event types are consumed (see `cellos-core::events` and the
    /// supervisor's emit sites in `crates/cellos-supervisor/src/supervisor.rs`):
    /// - `dev.cellos.events.cell.lifecycle.v1.started`
    /// - `dev.cellos.events.cell.command.v1.completed`
    /// - `dev.cellos.events.cell.lifecycle.v1.destroyed`
    ///
    /// Identity / network / observability / policy cell events are not
    /// projected here — they're audit-trail events the supervisor emits,
    /// not cell-state transitions cellctl users care about for `get cells`.
    async fn apply_cell_event(
        &self,
        ce_type: &str,
        event: &serde_json::Value,
    ) -> anyhow::Result<Option<ApplyOutcome>> {
        const STARTED: &str = "dev.cellos.events.cell.lifecycle.v1.started";
        const COMPLETED: &str = "dev.cellos.events.cell.command.v1.completed";
        const DESTROYED: &str = "dev.cellos.events.cell.lifecycle.v1.destroyed";

        let phase = match ce_type {
            STARTED => CellPhase::Started,
            COMPLETED => CellPhase::CommandCompleted,
            DESTROYED => CellPhase::Destroyed,
            _ => return Ok(None),
        };

        // Borrow instead of cloning: `data` is only read on this path.
        let data = &event["data"];
        let Some(cell_id) = data.get("cellId").and_then(serde_json::Value::as_str) else {
            debug!(
                ce_type,
                "apply_cell_event: missing data.cellId; cell event ignored"
            );
            return Ok(Some(ApplyOutcome::Ignored));
        };
        let cell_id = cell_id.to_string();
        // `time` lives on the CloudEvent envelope (RFC3339); the supervisor's
        // `cloud_event()` helper populates it on every emit.
        let event_time = event
            .get("time")
            .and_then(serde_json::Value::as_str)
            .map(str::to_string);
        // RT3-MED ARCH-001-B: strict admission on `specId`. The canonical
        // supervisor emits a non-empty `data.specId` on every cell event
        // (`cellos-core::events::cell_*`). A cell event with the field
        // missing, non-string, or EMPTY is malformed; projecting it would
        // poison the entry's `spec_id` with `""` on first-event-wins or
        // leave a phantom cell in the projection. Refuse to project it and
        // surface the gap as a WARN so the audit trail keeps a record of
        // the skipped event.
        let Some(spec_id) = data
            .get("specId")
            .and_then(serde_json::Value::as_str)
            .filter(|s| !s.is_empty())
        else {
            warn!(
                ce_type,
                cell_id = %cell_id,
                "apply_cell_event: missing data.specId; cell event skipped (strict admission)"
            );
            return Ok(Some(ApplyOutcome::Ignored));
        };
        let spec_id = spec_id.to_string();
        let run_id = data
            .get("runId")
            .and_then(serde_json::Value::as_str)
            .map(str::to_string);

        let mut map = self.cells.write().await;
        let entry = map.entry(cell_id.clone()).or_insert_with(|| CellRecord {
            id: cell_id.clone(),
            spec_id: spec_id.clone(),
            run_id: run_id.clone(),
            state: CellState::Pending,
            status: String::new(),
            formation_id: None,
            started_at: None,
            destroyed_at: None,
            outcome: None,
            exit_code: None,
        });

        // RT3-HIGH-3 (ARCH-003): once a cell is `Destroyed`, no later
        // event may mutate its state, exit_code, destroyed_at, or
        // outcome. JetStream replay + the live subscription can
        // out-of-order a `started` after a `destroyed` (CHATROOM
        // Session 17 §ordering), and without this guard the projection
        // regressed Destroyed → Running. Terminal means terminal: log
        // a WARN so the unexpected delivery surfaces in ops, then drop
        // the event for projection purposes.
        if matches!(entry.state, CellState::Destroyed) {
            warn!(
                ce_type,
                cell_id = %cell_id,
                event_time = event_time.as_deref().unwrap_or(""),
                "apply_cell_event: post-terminal event ignored; cell already Destroyed \
                 (likely out-of-order delivery or replay reordering)"
            );
            // Skip projection mutation but still tell the caller we
            // recognised the event family, so cursor advance stays in
            // lockstep with the on-wire seq.
            return Ok(Some(ApplyOutcome::Applied));
        }

        // Fill in fields that arrive on later events but weren't known on
        // the first event we saw for this cell.
        if entry.spec_id.is_empty() {
            entry.spec_id = spec_id;
        }
        if entry.run_id.is_none() {
            entry.run_id = run_id;
        }

        match phase {
            CellPhase::Started => {
                entry.state = CellState::Running;
                entry.started_at = event_time;
            }
            CellPhase::CommandCompleted => {
                // RT3-MED ARCH-001-A is handled by the generic terminal-state
                // guard above; reaching this arm means the cell is NOT yet
                // Destroyed, so updating exit_code is safe.
                if let Some(raw) = data.get("exitCode").and_then(serde_json::Value::as_i64) {
                    // Checked conversion: an `as i32` cast would silently wrap
                    // out-of-range values (e.g. 2^32 → 0, reporting success).
                    match i32::try_from(raw) {
                        Ok(code) => entry.exit_code = Some(code),
                        Err(_) => warn!(
                            ce_type,
                            cell_id = %cell_id,
                            exit_code = raw,
                            "apply_cell_event: data.exitCode does not fit in i32; exit_code left unchanged"
                        ),
                    }
                }
            }
            CellPhase::Destroyed => {
                entry.state = CellState::Destroyed;
                entry.destroyed_at = event_time;
                if let Some(o) = data.get("outcome").and_then(serde_json::Value::as_str) {
                    entry.outcome = Some(o.to_string());
                }
            }
        }

        // Mirror the typed state into the legacy `status: String` field so
        // pre-ARCH-001 consumers of `CellRecord` (and the JSON response
        // for `GET /v1/cells`) keep a single human-readable label.
        entry.status = entry.state.as_str().to_string();

        Ok(Some(ApplyOutcome::Applied))
    }
}

/// Internal discriminator for the three cell event types the server
/// projects on. Pulled out so the match in `apply_cell_event` is
/// exhaustive over a closed set, not stringly-typed.
///
/// Mapping from the CloudEvent `type`:
/// - `Started` ← `dev.cellos.events.cell.lifecycle.v1.started`
/// - `CommandCompleted` ← `dev.cellos.events.cell.command.v1.completed`
/// - `Destroyed` ← `dev.cellos.events.cell.lifecycle.v1.destroyed`
#[derive(Debug, Clone, Copy)]
enum CellPhase {
    /// Cell began running; the projector moves its state to `Running`.
    Started,
    /// Command finished; the projector records `exit_code` only.
    CommandCompleted,
    /// Cell torn down; the projector moves it to the terminal `Destroyed`.
    Destroyed,
}

/// Result of `AppState::apply_event_payload` for one event.
///
/// Both variants mean the event was handled without error — malformed
/// payloads (non-JSON, formation events without a usable UUID id) surface
/// as `Err` instead — and the apply contract has the caller advance its
/// cursor either way. Distinguishing the two lets the replay path log
/// meaningful "applied vs skipped" counts and lets tests assert that
/// unknown events are tolerated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// Event belonged to a projected family (formation or cell) and was
    /// accepted by its projector. Usually the projection was updated or a
    /// record created; a cell event arriving after that cell is already
    /// `Destroyed` also reports `Applied`, even though the projection is
    /// left untouched.
    Applied,
    /// Event was not projected: its type belongs to no projected family,
    /// it names an unknown formation phase, or it is a cell event missing
    /// `data.cellId` / `data.specId`.
    Ignored,
}

/// Lifecycle status of a formation. Mirrors the projector's state machine
/// (CHATROOM Session 16 §state-machine-table). Only `PENDING` is emitted
/// directly on POST; all other transitions are driven by CloudEvents
/// observed on JetStream.
///
/// Serialized in UPPERCASE (`"PENDING"`, `"RUNNING"`, ...). The formation
/// projector maps the event phase (the suffix after the `formation.v1.`
/// prefix) onto these variants:
/// - `created` → `Pending`
/// - `launching`, `running`, `degraded` → `Running`
/// - `completed` → `Succeeded`
/// - `failed` → `Failed`
/// - `cancelled` → `Cancelled`
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "UPPERCASE")]
pub enum FormationStatus {
    Pending,
    Running,
    Succeeded,
    Failed,
    Cancelled,
}

/// Projected view of a formation. The full submitted document is retained
/// so GET /v1/formations/{id} can echo the original spec — clients build
/// their own state machines from this plus the WebSocket event stream.
///
/// When a formation is first seen through a CloudEvent rather than a POST,
/// the event projector creates the record from the event alone: `document`
/// then holds that event's `data` payload, and `name` may be empty until
/// some event carries one.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FormationRecord {
    /// Formation id; the key of the `formations` projection.
    pub id: Uuid,
    /// Human-readable name. Events without a name never blank it out.
    pub name: String,
    /// Latest projected lifecycle status.
    pub status: FormationStatus,
    /// Submitted formation document (or the first event's `data`; see above).
    pub document: serde_json::Value,
}

/// Lifecycle state of an execution cell, derived from the supervisor's
/// `cell.lifecycle.v1.*` event stream.
///
/// State transitions:
/// - `Pending` (initial; never emitted on the wire — assigned when we
///   first observe an event for a cell we haven't seen before, in case
///   we eventually project a pre-spawn event family). A cell whose first
///   observed event is `cell.command.v1.completed` stays `Pending` until
///   `started` or `destroyed` arrives.
/// - `Running` ← `cell.lifecycle.v1.started`
/// - `Destroyed` ← `cell.lifecycle.v1.destroyed`. Terminal: the projector
///   ignores every later event for the same cell.
///
/// `cell.command.v1.completed` does NOT itself transition the state — the
/// run is still alive until `destroyed` arrives. It only updates
/// `exit_code` on the projected record.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum CellState {
    Pending,
    Running,
    Destroyed,
}

impl CellState {
    /// Lowercase wire string, matching the serde `rename_all = "lowercase"`
    /// representation. Used to populate `CellRecord::status` for the
    /// human-readable label clients hit via `GET /v1/cells`.
    pub fn as_str(self) -> &'static str {
        match self {
            CellState::Pending => "pending",
            CellState::Running => "running",
            CellState::Destroyed => "destroyed",
        }
    }
}

/// Projected view of an execution cell.
///
/// Fields are populated as cell events arrive from JetStream
/// (`cell.lifecycle.v1.started`, `cell.command.v1.completed`,
/// `cell.lifecycle.v1.destroyed`). The authoritative cell state lives
/// in the event log; this struct is a query-latency cache rebuilt on
/// every server restart by `jetstream::replay_projection`.
///
/// `status` is kept as a duplicated lowercase view of `state` so pre-
/// ARCH-001 clients (and the JSON response shape) continue to see a
/// single human-readable label without a breaking API change.
///
/// `formation_id` is reserved for future correlation: today the
/// supervisor's CloudEvents do not carry a `formationId` field on the
/// `Correlation` block (see `cellos-core::Correlation`), so this stays
/// `None` for supervisor-emitted cells until that gap is closed
/// upstream. The field is preserved so the wire shape doesn't churn
/// when correlation lands.
///
/// Once `state` is `Destroyed`, the event projector no longer mutates
/// any field of the record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CellRecord {
    /// `data.cellId` from the supervisor's cell events (the projection key).
    pub id: String,
    /// `data.specId` from the lifecycle events — the execution-cell-spec id
    /// the operator declared (e.g. `"e2e-stub-echo"`). Taken from the first
    /// accepted event; later events only back-fill it while it is empty.
    pub spec_id: String,
    /// Optional supervisor run id (`data.runId`); not always populated on
    /// the started event so we accept `None`. Back-filled from a later
    /// event if the first one lacked it.
    pub run_id: Option<String>,
    /// Typed lifecycle state.
    pub state: CellState,
    /// Lowercase mirror of `state` for legacy/string consumers.
    pub status: String,
    /// Reserved for formation correlation once the supervisor emits a
    /// `formationId` on `Correlation`. Today the field stays `None` for
    /// supervisor-emitted cells.
    pub formation_id: Option<Uuid>,
    /// CloudEvent `time` of the `cell.lifecycle.v1.started` event
    /// (RFC3339). `None` until the started event has arrived; a repeated
    /// started event (before destruction) overwrites it.
    pub started_at: Option<String>,
    /// CloudEvent `time` of the `cell.lifecycle.v1.destroyed` event
    /// (RFC3339). `None` until the destroyed event has arrived.
    pub destroyed_at: Option<String>,
    /// `data.outcome` from the destroyed event (e.g. `"succeeded"`,
    /// `"failed"`).
    pub outcome: Option<String>,
    /// `data.exitCode` from the `cell.command.v1.completed` event when
    /// the supervisor read an authenticated exit code. Omitted on forced
    /// terminations (see `cellos-core::LifecycleTerminalState`).
    pub exit_code: Option<i32>,
}

// Unit tests for snapshot-cursor monotonicity and the cell projector's
// terminal-state and strict-admission guards.
#[cfg(test)]
mod tests {
    use super::*;

    /// Red-team wave 2 (LOW-W2A-1): `bump_cursor` must remain monotonic
    /// after the SeqCst→AcqRel downgrade. The CAS-loop structure
    /// guarantees this property independent of memory-ordering choice;
    /// this test pins the contract so a future drive-by edit that
    /// replaces the loop with a naive `store` cannot regress
    /// silently. We hammer the state from many concurrent tokio tasks
    /// with interleaved seq values and assert the final cursor equals
    /// the highest seq ever submitted.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn bump_cursor_monotonic_under_concurrent_calls() {
        let state = AppState::new(None, "test");
        let workers = 8;
        let per_worker = 500usize;
        let mut handles = Vec::with_capacity(workers);
        for w in 0..workers {
            let s = state.clone();
            handles.push(tokio::spawn(async move {
                // Each worker submits seqs in an interleaved pattern so
                // out-of-order arrivals are guaranteed across workers.
                for i in 0..per_worker {
                    let seq = (i as u64) * (workers as u64) + (w as u64);
                    s.bump_cursor(seq);
                }
            }));
        }
        for h in handles {
            h.await.expect("worker task");
        }
        // Largest seq any worker submits: last iteration of the last worker.
        let max_seq = (per_worker as u64 - 1) * (workers as u64) + (workers as u64 - 1);
        assert_eq!(
            state.cursor(),
            max_seq,
            "cursor must equal max seq across all workers; got {} expected {}",
            state.cursor(),
            max_seq,
        );
    }

    /// Smaller-but-pointed test: a single stale low seq submitted AFTER
    /// a higher one must NOT cause regression.
    #[test]
    fn bump_cursor_rejects_regression() {
        // Synchronous variant — runtime is not required since the
        // method itself is not async.
        let state = AppState::new(None, "test");
        state.bump_cursor(100);
        state.bump_cursor(50); // out-of-order arrival
        state.bump_cursor(99);
        assert_eq!(state.cursor(), 100);
        state.bump_cursor(101);
        assert_eq!(state.cursor(), 101);
    }

    /// Build a minimal CloudEvent payload for a cell event with the
    /// supervisor's canonical `data` shape. Helper for the cell-projector
    /// guard tests below. Passing `spec_id: None` omits `data.specId`
    /// entirely.
    fn cell_event(ce_type: &str, cell_id: &str, spec_id: Option<&str>) -> serde_json::Value {
        let mut data = serde_json::Map::new();
        data.insert("cellId".to_string(), serde_json::json!(cell_id));
        if let Some(s) = spec_id {
            data.insert("specId".to_string(), serde_json::json!(s));
        }
        serde_json::json!({
            "type": ce_type,
            "time": "2026-05-17T12:00:00Z",
            "data": serde_json::Value::Object(data),
        })
    }

    /// RT3-MED ARCH-001-A: a `cell.command.v1.completed` event arriving
    /// AFTER `cell.lifecycle.v1.destroyed` for the same cell (replay,
    /// out-of-order JetStream delivery, late supervisor flush) must NOT
    /// silently overwrite the projection's exit_code. The destroyed
    /// event reported a final state — the projection must not contradict
    /// that report. We assert the post-destruction exit_code stays
    /// `None` (since destroyed arrived before completed in this
    /// replay order) and that the apply outcome is `Applied`: the
    /// generic terminal-state guard (RT3-HIGH-3) recognises the event
    /// family and reports it as handled, so the caller's cursor stays in
    /// lockstep, while refusing to mutate the projection. The refusal is
    /// surfaced through a WARN log rather than through the outcome.
    #[tokio::test]
    async fn cell_destroyed_then_command_completed_does_not_mutate() {
        let state = AppState::new(None, "test");

        // 1. Apply started → cell exists in Running state.
        let started = cell_event(
            "dev.cellos.events.cell.lifecycle.v1.started",
            "cell-arch001a",
            Some("e2e-stub-echo"),
        );
        let out = state
            .apply_event_payload(&serde_json::to_vec(&started).unwrap())
            .await
            .expect("started apply");
        assert_eq!(out, ApplyOutcome::Applied);

        // 2. Apply destroyed → cell terminal.
        let mut destroyed = cell_event(
            "dev.cellos.events.cell.lifecycle.v1.destroyed",
            "cell-arch001a",
            Some("e2e-stub-echo"),
        );
        destroyed["data"]["outcome"] = serde_json::json!("succeeded");
        let out = state
            .apply_event_payload(&serde_json::to_vec(&destroyed).unwrap())
            .await
            .expect("destroyed apply");
        assert_eq!(out, ApplyOutcome::Applied);

        // Sanity: cell is now Destroyed, exit_code still None (no
        // completed event has been applied yet).
        {
            let cells = state.cells.read().await;
            let entry = cells.get("cell-arch001a").expect("cell present");
            assert_eq!(entry.state, CellState::Destroyed);
            assert_eq!(entry.exit_code, None);
        }

        // 3. Apply a LATE completed event with exitCode=42. The
        //    terminal-state guard MUST refuse to mutate exit_code.
        let mut completed = cell_event(
            "dev.cellos.events.cell.command.v1.completed",
            "cell-arch001a",
            Some("e2e-stub-echo"),
        );
        completed["data"]["exitCode"] = serde_json::json!(42);
        let out = state
            .apply_event_payload(&serde_json::to_vec(&completed).unwrap())
            .await
            .expect("late completed apply");
        assert_eq!(
            out,
            ApplyOutcome::Applied,
            "late completed-after-destroyed: generic terminal-state guard (RT3-HIGH-3) recognizes the event family and advances cursor, but refuses mutation"
        );

        // Final assertion: exit_code unchanged (still None), state
        // still Destroyed.
        let cells = state.cells.read().await;
        let entry = cells.get("cell-arch001a").expect("cell present");
        assert_eq!(
            entry.exit_code, None,
            "exit_code must NOT be mutated after destroyed (terminal-state guard)"
        );
        assert_eq!(entry.state, CellState::Destroyed);
    }

    /// RT3-MED ARCH-001-B: a cell event with `specId` missing from
    /// `data` is malformed (the supervisor's canonical emitter always
    /// populates the field). Strict admission: refuse to project the
    /// event so we never create a phantom cell with `spec_id: ""`,
    /// and surface the gap as `ApplyOutcome::Ignored` (caller still
    /// advances the cursor, audit trail records the skip).
    #[tokio::test]
    async fn cell_event_missing_spec_id_is_skipped() {
        let state = AppState::new(None, "test");

        // Started event with NO specId field at all.
        let started_no_spec = cell_event(
            "dev.cellos.events.cell.lifecycle.v1.started",
            "cell-arch001b",
            None,
        );
        let out = state
            .apply_event_payload(&serde_json::to_vec(&started_no_spec).unwrap())
            .await
            .expect("apply must not error on malformed event");
        assert_eq!(
            out,
            ApplyOutcome::Ignored,
            "missing specId must result in Ignored (strict admission)"
        );

        // The projection must NOT contain a phantom cell for this id.
        let cells = state.cells.read().await;
        assert!(
            !cells.contains_key("cell-arch001b"),
            "projection must not gain a phantom cell from an event missing specId"
        );
        assert!(
            cells.is_empty(),
            "projection must remain empty after a single malformed event"
        );
    }
}