Skip to main content

arkhe_forge_platform/
projection.rs

1//! L2 Projection observer pipeline.
2//!
3//! L0 emits deterministic events; L2 turns those events into denormalized
4//! read-model rows that PG (or another store) serves to higher layers. This
5//! module exposes: the [`Projection`] trait, a
6//! [`ProjectionRouter`] that dispatches [`EventRecord`]s by `TypeCode`,
7//! a [`ProjectionStore`] abstraction, an in-memory store, and active /
8//! passive / draining lifecycle transitions.
9//!
10//! The PG-backed store, the L0 observer bridge, and the
11//! `kernel_projection_state` chain-anchored view route through the trait surface.
12
13use core::marker::PhantomData;
14use std::collections::HashMap;
15
16use arkhe_forge_core::activity::{ActivityId, ActivityRecord, EntityShellId};
17use arkhe_forge_core::actor::{ActorId, ActorProfile, UserBinding};
18use arkhe_forge_core::brand::ShellId;
19use arkhe_forge_core::context::EventRecord;
20use arkhe_forge_core::entry::{EntryBody, EntryCore, EntryId, EntryParentDepth};
21use arkhe_forge_core::event::{ArkheEvent, CrossShellActivity};
22use arkhe_forge_core::space::{ParentChainDepth, SpaceConfig, SpaceId, SpaceMembership};
23use arkhe_kernel::abi::{InstanceId, Tick, TypeCode};
24use serde::{Deserialize, Serialize};
25
26use crate::manifest::ManifestSnapshot;
27
28// ===================== Lifecycle + Context + Errors =====================
29
30/// Observer worker lifecycle (active-passive HA).
31///
32/// * `Passive` — read-only secondary. Consumes events for warm standby but
33///   does not commit writes upstream.
34/// * `Active` — primary writer. The only projection state permitted to call
35///   `ProjectionStore` mutators.
36/// * `Draining` — graceful shutdown. Rejects new work, flushes in-flight.
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38#[non_exhaustive]
39pub enum ObserverState {
40    /// Not primary; events are observed but writes are blocked.
41    Passive,
42    /// Primary writer.
43    Active,
44    /// Winding down — rejects new work.
45    Draining,
46}
47
48/// Outcome of an auto-promote policy evaluation.
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50#[non_exhaustive]
51pub enum PromotionDecision {
52    /// Policy + elapsed time justify a `Passive → Active` transition.
53    Promote,
54    /// Policy requires additional wait / manual operator approval.
55    Wait,
56}
57
58/// Minimum number of KMS health channels that must report `Healthy` for the
59/// `after_60min` auto-promote policy to clear its guardrail. Matches the
60/// default 2-of-3 quorum — operators that provision more channels
61/// can re-tune via a future manifest field.
62pub const HF2_HEALTH_QUORUM_MIN: usize = 2;
63
64/// Per-dispatch context carried alongside an [`EventRecord`]. The `'i`
65/// lifetime reserves the slot that binds to the L0
66/// `Effect<'i, Authorized>` borrow, and also scopes the optional manifest
67/// snapshot reference.
68pub struct ProjectionContext<'i> {
69    /// Tick at which the event is being applied.
70    pub tick: Tick,
71    /// Runtime instance identifier.
72    pub instance_id: InstanceId,
73    /// Active manifest snapshot, if one has been loaded. `None` is legal
74    /// for Tier-0 dev bootstrap paths that run before the first manifest
75    /// has been emitted via `RuntimeBootstrap`.
76    pub manifest: Option<&'i ManifestSnapshot>,
77    _phantom: PhantomData<&'i ()>,
78}
79
80impl<'i> ProjectionContext<'i> {
81    /// Construct a projection dispatch context without a manifest.
82    #[inline]
83    #[must_use]
84    pub fn new(tick: Tick, instance_id: InstanceId) -> Self {
85        Self {
86            tick,
87            instance_id,
88            manifest: None,
89            _phantom: PhantomData,
90        }
91    }
92
93    /// Construct a projection dispatch context with an attached manifest
94    /// snapshot. Callers that have loaded a manifest should use this path
95    /// so projection workers can key on shell policy (tier, cipher, etc.).
96    #[inline]
97    #[must_use]
98    pub fn with_manifest(
99        tick: Tick,
100        instance_id: InstanceId,
101        manifest: &'i ManifestSnapshot,
102    ) -> Self {
103        Self {
104            tick,
105            instance_id,
106            manifest: Some(manifest),
107            _phantom: PhantomData,
108        }
109    }
110}
111
112/// Projection-side failure taxonomy.
113#[derive(Debug, thiserror::Error)]
114#[non_exhaustive]
115pub enum ProjectionError {
116    /// Event sequence moved backward (corruption or mis-routed dispatch).
117    #[error("projection sequence backward: last {last}, incoming {incoming}")]
118    SequenceBackward {
119        /// Last sequence applied by this projection.
120        last: u64,
121        /// Sequence number of the rejected incoming event.
122        incoming: u64,
123    },
124
125    /// A sequence number was skipped — the replay harness needs to fetch the
126    /// missing range before this projection can advance.
127    #[error("projection sequence gap: last {last}, incoming {incoming}")]
128    SequenceGap {
129        /// Last sequence applied by this projection.
130        last: u64,
131        /// Sequence number of the event that exposed the gap.
132        incoming: u64,
133    },
134
135    /// Caller attempted a mutation in a non-`Active` state (observer is
136    /// Passive or Draining).
137    #[error("observer not active: current state {state:?}")]
138    NotActive {
139        /// Observer state at the time of the attempted mutation.
140        state: ObserverState,
141    },
142
143    /// Storage-layer error (in-memory corruption, PG driver, …).
144    #[error("projection storage error: {0}")]
145    Storage(&'static str),
146
147    /// Event payload failed to decode.
148    #[error("event decode failed: {0}")]
149    DecodeFailed(&'static str),
150
151    /// An event targeted an Actor / Space / Entry / Activity that the
152    /// projection has no row for.
153    #[error("projection row missing")]
154    MissingRow,
155}
156
157// ===================== Projection trait =====================
158
159/// L2 projection worker. Each implementor owns a read-model view that is
160/// kept in sync with the L1 event stream for a specific set of `TypeCode`s.
161///
162/// Implementors must be `Send + Sync` so the `ProjectionRouter` can run
163/// across worker threads; dedup / gap detection is centralised in the
164/// router using [`Projection::last_applied`].
165pub trait Projection: Send + Sync {
166    /// TypeCodes this projection observes — the router filters incoming
167    /// events against this slice.
168    fn observes(&self) -> &[TypeCode];
169
170    /// Apply an event. Called only after router-side dedup + gap checks
171    /// have succeeded. Implementations must:
172    ///
173    /// 1. Update their internal view state.
174    /// 2. Bump `last_applied` to the event's `(sequence, tick)`.
175    fn on_event(
176        &mut self,
177        event: &EventRecord,
178        ctx: &ProjectionContext<'_>,
179    ) -> Result<(), ProjectionError>;
180
181    /// React to a worker-state transition (Passive ↔ Active ↔ Draining).
182    /// Default is no-op.
183    fn on_state_change(&mut self, _new_state: ObserverState) -> Result<(), ProjectionError> {
184        Ok(())
185    }
186
187    /// Last `(sequence, tick)` applied — `None` if the projection is fresh.
188    fn last_applied(&self) -> Option<(u64, Tick)>;
189}
190
191// ===================== Projection view structs =====================
192
193/// `(u64, Tick)` pair tracking the last event applied.
194#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
195pub struct ProjectionCursor {
196    /// Last sequence number applied.
197    pub sequence: u64,
198    /// Tick at which the last event was applied.
199    pub tick: Tick,
200}
201
202/// Actor-facing read-model row — `ActorProfile` + optional `UserBinding`.
203#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
204pub struct ActorProjection {
205    /// Wire schema version.
206    pub schema_version: u16,
207    /// Actor identity.
208    pub actor_id: ActorId,
209    /// Authoritative `ActorProfile` Component.
210    pub profile: ActorProfile,
211    /// `UserBinding` is present iff the actor is `Authenticated` (E-actor-2).
212    pub user_binding: Option<UserBinding>,
213    /// Event cursor — dedup / gap anchor.
214    pub cursor: Option<ProjectionCursor>,
215}
216
217/// Space read-model row — `SpaceConfig` + parent-chain cache + membership.
218#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
219pub struct SpaceProjection {
220    /// Wire schema version.
221    pub schema_version: u16,
222    /// Space identity.
223    pub space_id: SpaceId,
224    /// Authoritative `SpaceConfig` Component.
225    pub config: SpaceConfig,
226    /// Cached parent-chain depth (E-space-4 O(1)).
227    pub parent_chain_depth: Option<ParentChainDepth>,
228    /// Membership list for PrivateInvite Spaces.
229    pub membership: Option<SpaceMembership>,
230    /// Event cursor.
231    pub cursor: Option<ProjectionCursor>,
232}
233
234/// Entry read-model row — `EntryCore` + body metadata + depth cache.
235#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
236pub struct EntryProjection {
237    /// Wire schema version.
238    pub schema_version: u16,
239    /// Entry identity.
240    pub entry_id: EntryId,
241    /// Authoritative `EntryCore` Component.
242    pub core: EntryCore,
243    /// `EntryBody` — absent when soft-deleted (E-entry-5).
244    pub body: Option<EntryBody>,
245    /// Cached parent-chain depth (E-entry-3).
246    pub parent_depth: Option<EntryParentDepth>,
247    /// Event cursor.
248    pub cursor: Option<ProjectionCursor>,
249}
250
251/// Activity read-model row — `ActivityRecord` + optional Extension-target
252/// shell marker.
253#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
254pub struct ActivityProjection {
255    /// Wire schema version.
256    pub schema_version: u16,
257    /// Activity identity.
258    pub activity_id: ActivityId,
259    /// Authoritative `ActivityRecord` Component.
260    pub record: ActivityRecord,
261    /// Shell marker for Extension-target Activities (E-act-7).
262    pub entity_shell_id: Option<EntityShellId>,
263    /// Event cursor.
264    pub cursor: Option<ProjectionCursor>,
265}
266
267// ===================== ProjectionStore =====================
268
269/// Storage abstraction for projection rows. An in-memory
270/// implementation ships with this crate; PG-backed storage plugs in
271/// at the L2 service layer.
272pub trait ProjectionStore: Send + Sync {
273    /// Upsert an Actor row. `Active` observer only.
274    fn upsert_actor(&mut self, row: &ActorProjection) -> Result<(), ProjectionError>;
275
276    /// Upsert a Space row. `Active` observer only.
277    fn upsert_space(&mut self, row: &SpaceProjection) -> Result<(), ProjectionError>;
278
279    /// Upsert an Entry row. `Active` observer only.
280    fn upsert_entry(&mut self, row: &EntryProjection) -> Result<(), ProjectionError>;
281
282    /// Upsert an Activity row. `Active` observer only.
283    fn upsert_activity(&mut self, row: &ActivityProjection) -> Result<(), ProjectionError>;
284
285    /// Read an Actor row.
286    fn get_actor(&self, actor_id: ActorId) -> Option<ActorProjection>;
287    /// Read a Space row.
288    fn get_space(&self, space_id: SpaceId) -> Option<SpaceProjection>;
289    /// Read an Entry row.
290    fn get_entry(&self, entry_id: EntryId) -> Option<EntryProjection>;
291    /// Read an Activity row.
292    fn get_activity(&self, activity_id: ActivityId) -> Option<ActivityProjection>;
293}
294
295/// In-memory [`ProjectionStore`] — intended for tests and Tier-0 dev runs.
296#[derive(Debug, Default)]
297pub struct InMemoryProjectionStore {
298    actors: HashMap<ActorId, ActorProjection>,
299    spaces: HashMap<SpaceId, SpaceProjection>,
300    entries: HashMap<EntryId, EntryProjection>,
301    activities: HashMap<ActivityId, ActivityProjection>,
302}
303
304impl InMemoryProjectionStore {
305    /// Construct an empty store.
306    #[inline]
307    #[must_use]
308    pub fn new() -> Self {
309        Self::default()
310    }
311}
312
313impl ProjectionStore for InMemoryProjectionStore {
314    fn upsert_actor(&mut self, row: &ActorProjection) -> Result<(), ProjectionError> {
315        self.actors.insert(row.actor_id, row.clone());
316        Ok(())
317    }
318    fn upsert_space(&mut self, row: &SpaceProjection) -> Result<(), ProjectionError> {
319        self.spaces.insert(row.space_id, row.clone());
320        Ok(())
321    }
322    fn upsert_entry(&mut self, row: &EntryProjection) -> Result<(), ProjectionError> {
323        self.entries.insert(row.entry_id, row.clone());
324        Ok(())
325    }
326    fn upsert_activity(&mut self, row: &ActivityProjection) -> Result<(), ProjectionError> {
327        self.activities.insert(row.activity_id, row.clone());
328        Ok(())
329    }
330    fn get_actor(&self, actor_id: ActorId) -> Option<ActorProjection> {
331        self.actors.get(&actor_id).cloned()
332    }
333    fn get_space(&self, space_id: SpaceId) -> Option<SpaceProjection> {
334        self.spaces.get(&space_id).cloned()
335    }
336    fn get_entry(&self, entry_id: EntryId) -> Option<EntryProjection> {
337        self.entries.get(&entry_id).cloned()
338    }
339    fn get_activity(&self, activity_id: ActivityId) -> Option<ActivityProjection> {
340        self.activities.get(&activity_id).cloned()
341    }
342}
343
344// ===================== Router =====================
345
346/// Event-stream dispatcher. Matches incoming events to registered
347/// projections by `TypeCode`, enforces dedup + gap detection via
348/// `Projection::last_applied`, and propagates observer state transitions.
349pub struct ProjectionRouter {
350    projections: Vec<Box<dyn Projection>>,
351    state: ObserverState,
352}
353
354impl ProjectionRouter {
355    /// Build a router in the `Passive` state — promote to `Active` before
356    /// committing writes.
357    #[inline]
358    #[must_use]
359    pub fn new() -> Self {
360        Self {
361            projections: Vec::new(),
362            state: ObserverState::Passive,
363        }
364    }
365
366    /// Register a projection.
367    pub fn register(&mut self, projection: Box<dyn Projection>) {
368        self.projections.push(projection);
369    }
370
371    /// Current observer state.
372    #[inline]
373    #[must_use]
374    pub fn state(&self) -> ObserverState {
375        self.state
376    }
377
378    /// Transition to `Active` — fails if currently `Draining`.
379    pub fn promote_to_active(&mut self) -> Result<(), ProjectionError> {
380        if self.state == ObserverState::Draining {
381            return Err(ProjectionError::NotActive { state: self.state });
382        }
383        self.transition(ObserverState::Active)
384    }
385
386    /// Evaluate the shell's `kms_auto_promote` policy against three inputs:
387    /// the elapsed outage, the multi-channel KMS health quorum, and the
388    /// threshold-HSM share readiness.
389    ///
390    /// Policy values:
391    ///
392    /// | `kms_auto_promote`  | Decision matrix |
393    /// |---------------------|-----------------|
394    /// | `"manual"`          | Always `Some(Wait)` — operator drives the promotion manually. |
395    /// | `"after_60min"`     | `Some(Promote)` iff `primary_down_duration >= 1h` **and** the KMS health quorum has at least [`HF2_HEALTH_QUORUM_MIN`] channels healthy; otherwise `Some(Wait)`. |
396    /// | `"threshold_hsm"`   | `Some(Promote)` iff `threshold_ready` (t-of-n shares collected); otherwise `Some(Wait)`. |
397    /// | other               | `None` — unrecognised policy string, operator intervention required. |
398    ///
399    /// Returning `None` is conservative by design: callers treat unknown
400    /// policies as "do not auto-promote" and fall back to manual operator
401    /// action.
402    #[must_use]
403    pub fn evaluate_auto_promote(
404        &self,
405        manifest: &crate::manifest::ManifestSnapshot,
406        primary_down_duration: core::time::Duration,
407        health: &crate::hf2_kms::health::MultiChannelHealth,
408        threshold_ready: bool,
409    ) -> Option<PromotionDecision> {
410        match manifest.audit.kms_auto_promote.as_str() {
411            "manual" => Some(PromotionDecision::Wait),
412            "after_60min" => {
413                let elapsed_ok = primary_down_duration >= core::time::Duration::from_secs(60 * 60);
414                let health_ok = health.healthy_count() >= HF2_HEALTH_QUORUM_MIN;
415                if elapsed_ok && health_ok {
416                    Some(PromotionDecision::Promote)
417                } else {
418                    Some(PromotionDecision::Wait)
419                }
420            }
421            "threshold_hsm" => {
422                if threshold_ready {
423                    Some(PromotionDecision::Promote)
424                } else {
425                    Some(PromotionDecision::Wait)
426                }
427            }
428            _ => None,
429        }
430    }
431
432    /// Transition to `Passive`. Used when ceding primary to a peer.
433    pub fn demote_to_passive(&mut self) -> Result<(), ProjectionError> {
434        self.transition(ObserverState::Passive)
435    }
436
437    /// Transition to `Draining`. Terminal — no further state changes.
438    pub fn begin_draining(&mut self) -> Result<(), ProjectionError> {
439        self.transition(ObserverState::Draining)
440    }
441
442    fn transition(&mut self, next: ObserverState) -> Result<(), ProjectionError> {
443        for p in &mut self.projections {
444            p.on_state_change(next)?;
445        }
446        self.state = next;
447        Ok(())
448    }
449
450    /// Dispatch an event to every matching projection. Returns `Ok(n)`
451    /// where `n` is the number of projections that applied the event.
452    ///
453    /// Only the `Active` state may dispatch — `Passive` and `Draining`
454    /// reject with [`ProjectionError::NotActive`]. The `Passive` rejection
455    /// is the production guardrail for active-passive HA: a secondary
456    /// observer that mistakenly accepts writes would create split-brain
457    /// rows in the PG-backed store. The dylint `arkhe-trait-default-check`
458    /// CI gate ensures the contract is honoured by every L2 deployment.
459    pub fn dispatch(
460        &mut self,
461        event: &EventRecord,
462        ctx: &ProjectionContext<'_>,
463    ) -> Result<usize, ProjectionError> {
464        if self.state != ObserverState::Active {
465            return Err(ProjectionError::NotActive { state: self.state });
466        }
467        let tc = TypeCode(event.type_code);
468        let mut applied = 0usize;
469        for p in &mut self.projections {
470            if !p.observes().contains(&tc) {
471                continue;
472            }
473            if let Some((last_seq, _)) = p.last_applied() {
474                if event.sequence == last_seq {
475                    continue; // duplicate — silent no-op
476                }
477                if event.sequence < last_seq {
478                    return Err(ProjectionError::SequenceBackward {
479                        last: last_seq,
480                        incoming: event.sequence,
481                    });
482                }
483                if event.sequence > last_seq.saturating_add(1) {
484                    return Err(ProjectionError::SequenceGap {
485                        last: last_seq,
486                        incoming: event.sequence,
487                    });
488                }
489            }
490            p.on_event(event, ctx)?;
491            applied += 1;
492        }
493        Ok(applied)
494    }
495}
496
497impl Default for ProjectionRouter {
498    fn default() -> Self {
499        Self::new()
500    }
501}
502
503// ===================== CrossShellActivity fanout =====================
504
505/// Read-only fanout projection for `CrossShellActivity`. Stores cross-shell
506/// notifications keyed by the target shell — never touches the source
507/// shell's rows, preserving shell isolation (E-act-2 RA tier).
508#[derive(Debug)]
509pub struct CrossShellActivityFanout {
510    observes: [TypeCode; 1],
511    by_target_shell: HashMap<ShellId, Vec<CrossShellActivity>>,
512    cursor: Option<ProjectionCursor>,
513}
514
515impl Default for CrossShellActivityFanout {
516    fn default() -> Self {
517        Self::new()
518    }
519}
520
521impl CrossShellActivityFanout {
522    /// Construct a fresh fanout, pre-wired to observe `CrossShellActivity`.
523    #[inline]
524    #[must_use]
525    pub fn new() -> Self {
526        Self {
527            observes: [TypeCode(CrossShellActivity::TYPE_CODE)],
528            by_target_shell: HashMap::new(),
529            cursor: None,
530        }
531    }
532
533    /// Borrow the notification queue for a shell (read-only).
534    #[inline]
535    #[must_use]
536    pub fn notifications_for(&self, shell: &ShellId) -> &[CrossShellActivity] {
537        self.by_target_shell
538            .get(shell)
539            .map(Vec::as_slice)
540            .unwrap_or(&[])
541    }
542}
543
544impl Projection for CrossShellActivityFanout {
545    fn observes(&self) -> &[TypeCode] {
546        &self.observes
547    }
548
549    fn on_event(
550        &mut self,
551        event: &EventRecord,
552        _ctx: &ProjectionContext<'_>,
553    ) -> Result<(), ProjectionError> {
554        let notice: CrossShellActivity = postcard::from_bytes(&event.payload)
555            .map_err(|_| ProjectionError::DecodeFailed("CrossShellActivity payload"))?;
556        self.by_target_shell
557            .entry(notice.target_shell_id)
558            .or_default()
559            .push(notice);
560        self.cursor = Some(ProjectionCursor {
561            sequence: event.sequence,
562            tick: event.tick,
563        });
564        Ok(())
565    }
566
567    fn last_applied(&self) -> Option<(u64, Tick)> {
568        self.cursor.map(|c| (c.sequence, c.tick))
569    }
570}
571
572// ===================== Tests =====================
573
574#[cfg(test)]
575#[allow(clippy::unwrap_used, clippy::expect_used)]
576mod tests {
577    use super::*;
578    use arkhe_forge_core::actor::ActorKind;
579    use arkhe_forge_core::component::BoundedString;
580    use arkhe_kernel::abi::EntityId;
581    use bytes::Bytes;
582
583    fn sid(byte: u8) -> ShellId {
584        ShellId([byte; 16])
585    }
586
587    fn ent(v: u64) -> EntityId {
588        EntityId::new(v).unwrap()
589    }
590
591    fn make_cross_shell_event(seq: u64, tick: u64, target: ShellId) -> EventRecord {
592        let notice = CrossShellActivity {
593            schema_version: 1,
594            actor: ActorId::new(ent(1)),
595            target_shell_id: target,
596            record_shell_id: sid(0xAA),
597            detected_tick: Tick(tick),
598        };
599        EventRecord {
600            type_code: CrossShellActivity::TYPE_CODE,
601            sequence: seq,
602            tick: Tick(tick),
603            payload: Bytes::from(postcard::to_stdvec(&notice).unwrap()),
604        }
605    }
606
607    fn ctx(tick: u64) -> ProjectionContext<'static> {
608        ProjectionContext::new(Tick(tick), InstanceId::new(1).unwrap())
609    }
610
611    #[test]
612    fn router_defaults_to_passive() {
613        let r = ProjectionRouter::new();
614        assert_eq!(r.state(), ObserverState::Passive);
615    }
616
617    #[test]
618    fn router_promote_then_demote_then_drain() {
619        let mut r = ProjectionRouter::new();
620        r.promote_to_active().unwrap();
621        assert_eq!(r.state(), ObserverState::Active);
622        r.demote_to_passive().unwrap();
623        assert_eq!(r.state(), ObserverState::Passive);
624        r.begin_draining().unwrap();
625        assert_eq!(r.state(), ObserverState::Draining);
626        // Draining is terminal — promote rejects.
627        assert!(r.promote_to_active().is_err());
628    }
629
630    #[test]
631    fn cross_shell_fanout_routes_to_target_shell_only() {
632        let mut r = ProjectionRouter::new();
633        r.promote_to_active().unwrap();
634        r.register(Box::new(CrossShellActivityFanout::new()));
635        let target = sid(0x33);
636        let ev = make_cross_shell_event(0, 100, target);
637        let applied = r.dispatch(&ev, &ctx(100)).unwrap();
638        assert_eq!(applied, 1);
639    }
640
641    #[test]
642    fn dispatcher_skips_projection_with_no_matching_observer() {
643        let mut r = ProjectionRouter::new();
644        r.promote_to_active().unwrap();
645        r.register(Box::new(CrossShellActivityFanout::new()));
646        let other_event = EventRecord {
647            type_code: 0x0003_0F02, // UserErasureScheduled
648            sequence: 0,
649            tick: Tick(1),
650            payload: Bytes::new(),
651        };
652        let applied = r.dispatch(&other_event, &ctx(1)).unwrap();
653        assert_eq!(applied, 0, "non-observed TypeCode must not hit the fanout");
654    }
655
656    #[test]
657    fn dispatcher_dedups_duplicate_sequence() {
658        let mut r = ProjectionRouter::new();
659        r.promote_to_active().unwrap();
660        r.register(Box::new(CrossShellActivityFanout::new()));
661        let target = sid(0x10);
662        let ev = make_cross_shell_event(0, 5, target);
663        r.dispatch(&ev, &ctx(5)).unwrap();
664        let applied_again = r.dispatch(&ev, &ctx(5)).unwrap();
665        assert_eq!(applied_again, 0, "duplicate sequence must no-op");
666    }
667
668    #[test]
669    fn dispatcher_rejects_gap() {
670        let mut r = ProjectionRouter::new();
671        r.promote_to_active().unwrap();
672        r.register(Box::new(CrossShellActivityFanout::new()));
673        let target = sid(0x10);
674        r.dispatch(&make_cross_shell_event(0, 5, target), &ctx(5))
675            .unwrap();
676        // Jump to sequence 5 — gap (1..=4 missing).
677        let err = r
678            .dispatch(&make_cross_shell_event(5, 6, target), &ctx(6))
679            .unwrap_err();
680        assert!(matches!(err, ProjectionError::SequenceGap { .. }));
681    }
682
683    #[test]
684    fn dispatcher_rejects_backward_sequence() {
685        let mut r = ProjectionRouter::new();
686        r.promote_to_active().unwrap();
687        r.register(Box::new(CrossShellActivityFanout::new()));
688        let target = sid(0x10);
689        r.dispatch(&make_cross_shell_event(2, 5, target), &ctx(5))
690            .unwrap();
691        let err = r
692            .dispatch(&make_cross_shell_event(1, 5, target), &ctx(5))
693            .unwrap_err();
694        assert!(matches!(err, ProjectionError::SequenceBackward { .. }));
695    }
696
697    #[test]
698    fn draining_rejects_dispatch() {
699        let mut r = ProjectionRouter::new();
700        r.begin_draining().unwrap();
701        let err = r
702            .dispatch(&make_cross_shell_event(0, 1, sid(0)), &ctx(1))
703            .unwrap_err();
704        assert!(matches!(
705            err,
706            ProjectionError::NotActive {
707                state: ObserverState::Draining
708            }
709        ));
710    }
711
712    #[test]
713    fn passive_rejects_dispatch() {
714        // `ProjectionRouter::new()` starts in Passive — dispatch must reject
715        // until the worker is promoted, otherwise an active-passive HA pair
716        // could create split-brain rows in the PG-backed store.
717        let mut r = ProjectionRouter::new();
718        assert_eq!(r.state(), ObserverState::Passive);
719        let err = r
720            .dispatch(&make_cross_shell_event(0, 1, sid(0)), &ctx(1))
721            .unwrap_err();
722        assert!(matches!(
723            err,
724            ProjectionError::NotActive {
725                state: ObserverState::Passive
726            }
727        ));
728    }
729
730    #[test]
731    fn demote_to_passive_blocks_subsequent_dispatch() {
732        // After a successful Active dispatch, demoting back to Passive must
733        // immediately stop accepting writes — covers the failover-back path.
734        let mut r = ProjectionRouter::new();
735        r.promote_to_active().unwrap();
736        r.register(Box::new(CrossShellActivityFanout::new()));
737        r.dispatch(&make_cross_shell_event(0, 5, sid(0x10)), &ctx(5))
738            .unwrap();
739        r.demote_to_passive().unwrap();
740        let err = r
741            .dispatch(&make_cross_shell_event(1, 6, sid(0x10)), &ctx(6))
742            .unwrap_err();
743        assert!(matches!(
744            err,
745            ProjectionError::NotActive {
746                state: ObserverState::Passive
747            }
748        ));
749    }
750
751    #[test]
752    fn in_memory_store_roundtrip_actor() {
753        let mut store = InMemoryProjectionStore::new();
754        let row = ActorProjection {
755            schema_version: 1,
756            actor_id: ActorId::new(ent(42)),
757            profile: ActorProfile {
758                schema_version: 1,
759                shell_id: sid(0x01),
760                handle: BoundedString::<32>::new("alice").unwrap(),
761                kind: ActorKind::Human,
762                created_tick: Tick(1),
763            },
764            user_binding: None,
765            cursor: None,
766        };
767        store.upsert_actor(&row).unwrap();
768        let fetched = store.get_actor(ActorId::new(ent(42))).unwrap();
769        assert_eq!(fetched, row);
770    }
771
772    #[test]
773    fn cross_shell_fanout_preserves_shell_partition() {
774        let mut fanout = CrossShellActivityFanout::new();
775        let shell_a = sid(0xAA);
776        let shell_b = sid(0xBB);
777        fanout
778            .on_event(&make_cross_shell_event(0, 10, shell_a), &ctx(10))
779            .unwrap();
780        fanout
781            .on_event(&make_cross_shell_event(1, 11, shell_b), &ctx(11))
782            .unwrap();
783        assert_eq!(fanout.notifications_for(&shell_a).len(), 1);
784        assert_eq!(fanout.notifications_for(&shell_b).len(), 1);
785        assert_eq!(fanout.last_applied(), Some((1, Tick(11))));
786    }
787
788    #[test]
789    fn projection_cursor_roundtrip() {
790        let c = ProjectionCursor {
791            sequence: 5,
792            tick: Tick(10),
793        };
794        let bytes = postcard::to_stdvec(&c).unwrap();
795        let back: ProjectionCursor = postcard::from_bytes(&bytes).unwrap();
796        assert_eq!(c, back);
797    }
798
799    /// Manifest auto-promote decision table.
800    /// Run via a small helper that builds a `ManifestSnapshot` with a
801    /// configurable `kms_auto_promote` string.
802    #[test]
803    fn auto_promote_policy_matrix() {
804        use crate::hf2_kms::health::{Channel, MultiChannelHealth, Status};
805        use crate::manifest::{
806            AuditSection, FrontendSection, ManifestSnapshot, RuntimeSection, ShellSection,
807        };
808        use core::time::Duration;
809
810        fn manifest_with(policy: &str) -> ManifestSnapshot {
811            ManifestSnapshot {
812                schema_version: 1,
813                shell: ShellSection {
814                    shell_id: "test".to_string(),
815                    display_name: "Test".to_string(),
816                },
817                runtime: RuntimeSection {
818                    runtime_max: "0.15".to_string(),
819                    runtime_current: "0.13".to_string(),
820                },
821                audit: AuditSection {
822                    pii_cipher: "xchacha20-poly1305".to_string(),
823                    dek_backend: "software-kek".to_string(),
824                    kms_auto_promote: policy.to_string(),
825                    signature_class: "ed25519".to_string(),
826                    compliance_tier: 0,
827                },
828                frontend: FrontendSection::default(),
829            }
830        }
831
832        fn healthy_trio() -> MultiChannelHealth {
833            let mut h = MultiChannelHealth::new(&[
834                Channel::Default,
835                Channel::DnsOverHttps,
836                Channel::StaticIp,
837            ]);
838            for c in [Channel::Default, Channel::DnsOverHttps, Channel::StaticIp] {
839                h.set_status(c, Status::Healthy);
840            }
841            h
842        }
843
844        fn degraded_trio() -> MultiChannelHealth {
845            // Only one channel healthy — below 2/3 quorum.
846            let mut h = MultiChannelHealth::new(&[
847                Channel::Default,
848                Channel::DnsOverHttps,
849                Channel::StaticIp,
850            ]);
851            h.set_status(Channel::Default, Status::Healthy);
852            h.set_status(Channel::DnsOverHttps, Status::Failing);
853            h.set_status(Channel::StaticIp, Status::Failing);
854            h
855        }
856
857        let r = ProjectionRouter::new();
858        let healthy = healthy_trio();
859        let degraded = degraded_trio();
860
861        // Manual policy → always Wait (operator drives the promotion).
862        assert_eq!(
863            r.evaluate_auto_promote(
864                &manifest_with("manual"),
865                Duration::from_secs(7200),
866                &healthy,
867                true,
868            ),
869            Some(PromotionDecision::Wait),
870        );
871
872        // after_60min, short outage → Wait even with full health.
873        assert_eq!(
874            r.evaluate_auto_promote(
875                &manifest_with("after_60min"),
876                Duration::from_secs(59 * 60),
877                &healthy,
878                false,
879            ),
880            Some(PromotionDecision::Wait),
881        );
882
883        // after_60min, outage cleared but health below quorum → Wait.
884        assert_eq!(
885            r.evaluate_auto_promote(
886                &manifest_with("after_60min"),
887                Duration::from_secs(60 * 60),
888                &degraded,
889                false,
890            ),
891            Some(PromotionDecision::Wait),
892        );
893
894        // after_60min, outage cleared AND health quorum met → Promote.
895        assert_eq!(
896            r.evaluate_auto_promote(
897                &manifest_with("after_60min"),
898                Duration::from_secs(60 * 60),
899                &healthy,
900                false,
901            ),
902            Some(PromotionDecision::Promote),
903        );
904
905        // threshold_hsm, shares not collected → Wait.
906        assert_eq!(
907            r.evaluate_auto_promote(
908                &manifest_with("threshold_hsm"),
909                Duration::from_secs(60 * 60),
910                &degraded,
911                false,
912            ),
913            Some(PromotionDecision::Wait),
914        );
915
916        // threshold_hsm, shares collected → Promote (health is not gating here).
917        assert_eq!(
918            r.evaluate_auto_promote(
919                &manifest_with("threshold_hsm"),
920                Duration::from_secs(0),
921                &degraded,
922                true,
923            ),
924            Some(PromotionDecision::Promote),
925        );
926
927        // Unknown policy string → None (conservative default — operator must act).
928        assert!(r
929            .evaluate_auto_promote(
930                &manifest_with("unknown"),
931                Duration::from_secs(86_400),
932                &healthy,
933                true,
934            )
935            .is_none());
936    }
937}