// arkhe_kernel/runtime/kernel.rs

1//! `Kernel` — top-level orchestrator.
2//!
3//! Step path: `pop_due → deserialize → compute → authorize → dispatch
4//! → apply_stage → observer.deliver`. Per-instance step ordering is
5//! `InstanceId` ascending (A23) — `BTreeMap` iteration delivers it for free.
6
7use std::collections::BTreeMap;
8
9use crate::abi::{ArkheError, CapabilityMask, EntityId, InstanceId, Principal, Tick, TypeCode};
10use crate::state::authz::authorize;
11use crate::state::{
12    Action, ActionContext, Effect, Instance, InstanceConfig, ScheduledActionId, Unverified,
13};
14
15use super::apply::{apply_stage, discard_stage};
16use super::dispatch::dispatch;
17use super::event::{KernelEvent, ObserverHandle};
18use super::observer::{KernelObserver, ObserverRegistry};
19use super::registry::ActionRegistry;
20use super::stage::StepStage;
21
22use crate::persist::{AuthDecisionAnnotation, Wal, WalWriter};
23
/// Top-level kernel orchestrator.
///
/// Lifecycle: [`Kernel::new`] (or [`Kernel::new_with_wal`] /
/// [`Kernel::new_with_wal_signed`]) → [`register_action`](Kernel::register_action)
/// → [`create_instance`](Kernel::create_instance) →
/// [`submit`](Kernel::submit) → [`step`](Kernel::step) (repeat) →
/// optional [`snapshot`](Kernel::snapshot) / [`export_wal`](Kernel::export_wal).
///
/// `Kernel` is `!Sync` (A2 single-thread) and is owned by the caller —
/// no internal locking, no async. All determinism guarantees depend on
/// the caller driving a single kernel from one thread.
pub struct Kernel {
    // Live instances keyed by id; `BTreeMap` iteration order (ascending key)
    // is what gives `step` its per-instance ordering.
    instances: BTreeMap<InstanceId, Instance>,
    // Registered action types, looked up by `TypeCode` during `step`.
    action_registry: ActionRegistry,
    // Observers that receive `KernelEvent`s from `step` / `force_unload`.
    observers: ObserverRegistry,
    // Most recently allocated raw instance id; `0` means none allocated yet
    // (`create_instance` increments before use).
    next_instance_id: u64,
    // Optional WAL writer; `None` for kernels built via `new` / `from_snapshot`.
    wal: Option<WalWriter>,
}
42
/// Aggregated counters returned by [`Kernel::step`].
///
/// All counters are accumulated with saturating arithmetic across every
/// instance visited in one `step` call.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StepReport {
    /// Number of due scheduled actions popped this step. The counter is
    /// bumped as soon as an action is popped, so it also includes actions
    /// that are subsequently skipped because their type code is not
    /// registered or their bytes fail to deserialize.
    pub actions_executed: u32,
    /// Number of effects (Ops) that committed.
    pub effects_applied: u32,
    /// Number of effects denied (authorize-deny or budget-deny).
    pub effects_denied: u32,
    /// Number of observers newly evicted (first-panic).
    pub observers_evicted: u32,
    /// Number of `KernelEvent::DomainEventEmitted` events produced.
    pub domain_events_emitted: u32,
}
57
/// Cross-instance aggregate observability returned by [`Kernel::stats`].
///
/// Per-instance quantities are summed with saturating arithmetic.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Stats {
    /// Total instances currently alive.
    pub instance_count: usize,
    /// Total scheduled actions pending across all instances.
    pub scheduled_action_count: usize,
    /// Total entities across all instances.
    pub entity_count: u32,
    /// Total component bytes across all ledgers.
    pub component_byte_count: u64,
    /// Live observer count (pre-eviction).
    // NOTE(review): populated from `ObserverRegistry::len()`; whether that
    // length includes already-evicted observers is decided by the registry —
    // confirm against its implementation.
    pub observer_count: usize,
    /// WAL record count; `0` if no WAL is attached.
    pub wal_record_count: usize,
}
74
75impl Default for Kernel {
76    fn default() -> Self {
77        Self::new()
78    }
79}
80
81impl Kernel {
82    /// Construct a fresh kernel with no instances, no observers, and
83    /// no WAL. Use [`new_with_wal`](Kernel::new_with_wal) or
84    /// [`new_with_wal_signed`](Kernel::new_with_wal_signed) instead if
85    /// you want WAL recording from step zero.
86    pub fn new() -> Self {
87        Self {
88            instances: BTreeMap::new(),
89            action_registry: ActionRegistry::new(),
90            observers: ObserverRegistry::new(),
91            next_instance_id: 0,
92            wal: None,
93        }
94    }
95
96    /// Construct a Kernel with an attached WAL writer. Each successfully
97    /// committed step appends one record (A13/A14).
98    pub fn new_with_wal(world_id: [u8; 32], manifest_digest: [u8; 32]) -> Self {
99        Self {
100            instances: BTreeMap::new(),
101            action_registry: ActionRegistry::new(),
102            observers: ObserverRegistry::new(),
103            next_instance_id: 0,
104            wal: Some(WalWriter::new(world_id, manifest_digest)),
105        }
106    }
107
108    /// Construct a Kernel with a WAL writer that signs each record under
109    /// the supplied `SignatureClass` (A16 — Ed25519 (Tier 2) or Hybrid (Ed25519 + ML-DSA 65, CNSA 2.0)). The
110    /// verifying key is pinned in the WAL header so post-hoc verification
111    /// is self-contained.
112    pub fn new_with_wal_signed(
113        world_id: [u8; 32],
114        manifest_digest: [u8; 32],
115        sig_class: crate::persist::SignatureClass,
116    ) -> Self {
117        Self {
118            instances: BTreeMap::new(),
119            action_registry: ActionRegistry::new(),
120            observers: ObserverRegistry::new(),
121            next_instance_id: 0,
122            wal: Some(WalWriter::with_signature(
123                world_id,
124                manifest_digest,
125                sig_class,
126            )),
127        }
128    }
129
130    /// Current chain tip if the kernel has a WAL attached.
131    pub fn wal_chain_tip(&self) -> Option<[u8; 32]> {
132        self.wal.as_ref().map(|w| w.chain_tip())
133    }
134
135    /// Number of WAL records currently buffered (None if no WAL attached).
136    pub fn wal_record_count(&self) -> Option<usize> {
137        self.wal.as_ref().map(|w| w.record_count())
138    }
139
140    /// Consume the kernel and return the accumulated WAL (if any).
141    pub fn export_wal(self) -> Option<Wal> {
142        self.wal.map(Wal::from_writer)
143    }
144
145    /// Register a domain action type with the kernel's dispatch
146    /// registry. Required before [`submit`](Kernel::submit) accepts
147    /// the action's `TYPE_CODE`.
148    pub fn register_action<A: Action>(&mut self) {
149        self.action_registry.register::<A>();
150    }
151
152    /// Register an observer for every kernel event. Equivalent to
153    /// [`register_observer_filtered`](Kernel::register_observer_filtered)
154    /// with `EventMask::ALL`.
155    pub fn register_observer(&mut self, obs: Box<dyn KernelObserver>) -> ObserverHandle {
156        self.observers.register(obs)
157    }
158
159    /// Register an observer with an event-class filter. Only events
160    /// whose variant bit is set in `mask` are delivered to this observer
161    /// — useful when an observer cares about a narrow slice of the event
162    /// stream (e.g. only `DOMAIN_EVENT_EMITTED`). `EventMask::ALL` is
163    /// equivalent to `register_observer`.
164    pub fn register_observer_filtered(
165        &mut self,
166        obs: Box<dyn KernelObserver>,
167        mask: super::event::EventMask,
168    ) -> ObserverHandle {
169        self.observers.register_filtered(obs, mask)
170    }
171
172    /// Create a new instance with the supplied config. Returns the
173    /// freshly-allocated `InstanceId` (monotonic per kernel lifetime).
174    pub fn create_instance(&mut self, config: InstanceConfig) -> InstanceId {
175        self.next_instance_id = self.next_instance_id.saturating_add(1);
176        let id = InstanceId::new(self.next_instance_id).expect("instance id > 0");
177        self.instances.insert(id, Instance::new(id, config));
178        id
179    }
180
181    /// Number of live instances.
182    pub fn instances_len(&self) -> usize {
183        self.instances.len()
184    }
185
186    /// Read-only view of one instance's state. Returns `None` if `id`
187    /// does not exist. The borrow is `&self`, so callers cannot mutate
188    /// the kernel concurrently while a view is live.
189    pub fn instance_view(&self, id: InstanceId) -> Option<super::view::InstanceView<'_>> {
190        self.instances
191            .get(&id)
192            .map(|instance| super::view::InstanceView { instance })
193    }
194
195    /// Capture current kernel state as a serializable snapshot.
196    /// Excludes observers and action registry — caller re-registers those
197    /// after `Kernel::from_snapshot(...)`. WAL is independent and not
198    /// captured here.
199    pub fn snapshot(&self) -> crate::persist::KernelSnapshot {
200        let instances = self
201            .instances
202            .iter()
203            .map(|(id, inst)| (*id, inst.to_snapshot()))
204            .collect();
205        crate::persist::KernelSnapshot::__construct(instances, self.next_instance_id)
206    }
207
208    /// Restore a `Kernel` from a snapshot. The returned kernel has no
209    /// observers, an empty action registry, and no attached WAL — caller
210    /// must re-register everything before resuming `step()`.
211    pub fn from_snapshot(snap: crate::persist::KernelSnapshot) -> Self {
212        let (instances_in, next_instance_id) = snap.__into_parts();
213        let instances = instances_in
214            .into_iter()
215            .map(|(id, s)| (id, Instance::from_snapshot(s)))
216            .collect();
217        Self {
218            instances,
219            action_registry: ActionRegistry::new(),
220            observers: ObserverRegistry::new(),
221            next_instance_id,
222            wal: None,
223        }
224    }
225
226    /// Aggregate observability across all instances. See [`Stats`].
227    pub fn stats(&self) -> Stats {
228        let mut scheduled = 0usize;
229        let mut entities = 0u32;
230        let mut bytes = 0u64;
231        for inst in self.instances.values() {
232            scheduled = scheduled.saturating_add(inst.scheduler().len());
233            entities = entities.saturating_add(inst.ledger().total_entities());
234            bytes = bytes.saturating_add(inst.ledger().total_bytes());
235        }
236        Stats {
237            instance_count: self.instances.len(),
238            scheduled_action_count: scheduled,
239            entity_count: entities,
240            component_byte_count: bytes,
241            observer_count: self.observers.len(),
242            wal_record_count: self.wal.as_ref().map(|w| w.record_count()).unwrap_or(0),
243        }
244    }
245
246    /// Force-unload: drop every instance's inflight-refs entry for
247    /// `route_id` and emit `KernelEvent::ModuleForceUnloaded` with the
248    /// summed live-ref count for the audit trail. Requires `ADMIN_UNLOAD`.
249    ///
250    /// Returns the total live refs that were dropped (`Ok(0)` if no
251    /// instance held the route).
252    pub fn force_unload(
253        &mut self,
254        route_id: crate::abi::RouteId,
255        caps: CapabilityMask,
256    ) -> Result<usize, ArkheError> {
257        if !caps.contains(CapabilityMask::ADMIN_UNLOAD) {
258            return Err(ArkheError::CapabilityDenied);
259        }
260
261        let mut total_live_refs: u32 = 0;
262        for inst in self.instances.values_mut() {
263            if let Some(refs) = inst.inflight_refs_mut().remove(&route_id) {
264                total_live_refs = total_live_refs.saturating_add(refs);
265            }
266        }
267
268        let event = KernelEvent::ModuleForceUnloaded {
269            route_id,
270            live_refs_at_unload: total_live_refs,
271        };
272        let _ = self.observers.deliver(&event);
273
274        Ok(total_live_refs as usize)
275    }
276
277    /// Schedule a serialized action against an instance for execution
278    /// at tick `at`. The bytes must be the canonical postcard encoding
279    /// produced by `<A as Action>::canonical_bytes()` for some
280    /// previously-registered action type matching `action_type_code`.
281    /// Returns the freshly-allocated [`ScheduledActionId`].
282    ///
283    /// Errors with [`ArkheError::InstanceNotFound`] if `instance` is
284    /// unknown.
285    pub fn submit(
286        &mut self,
287        instance: InstanceId,
288        principal: Principal,
289        actor: Option<EntityId>,
290        at: Tick,
291        action_type_code: TypeCode,
292        action_bytes: Vec<u8>,
293    ) -> Result<ScheduledActionId, ArkheError> {
294        let inst = self
295            .instances
296            .get_mut(&instance)
297            .ok_or(ArkheError::InstanceNotFound)?;
298        let counters = inst.id_counters_mut();
299        counters.next_scheduled = counters.next_scheduled.saturating_add(1);
300        let id = ScheduledActionId::new(counters.next_scheduled).expect("scheduled id > 0");
301        inst.scheduler_mut().schedule_with_id(
302            id,
303            at,
304            actor,
305            principal,
306            action_type_code,
307            action_bytes,
308        );
309        Ok(id)
310    }
311
312    /// Process at most one due action per instance, in ascending InstanceId
313    /// order (A23). Returns aggregated counters for the step.
314    pub fn step(&mut self, now: Tick, caps: CapabilityMask) -> StepReport {
315        let mut report = StepReport::default();
316
317        let instance_ids: Vec<InstanceId> = self.instances.keys().copied().collect();
318        for inst_id in instance_ids {
319            let entry = match self.instances.get_mut(&inst_id) {
320                Some(inst) => inst.scheduler_mut().pop_due(now),
321                None => continue,
322            };
323            let entry = match entry {
324                Some(e) => e,
325                None => continue,
326            };
327            report.actions_executed = report.actions_executed.saturating_add(1);
328
329            let reg = match self.action_registry.get(entry.action_type_code).cloned() {
330                Some(r) => r,
331                None => continue,
332            };
333
334            let action = match (reg.deserializer)(reg.schema_version, &entry.action_bytes) {
335                Ok(a) => a,
336                Err(_) => continue,
337            };
338
339            let inst_ref = self.instances.get(&inst_id).expect("instance present");
340            let ctx = ActionContext::new(entry.actor, now, inst_id, inst_ref);
341            let ops = action.compute_dyn(&ctx);
342
343            let mut stage = StepStage::default();
344            let mut next_scheduled_id = inst_ref.id_counters_snapshot().next_scheduled;
345            let budget = inst_ref.config().memory_budget_bytes;
346            let baseline_bytes: i64 = inst_ref.ledger().total_bytes() as i64;
347            let mut any_denied = false;
348            for op in ops {
349                let principal_clone = match &entry.principal {
350                    Principal::Unauthenticated => Principal::Unauthenticated,
351                    Principal::External(e) => Principal::External(*e),
352                    Principal::System => Principal::System,
353                };
354                let eff: Effect<'_, Unverified> = Effect::new(inst_id, principal_clone, op);
355                match authorize(caps, eff) {
356                    Ok(authorized) => {
357                        // Budget enforcement (per-Op, post-authorize, pre-dispatch).
358                        // `budget == 0` means unlimited (default `InstanceConfig`).
359                        // Authorize-deny rolls back the whole stage (any_denied);
360                        // budget-deny is a per-Op skip that does NOT rollback.
361                        if budget > 0 {
362                            let op_size: i64 = match &authorized.op {
363                                crate::state::Op::SetComponent { size, .. } => *size as i64,
364                                crate::state::Op::RemoveComponent { size, .. } => -(*size as i64),
365                                _ => 0,
366                            };
367                            let projected = baseline_bytes
368                                .saturating_add(super::stage::bytes_delta(&stage))
369                                .saturating_add(op_size);
370                            if projected > budget as i64 {
371                                report.effects_denied = report.effects_denied.saturating_add(1);
372                                stage.events.push_back(KernelEvent::EffectFailed {
373                                    instance: inst_id,
374                                    reason: bytes::Bytes::from_static(b"budget_exceeded"),
375                                });
376                                continue;
377                            }
378                        }
379                        dispatch(authorized, &mut stage, now, &mut next_scheduled_id);
380                        report.effects_applied = report.effects_applied.saturating_add(1);
381                    }
382                    Err(_) => {
383                        report.effects_denied = report.effects_denied.saturating_add(1);
384                        any_denied = true;
385                    }
386                }
387            }
388
389            if any_denied {
390                let inst_mut = self.instances.get_mut(&inst_id).expect("instance present");
391                discard_stage(inst_mut, stage);
392                continue;
393            }
394
395            // Domain emit count covers only `DomainEventEmitted`; other staged
396            // events (e.g. `EffectFailed` from budget deny) are kernel events.
397            let domain_emit_count = stage
398                .events
399                .iter()
400                .filter(|e| matches!(e, KernelEvent::DomainEventEmitted { .. }))
401                .count();
402            report.domain_events_emitted = report
403                .domain_events_emitted
404                .saturating_add(domain_emit_count as u32);
405            let events_to_deliver: Vec<KernelEvent> = stage.events.iter().cloned().collect();
406
407            // Snapshot record metadata before stage is consumed by apply.
408            let wal_stage = if self.wal.is_some() {
409                Some(stage.clone())
410            } else {
411                None
412            };
413            let principal_for_wal = match &entry.principal {
414                Principal::Unauthenticated => Principal::Unauthenticated,
415                Principal::External(e) => Principal::External(*e),
416                Principal::System => Principal::System,
417            };
418            let action_bytes_for_wal = entry.action_bytes.clone();
419            let action_type_for_wal = entry.action_type_code;
420
421            let inst_mut = self.instances.get_mut(&inst_id).expect("instance present");
422            apply_stage(inst_mut, stage);
423
424            if let (Some(wal), Some(s)) = (self.wal.as_mut(), wal_stage) {
425                let _ = wal.append(
426                    now,
427                    inst_id,
428                    principal_for_wal,
429                    action_type_for_wal,
430                    action_bytes_for_wal,
431                    caps.bits(),
432                    s,
433                    AuthDecisionAnnotation::AllAuthorized,
434                );
435            }
436
437            for event in events_to_deliver {
438                let evicted = self.observers.deliver(&event);
439                report.observers_evicted = report
440                    .observers_evicted
441                    .saturating_add(evicted.len() as u32);
442            }
443
444            let action_executed = KernelEvent::ActionExecuted {
445                instance: inst_id,
446                action_type: entry.action_type_code,
447                at: now,
448            };
449            let evicted = self.observers.deliver(&action_executed);
450            report.observers_evicted = report
451                .observers_evicted
452                .saturating_add(evicted.len() as u32);
453        }
454
455        report
456    }
457}
458
459#[cfg(test)]
460mod tests {
461    use super::*;
462    use bytes::Bytes;
463    use std::sync::atomic::{AtomicU32, Ordering};
464    use std::sync::Arc;
465
466    use crate::abi::{ExternalId, RouteId};
467    use crate::state::traits::_sealed::Sealed;
468    use crate::state::{ActionCompute, ActionDeriv, Op};
469    use serde::{Deserialize, Serialize};
470
471    // ---- Test Action: spawns a single entity with id=42 ----
472    #[derive(Serialize, Deserialize)]
473    struct SpawnOneAction;
474    impl Sealed for SpawnOneAction {}
475    impl ActionDeriv for SpawnOneAction {
476        const TYPE_CODE: TypeCode = TypeCode(100);
477        const SCHEMA_VERSION: u32 = 1;
478    }
479    impl ActionCompute for SpawnOneAction {
480        fn compute(&self, _ctx: &ActionContext) -> Vec<Op> {
481            vec![Op::SpawnEntity {
482                id: EntityId::new(42).unwrap(),
483                owner: Principal::System,
484            }]
485        }
486    }
487
488    #[derive(Serialize, Deserialize)]
489    struct EmitAction;
490    impl Sealed for EmitAction {}
491    impl ActionDeriv for EmitAction {
492        const TYPE_CODE: TypeCode = TypeCode(101);
493        const SCHEMA_VERSION: u32 = 1;
494    }
495    impl ActionCompute for EmitAction {
496        fn compute(&self, _ctx: &ActionContext) -> Vec<Op> {
497            vec![Op::EmitEvent {
498                actor: None,
499                event_type_code: TypeCode(7),
500                event_bytes: Bytes::from_static(b"hello"),
501            }]
502        }
503    }
504
505    #[derive(Serialize, Deserialize)]
506    struct SignalAction;
507    impl Sealed for SignalAction {}
508    impl ActionDeriv for SignalAction {
509        const TYPE_CODE: TypeCode = TypeCode(102);
510        const SCHEMA_VERSION: u32 = 1;
511    }
512    impl ActionCompute for SignalAction {
513        fn compute(&self, _ctx: &ActionContext) -> Vec<Op> {
514            // SendSignal requires SYSTEM cap (state::authz policy).
515            vec![Op::SendSignal {
516                target: InstanceId::new(1).unwrap(),
517                route: RouteId(1),
518                payload: Bytes::new(),
519            }]
520        }
521    }
522
523    struct CountingObserver {
524        count: Arc<AtomicU32>,
525    }
526    impl KernelObserver for CountingObserver {
527        fn on_event(&self, _event: &KernelEvent) {
528            self.count.fetch_add(1, Ordering::SeqCst);
529        }
530    }
531
532    struct PanicObserver;
533    impl KernelObserver for PanicObserver {
534        fn on_event(&self, _event: &KernelEvent) {
535            panic!("observer intentional panic");
536        }
537    }
538
539    #[test]
540    fn create_instance_returns_monotonic_ids() {
541        let mut k = Kernel::new();
542        let i1 = k.create_instance(InstanceConfig::default());
543        let i2 = k.create_instance(InstanceConfig::default());
544        let i3 = k.create_instance(InstanceConfig::default());
545        assert!(i1 < i2);
546        assert!(i2 < i3);
547        assert_eq!(i1.get(), 1);
548        assert_eq!(i3.get(), 3);
549        assert_eq!(k.instances_len(), 3);
550    }
551
552    #[test]
553    fn submit_unknown_instance_returns_error() {
554        let mut k = Kernel::new();
555        let bogus = InstanceId::new(99).unwrap();
556        let result = k.submit(
557            bogus,
558            Principal::System,
559            None,
560            Tick(0),
561            TypeCode(100),
562            Vec::new(),
563        );
564        assert!(matches!(result, Err(ArkheError::InstanceNotFound)));
565    }
566
567    #[test]
568    fn submit_then_step_executes_action_and_spawns_entity() {
569        let mut k = Kernel::new();
570        k.register_action::<SpawnOneAction>();
571        let inst = k.create_instance(InstanceConfig::default());
572        k.submit(
573            inst,
574            Principal::System,
575            None,
576            Tick(0),
577            TypeCode(100),
578            Vec::new(),
579        )
580        .unwrap();
581
582        let report = k.step(Tick(5), CapabilityMask::SYSTEM);
583        assert_eq!(report.actions_executed, 1);
584        assert_eq!(report.effects_applied, 1);
585        assert_eq!(report.effects_denied, 0);
586        // Entity with id=42 added via SpawnEntity Op
587        let inst_ref = k.instances.get(&inst).unwrap();
588        assert_eq!(inst_ref.entities_len(), 1);
589    }
590
591    #[test]
592    fn step_with_unknown_type_code_skips_action() {
593        let mut k = Kernel::new();
594        // Don't register SpawnOneAction — submit with its type_code.
595        let inst = k.create_instance(InstanceConfig::default());
596        k.submit(
597            inst,
598            Principal::System,
599            None,
600            Tick(0),
601            TypeCode(999),
602            Vec::new(),
603        )
604        .unwrap();
605
606        let report = k.step(Tick(5), CapabilityMask::SYSTEM);
607        assert_eq!(report.actions_executed, 1);
608        assert_eq!(report.effects_applied, 0);
609        assert_eq!(k.instances.get(&inst).unwrap().entities_len(), 0);
610    }
611
612    #[test]
613    fn observer_receives_action_executed_event() {
614        let mut k = Kernel::new();
615        k.register_action::<SpawnOneAction>();
616        let count = Arc::new(AtomicU32::new(0));
617        let _h = k.register_observer(Box::new(CountingObserver {
618            count: count.clone(),
619        }));
620        let inst = k.create_instance(InstanceConfig::default());
621        k.submit(
622            inst,
623            Principal::System,
624            None,
625            Tick(0),
626            TypeCode(100),
627            Vec::new(),
628        )
629        .unwrap();
630        k.step(Tick(5), CapabilityMask::SYSTEM);
631        // Observer received ActionExecuted (1 event from this Spawn — no DomainEventEmitted).
632        assert_eq!(count.load(Ordering::SeqCst), 1);
633    }
634
635    #[test]
636    fn observer_receives_domain_event_emitted() {
637        let mut k = Kernel::new();
638        k.register_action::<EmitAction>();
639        let count = Arc::new(AtomicU32::new(0));
640        k.register_observer(Box::new(CountingObserver {
641            count: count.clone(),
642        }));
643        let inst = k.create_instance(InstanceConfig::default());
644        k.submit(
645            inst,
646            Principal::System,
647            None,
648            Tick(0),
649            TypeCode(101),
650            Vec::new(),
651        )
652        .unwrap();
653        let report = k.step(Tick(5), CapabilityMask::SYSTEM);
654        assert_eq!(report.domain_events_emitted, 1);
655        // DomainEventEmitted + ActionExecuted = 2.
656        assert_eq!(count.load(Ordering::SeqCst), 2);
657    }
658
659    #[test]
660    fn panic_observer_evicted_after_first_event() {
661        let mut k = Kernel::new();
662        k.register_action::<SpawnOneAction>();
663        let h = k.register_observer(Box::new(PanicObserver));
664        let inst = k.create_instance(InstanceConfig::default());
665        k.submit(
666            inst,
667            Principal::System,
668            None,
669            Tick(0),
670            TypeCode(100),
671            Vec::new(),
672        )
673        .unwrap();
674        let report = k.step(Tick(5), CapabilityMask::SYSTEM);
675        assert!(report.observers_evicted >= 1);
676        assert!(k.observers.is_evicted(h));
677    }
678
679    #[test]
680    fn unauthenticated_principal_denies_all_effects() {
681        let mut k = Kernel::new();
682        k.register_action::<SpawnOneAction>();
683        let inst = k.create_instance(InstanceConfig::default());
684        k.submit(
685            inst,
686            Principal::Unauthenticated,
687            None,
688            Tick(0),
689            TypeCode(100),
690            Vec::new(),
691        )
692        .unwrap();
693        let report = k.step(Tick(5), CapabilityMask::SYSTEM);
694        assert_eq!(report.effects_denied, 1);
695        assert_eq!(report.effects_applied, 0);
696        // Stage discarded; entity not spawned.
697        assert_eq!(k.instances.get(&inst).unwrap().entities_len(), 0);
698    }
699
700    #[test]
701    fn external_without_system_cap_denies_signal() {
702        let mut k = Kernel::new();
703        k.register_action::<SignalAction>();
704        let inst = k.create_instance(InstanceConfig::default());
705        k.submit(
706            inst,
707            Principal::External(ExternalId(7)),
708            None,
709            Tick(0),
710            TypeCode(102),
711            Vec::new(),
712        )
713        .unwrap();
714        let report = k.step(Tick(5), CapabilityMask::default());
715        assert_eq!(report.effects_denied, 1);
716        assert_eq!(report.effects_applied, 0);
717    }
718
719    #[test]
720    fn wal_attached_kernel_records_committed_step() {
721        let mut k = Kernel::new_with_wal([7u8; 32], [3u8; 32]);
722        k.register_action::<SpawnOneAction>();
723        let inst = k.create_instance(InstanceConfig::default());
724        k.submit(
725            inst,
726            Principal::System,
727            None,
728            Tick(0),
729            TypeCode(100),
730            Vec::new(),
731        )
732        .unwrap();
733        assert_eq!(k.wal_record_count(), Some(0));
734        let pre_tip = k.wal_chain_tip().unwrap();
735        k.step(Tick(5), CapabilityMask::SYSTEM);
736        assert_eq!(k.wal_record_count(), Some(1));
737        let post_tip = k.wal_chain_tip().unwrap();
738        assert_ne!(pre_tip, post_tip);
739    }
740
741    #[test]
742    fn wal_kernel_export_then_verify_chain() {
743        let mut k = Kernel::new_with_wal([11u8; 32], [0u8; 32]);
744        k.register_action::<SpawnOneAction>();
745        let inst = k.create_instance(InstanceConfig::default());
746        for _ in 0..3 {
747            k.submit(
748                inst,
749                Principal::System,
750                None,
751                Tick(0),
752                TypeCode(100),
753                Vec::new(),
754            )
755            .unwrap();
756            k.step(Tick(0), CapabilityMask::SYSTEM);
757        }
758        let wal = k.export_wal().expect("wal attached");
759        assert_eq!(wal.records.len(), 3);
760        wal.verify_chain([11u8; 32]).expect("chain verifies");
761    }
762
763    #[test]
764    fn replay_reconstructs_chain_tip() {
765        use crate::persist::replay_into;
766        // Original kernel: write WAL with several committed steps.
767        let mut k1 = Kernel::new_with_wal([42u8; 32], [0u8; 32]);
768        k1.register_action::<SpawnOneAction>();
769        let i1 = k1.create_instance(InstanceConfig::default());
770        for _ in 0..4 {
771            k1.submit(
772                i1,
773                Principal::System,
774                None,
775                Tick(0),
776                TypeCode(100),
777                Vec::new(),
778            )
779            .unwrap();
780            k1.step(Tick(0), CapabilityMask::SYSTEM);
781        }
782        let original_tip = k1.wal_chain_tip().unwrap();
783        let wal = k1.export_wal().unwrap();
784
785        // Reconstructed kernel: same WAL → same chain tip after replay.
786        let mut k2 = Kernel::new_with_wal([42u8; 32], [0u8; 32]);
787        k2.register_action::<SpawnOneAction>();
788        // Caller pre-creates instances; the integrated path is
789        // `Kernel::from_snapshot` (persist::snapshot).
790        let _i2 = k2.create_instance(InstanceConfig::default());
791        let report = replay_into(&mut k2, &wal).expect("replay ok");
792        assert_eq!(report.records_replayed, 4);
793        let replayed_tip = k2.wal_chain_tip().unwrap();
794        assert_eq!(replayed_tip, original_tip);
795        assert_eq!(report.final_chain_tip, original_tip);
796    }
797
798    #[test]
799    fn step_processes_instances_in_ascending_order() {
800        // Two instances; both submit a SpawnOneAction. After step,
801        // both should have an entity. Per A23, processing order is
802        // InstanceId ascending — observable via ActionExecuted event order.
803        let mut k = Kernel::new();
804        k.register_action::<SpawnOneAction>();
805        let i1 = k.create_instance(InstanceConfig::default());
806        let i2 = k.create_instance(InstanceConfig::default());
807        k.submit(
808            i2,
809            Principal::System,
810            None,
811            Tick(0),
812            TypeCode(100),
813            Vec::new(),
814        )
815        .unwrap();
816        k.submit(
817            i1,
818            Principal::System,
819            None,
820            Tick(0),
821            TypeCode(100),
822            Vec::new(),
823        )
824        .unwrap();
825        let report = k.step(Tick(5), CapabilityMask::SYSTEM);
826        assert_eq!(report.actions_executed, 2);
827        assert_eq!(report.effects_applied, 2);
828    }
829
830    #[test]
831    fn stats_aggregate_reflects_instances_and_scheduler() {
832        let mut k = Kernel::new();
833        k.register_action::<SpawnOneAction>();
834        assert_eq!(k.stats(), Stats::default());
835
836        let i1 = k.create_instance(InstanceConfig::default());
837        let i2 = k.create_instance(InstanceConfig::default());
838        let stats_pre = k.stats();
839        assert_eq!(stats_pre.instance_count, 2);
840        assert_eq!(stats_pre.scheduled_action_count, 0);
841        assert_eq!(stats_pre.entity_count, 0);
842
843        k.submit(
844            i1,
845            Principal::System,
846            None,
847            Tick(0),
848            TypeCode(100),
849            Vec::new(),
850        )
851        .unwrap();
852        k.submit(
853            i2,
854            Principal::System,
855            None,
856            Tick(0),
857            TypeCode(100),
858            Vec::new(),
859        )
860        .unwrap();
861        let stats_queued = k.stats();
862        assert_eq!(stats_queued.scheduled_action_count, 2);
863
864        let _ = k.step(Tick(1), CapabilityMask::SYSTEM);
865        let stats_post = k.stats();
866        assert_eq!(stats_post.scheduled_action_count, 0);
867        assert_eq!(stats_post.entity_count, 2);
868    }
869
870    #[test]
871    fn stats_counts_observers() {
872        struct NullObs;
873        impl KernelObserver for NullObs {
874            fn on_event(&self, _e: &KernelEvent) {}
875        }
876
877        let mut k = Kernel::new();
878        k.register_observer(Box::new(NullObs));
879        k.register_observer(Box::new(NullObs));
880        assert_eq!(k.stats().observer_count, 2);
881    }
882
883    #[test]
884    fn stats_wal_record_count_reflects_writer() {
885        let mut k = Kernel::new_with_wal([1u8; 32], [0u8; 32]);
886        k.register_action::<SpawnOneAction>();
887        assert_eq!(k.stats().wal_record_count, 0);
888
889        let i = k.create_instance(InstanceConfig::default());
890        k.submit(
891            i,
892            Principal::System,
893            None,
894            Tick(0),
895            TypeCode(100),
896            Vec::new(),
897        )
898        .unwrap();
899        let _ = k.step(Tick(1), CapabilityMask::SYSTEM);
900        assert_eq!(k.stats().wal_record_count, 1);
901    }
902
903    // ---- force_unload ----
904
905    /// Observer that records every `ModuleForceUnloaded` event it sees.
906    struct ForceUnloadCapture {
907        seen: Arc<std::sync::Mutex<Vec<(RouteId, u32)>>>,
908    }
909    impl KernelObserver for ForceUnloadCapture {
910        fn on_event(&self, event: &KernelEvent) {
911            if let KernelEvent::ModuleForceUnloaded {
912                route_id,
913                live_refs_at_unload,
914            } = event
915            {
916                self.seen
917                    .lock()
918                    .unwrap()
919                    .push((*route_id, *live_refs_at_unload));
920            }
921        }
922    }
923
924    #[test]
925    fn force_unload_without_cap_denied() {
926        let mut k = Kernel::new();
927        let result = k.force_unload(RouteId(1), CapabilityMask::default());
928        assert!(matches!(result, Err(ArkheError::CapabilityDenied)));
929    }
930
931    #[test]
932    fn force_unload_removes_inflight_refs() {
933        let mut k = Kernel::new();
934        k.register_action::<SignalAction>();
935        let inst = k.create_instance(InstanceConfig::default());
936        // SignalAction emits Op::SendSignal { route: RouteId(1) } — needs SYSTEM
937        // cap to pass authorize, then dispatch increments inflight_refs[RouteId(1)].
938        k.submit(
939            inst,
940            Principal::System,
941            None,
942            Tick(0),
943            TypeCode(102),
944            Vec::new(),
945        )
946        .unwrap();
947        let report = k.step(Tick(0), CapabilityMask::SYSTEM);
948        assert_eq!(report.effects_applied, 1);
949        assert_eq!(
950            k.instances
951                .get(&inst)
952                .unwrap()
953                .inflight_refs_for(RouteId(1)),
954            1
955        );
956
957        let dropped = k
958            .force_unload(RouteId(1), CapabilityMask::ADMIN_UNLOAD)
959            .expect("admin_unload caps");
960        assert_eq!(dropped, 1);
961        assert_eq!(
962            k.instances
963                .get(&inst)
964                .unwrap()
965                .inflight_refs_for(RouteId(1)),
966            0
967        );
968        assert_eq!(k.instances.get(&inst).unwrap().inflight_refs_len(), 0);
969    }
970
971    #[test]
972    fn force_unload_emits_module_unloaded_event() {
973        let mut k = Kernel::new();
974        k.register_action::<SignalAction>();
975        let seen = Arc::new(std::sync::Mutex::new(Vec::new()));
976        k.register_observer(Box::new(ForceUnloadCapture { seen: seen.clone() }));
977        let inst = k.create_instance(InstanceConfig::default());
978        k.submit(
979            inst,
980            Principal::System,
981            None,
982            Tick(0),
983            TypeCode(102),
984            Vec::new(),
985        )
986        .unwrap();
987        let _ = k.step(Tick(0), CapabilityMask::SYSTEM);
988
989        k.force_unload(RouteId(1), CapabilityMask::ADMIN_UNLOAD)
990            .expect("admin_unload caps");
991
992        let captured = seen.lock().unwrap().clone();
993        assert_eq!(captured, vec![(RouteId(1), 1)]);
994    }
995
996    #[test]
997    fn force_unload_no_live_refs_returns_zero() {
998        let mut k = Kernel::new();
999        let _ = k.create_instance(InstanceConfig::default());
1000        let dropped = k
1001            .force_unload(RouteId(99), CapabilityMask::ADMIN_UNLOAD)
1002            .expect("admin_unload caps");
1003        assert_eq!(dropped, 0);
1004    }
1005
1006    // ---- memory_budget_bytes enforcement (A21) ----
1007
1008    /// Test action: spawns entity `entity_id` and attaches one
1009    /// `SetComponent` of `size` bytes. The ledger tracks bytes only for
1010    /// registered entities, so the spawn must precede the set; this
1011    /// action emits both in one compute() — production-realistic.
1012    #[derive(Serialize, Deserialize)]
1013    struct SetCompAction {
1014        size: u64,
1015        entity_id: u64,
1016    }
1017    impl Sealed for SetCompAction {}
1018    impl ActionDeriv for SetCompAction {
1019        const TYPE_CODE: TypeCode = TypeCode(200);
1020        const SCHEMA_VERSION: u32 = 1;
1021    }
1022    impl ActionCompute for SetCompAction {
1023        fn compute(&self, _ctx: &ActionContext) -> Vec<Op> {
1024            let entity = EntityId::new(self.entity_id).unwrap();
1025            vec![
1026                Op::SpawnEntity {
1027                    id: entity,
1028                    owner: Principal::System,
1029                },
1030                Op::SetComponent {
1031                    entity,
1032                    type_code: TypeCode(7),
1033                    bytes: Bytes::from(vec![0u8; self.size as usize]),
1034                    size: self.size,
1035                },
1036            ]
1037        }
1038    }
1039
1040    /// Test action: spawns entities 1 and 2, then attaches one
1041    /// `SetComponent` of size `a` to entity 1 and one of size `b` to
1042    /// entity 2 (4 ops total).
1043    #[derive(Serialize, Deserialize)]
1044    struct TwoSetCompAction {
1045        a: u64,
1046        b: u64,
1047    }
1048    impl Sealed for TwoSetCompAction {}
1049    impl ActionDeriv for TwoSetCompAction {
1050        const TYPE_CODE: TypeCode = TypeCode(201);
1051        const SCHEMA_VERSION: u32 = 1;
1052    }
1053    impl ActionCompute for TwoSetCompAction {
1054        fn compute(&self, _ctx: &ActionContext) -> Vec<Op> {
1055            let e1 = EntityId::new(1).unwrap();
1056            let e2 = EntityId::new(2).unwrap();
1057            vec![
1058                Op::SpawnEntity {
1059                    id: e1,
1060                    owner: Principal::System,
1061                },
1062                Op::SpawnEntity {
1063                    id: e2,
1064                    owner: Principal::System,
1065                },
1066                Op::SetComponent {
1067                    entity: e1,
1068                    type_code: TypeCode(7),
1069                    bytes: Bytes::from(vec![0u8; self.a as usize]),
1070                    size: self.a,
1071                },
1072                Op::SetComponent {
1073                    entity: e2,
1074                    type_code: TypeCode(7),
1075                    bytes: Bytes::from(vec![0u8; self.b as usize]),
1076                    size: self.b,
1077                },
1078            ]
1079        }
1080    }
1081
1082    fn cfg_with_budget(budget: u64) -> InstanceConfig {
1083        InstanceConfig {
1084            memory_budget_bytes: budget,
1085            ..Default::default()
1086        }
1087    }
1088
1089    fn submit_set(k: &mut Kernel, inst: InstanceId, size: u64, entity_id: u64) {
1090        let action = SetCompAction { size, entity_id };
1091        let bytes = Action::canonical_bytes(&action);
1092        k.submit(
1093            inst,
1094            Principal::System,
1095            None,
1096            Tick(0),
1097            SetCompAction::TYPE_CODE,
1098            bytes,
1099        )
1100        .expect("submit ok");
1101    }
1102
1103    #[test]
1104    fn budget_zero_allows_unlimited() {
1105        // Default config has memory_budget_bytes = 0 → no enforcement.
1106        // SetCompAction emits Spawn + SetComponent (2 ops).
1107        let mut k = Kernel::new();
1108        k.register_action::<SetCompAction>();
1109        let inst = k.create_instance(InstanceConfig::default());
1110        submit_set(&mut k, inst, 1_000_000, 1);
1111        let report = k.step(Tick(0), CapabilityMask::SYSTEM);
1112        assert_eq!(report.effects_applied, 2);
1113        assert_eq!(report.effects_denied, 0);
1114        assert_eq!(k.instances.get(&inst).unwrap().components_len(), 1);
1115    }
1116
1117    #[test]
1118    fn budget_exceeded_denies_op() {
1119        // budget=100; Spawn passes (size 0), SetComponent denied (500 > 100).
1120        // Per-Op deny — Spawn still applies, no rollback (any_denied=false).
1121        let mut k = Kernel::new();
1122        k.register_action::<SetCompAction>();
1123        let inst = k.create_instance(cfg_with_budget(100));
1124        submit_set(&mut k, inst, 500, 1);
1125        let report = k.step(Tick(0), CapabilityMask::SYSTEM);
1126        assert_eq!(report.effects_applied, 1); // Spawn only
1127        assert_eq!(report.effects_denied, 1); // SetComponent
1128        assert_eq!(report.actions_executed, 1);
1129        assert_eq!(k.instances.get(&inst).unwrap().entities_len(), 1);
1130        assert_eq!(k.instances.get(&inst).unwrap().components_len(), 0);
1131    }
1132
1133    #[test]
1134    fn budget_at_edge_allows_equal() {
1135        // budget=500, projected = 0 + 0 (Spawn) + 500 (Set) = 500.
1136        // 500 == budget is allowed (only `>` denies).
1137        let mut k = Kernel::new();
1138        k.register_action::<SetCompAction>();
1139        let inst = k.create_instance(cfg_with_budget(500));
1140        submit_set(&mut k, inst, 500, 1);
1141        let report = k.step(Tick(0), CapabilityMask::SYSTEM);
1142        assert_eq!(report.effects_applied, 2);
1143        assert_eq!(report.effects_denied, 0);
1144        assert_eq!(k.instances.get(&inst).unwrap().components_len(), 1);
1145        assert_eq!(k.instances.get(&inst).unwrap().ledger().total_bytes(), 500);
1146    }
1147
1148    #[test]
1149    fn multi_op_stage_respects_running_delta() {
1150        // budget=600. TwoSetCompAction emits: Spawn(1), Spawn(2),
1151        // SetComp(1, size=400), SetComp(2, size=400).
1152        // Spawns fit (size 0). SetComp(1): projected=0+0+400=400 → allow.
1153        // SetComp(2): projected=0+400+400=800 > 600 → deny.
1154        // 3 applied, 1 denied; entity 2 spawned but uncomponented.
1155        let mut k = Kernel::new();
1156        k.register_action::<TwoSetCompAction>();
1157        let inst = k.create_instance(cfg_with_budget(600));
1158        let action = TwoSetCompAction { a: 400, b: 400 };
1159        let bytes = Action::canonical_bytes(&action);
1160        k.submit(
1161            inst,
1162            Principal::System,
1163            None,
1164            Tick(0),
1165            TwoSetCompAction::TYPE_CODE,
1166            bytes,
1167        )
1168        .unwrap();
1169        let report = k.step(Tick(0), CapabilityMask::SYSTEM);
1170        assert_eq!(report.effects_applied, 3);
1171        assert_eq!(report.effects_denied, 1);
1172        assert_eq!(k.instances.get(&inst).unwrap().entities_len(), 2);
1173        assert_eq!(k.instances.get(&inst).unwrap().components_len(), 1);
1174        assert_eq!(k.instances.get(&inst).unwrap().ledger().total_bytes(), 400);
1175    }
1176
1177    /// Observer that records every `EffectFailed` reason it sees.
1178    struct EffectFailedCapture {
1179        seen: Arc<std::sync::Mutex<Vec<Bytes>>>,
1180    }
1181    impl KernelObserver for EffectFailedCapture {
1182        fn on_event(&self, event: &KernelEvent) {
1183            if let KernelEvent::EffectFailed { reason, .. } = event {
1184                self.seen.lock().unwrap().push(reason.clone());
1185            }
1186        }
1187    }
1188
1189    #[test]
1190    fn effect_failed_event_on_budget_deny() {
1191        let mut k = Kernel::new();
1192        k.register_action::<SetCompAction>();
1193        let seen = Arc::new(std::sync::Mutex::new(Vec::new()));
1194        k.register_observer(Box::new(EffectFailedCapture { seen: seen.clone() }));
1195        let inst = k.create_instance(cfg_with_budget(100));
1196        submit_set(&mut k, inst, 500, 1);
1197        let _ = k.step(Tick(0), CapabilityMask::SYSTEM);
1198
1199        let captured = seen.lock().unwrap().clone();
1200        assert_eq!(captured.len(), 1);
1201        assert_eq!(captured[0].as_ref(), b"budget_exceeded");
1202    }
1203
1204    // ---- EventMask filter ----
1205
1206    use crate::runtime::event::EventMask;
1207
1208    /// Per-variant counter observer — used to verify that only the
1209    /// expected variant arms increment.
1210    #[derive(Default)]
1211    struct VariantCounters {
1212        action_executed: AtomicU32,
1213        action_failed: AtomicU32,
1214        domain_event: AtomicU32,
1215        effect_failed: AtomicU32,
1216        other: AtomicU32,
1217    }
1218
1219    struct VariantTallyObserver {
1220        counters: Arc<VariantCounters>,
1221    }
1222    impl KernelObserver for VariantTallyObserver {
1223        fn on_event(&self, event: &KernelEvent) {
1224            match event {
1225                KernelEvent::ActionExecuted { .. } => {
1226                    self.counters.action_executed.fetch_add(1, Ordering::SeqCst);
1227                }
1228                KernelEvent::ActionFailed { .. } => {
1229                    self.counters.action_failed.fetch_add(1, Ordering::SeqCst);
1230                }
1231                KernelEvent::DomainEventEmitted { .. } => {
1232                    self.counters.domain_event.fetch_add(1, Ordering::SeqCst);
1233                }
1234                KernelEvent::EffectFailed { .. } => {
1235                    self.counters.effect_failed.fetch_add(1, Ordering::SeqCst);
1236                }
1237                KernelEvent::ObserverPanic { .. }
1238                | KernelEvent::ObserverEvicted { .. }
1239                | KernelEvent::SignalDropped { .. }
1240                | KernelEvent::ModuleForceUnloaded { .. }
1241                | KernelEvent::ActionDeferredToNextTick { .. }
1242                | KernelEvent::ObserversFlushed { .. } => {
1243                    self.counters.other.fetch_add(1, Ordering::SeqCst);
1244                }
1245            }
1246        }
1247    }
1248
1249    #[test]
1250    fn event_mask_default_is_all() {
1251        let m = EventMask::default();
1252        assert_eq!(m, EventMask::ALL);
1253        assert!(m.contains(EventMask::ACTION_EXECUTED));
1254        assert!(m.contains(EventMask::DOMAIN_EVENT_EMITTED));
1255        assert!(m.contains(EventMask::MODULE_FORCE_UNLOADED));
1256    }
1257
1258    #[test]
1259    fn register_observer_backward_compat_receives_all() {
1260        // EmitAction yields a DomainEventEmitted + an ActionExecuted —
1261        // a default-mask observer must see both.
1262        let mut k = Kernel::new();
1263        k.register_action::<EmitAction>();
1264        let counters = Arc::new(VariantCounters::default());
1265        k.register_observer(Box::new(VariantTallyObserver {
1266            counters: counters.clone(),
1267        }));
1268        let inst = k.create_instance(InstanceConfig::default());
1269        k.submit(
1270            inst,
1271            Principal::System,
1272            None,
1273            Tick(0),
1274            TypeCode(101),
1275            Vec::new(),
1276        )
1277        .unwrap();
1278        let _ = k.step(Tick(0), CapabilityMask::SYSTEM);
1279        assert_eq!(counters.action_executed.load(Ordering::SeqCst), 1);
1280        assert_eq!(counters.domain_event.load(Ordering::SeqCst), 1);
1281    }
1282
1283    #[test]
1284    fn filter_only_action_executed() {
1285        // Mask = ACTION_EXECUTED only — DomainEventEmitted should be muted.
1286        let mut k = Kernel::new();
1287        k.register_action::<EmitAction>();
1288        let counters = Arc::new(VariantCounters::default());
1289        k.register_observer_filtered(
1290            Box::new(VariantTallyObserver {
1291                counters: counters.clone(),
1292            }),
1293            EventMask::ACTION_EXECUTED,
1294        );
1295        let inst = k.create_instance(InstanceConfig::default());
1296        k.submit(
1297            inst,
1298            Principal::System,
1299            None,
1300            Tick(0),
1301            TypeCode(101),
1302            Vec::new(),
1303        )
1304        .unwrap();
1305        let _ = k.step(Tick(0), CapabilityMask::SYSTEM);
1306        assert_eq!(counters.action_executed.load(Ordering::SeqCst), 1);
1307        assert_eq!(counters.domain_event.load(Ordering::SeqCst), 0);
1308    }
1309
1310    #[test]
1311    fn filter_domain_event_only() {
1312        // Mask = DOMAIN_EVENT_EMITTED only — ActionExecuted should be muted.
1313        let mut k = Kernel::new();
1314        k.register_action::<EmitAction>();
1315        let counters = Arc::new(VariantCounters::default());
1316        k.register_observer_filtered(
1317            Box::new(VariantTallyObserver {
1318                counters: counters.clone(),
1319            }),
1320            EventMask::DOMAIN_EVENT_EMITTED,
1321        );
1322        let inst = k.create_instance(InstanceConfig::default());
1323        k.submit(
1324            inst,
1325            Principal::System,
1326            None,
1327            Tick(0),
1328            TypeCode(101),
1329            Vec::new(),
1330        )
1331        .unwrap();
1332        let _ = k.step(Tick(0), CapabilityMask::SYSTEM);
1333        assert_eq!(counters.action_executed.load(Ordering::SeqCst), 0);
1334        assert_eq!(counters.domain_event.load(Ordering::SeqCst), 1);
1335    }
1336
1337    #[test]
1338    fn multiple_observers_independent_masks() {
1339        // obs_a wants ACTION_EXECUTED, obs_b wants DOMAIN_EVENT_EMITTED.
1340        // After one EmitAction step, each observer sees exactly its slice.
1341        let mut k = Kernel::new();
1342        k.register_action::<EmitAction>();
1343        let ca = Arc::new(VariantCounters::default());
1344        let cb = Arc::new(VariantCounters::default());
1345        k.register_observer_filtered(
1346            Box::new(VariantTallyObserver {
1347                counters: ca.clone(),
1348            }),
1349            EventMask::ACTION_EXECUTED,
1350        );
1351        k.register_observer_filtered(
1352            Box::new(VariantTallyObserver {
1353                counters: cb.clone(),
1354            }),
1355            EventMask::DOMAIN_EVENT_EMITTED,
1356        );
1357        let inst = k.create_instance(InstanceConfig::default());
1358        k.submit(
1359            inst,
1360            Principal::System,
1361            None,
1362            Tick(0),
1363            TypeCode(101),
1364            Vec::new(),
1365        )
1366        .unwrap();
1367        let _ = k.step(Tick(0), CapabilityMask::SYSTEM);
1368        assert_eq!(ca.action_executed.load(Ordering::SeqCst), 1);
1369        assert_eq!(ca.domain_event.load(Ordering::SeqCst), 0);
1370        assert_eq!(cb.action_executed.load(Ordering::SeqCst), 0);
1371        assert_eq!(cb.domain_event.load(Ordering::SeqCst), 1);
1372    }
1373
1374    #[test]
1375    fn filter_empty_mask_receives_nothing() {
1376        // EventMask::empty() — observer is registered but receives zero events.
1377        let mut k = Kernel::new();
1378        k.register_action::<EmitAction>();
1379        let counters = Arc::new(VariantCounters::default());
1380        k.register_observer_filtered(
1381            Box::new(VariantTallyObserver {
1382                counters: counters.clone(),
1383            }),
1384            EventMask::empty(),
1385        );
1386        let inst = k.create_instance(InstanceConfig::default());
1387        k.submit(
1388            inst,
1389            Principal::System,
1390            None,
1391            Tick(0),
1392            TypeCode(101),
1393            Vec::new(),
1394        )
1395        .unwrap();
1396        let _ = k.step(Tick(0), CapabilityMask::SYSTEM);
1397        assert_eq!(counters.action_executed.load(Ordering::SeqCst), 0);
1398        assert_eq!(counters.domain_event.load(Ordering::SeqCst), 0);
1399        assert_eq!(counters.action_failed.load(Ordering::SeqCst), 0);
1400        assert_eq!(counters.effect_failed.load(Ordering::SeqCst), 0);
1401        assert_eq!(counters.other.load(Ordering::SeqCst), 0);
1402    }
1403}