1use std::collections::BTreeMap;
8
9use crate::abi::{ArkheError, CapabilityMask, EntityId, InstanceId, Principal, Tick, TypeCode};
10use crate::state::authz::authorize;
11use crate::state::{
12 Action, ActionContext, Effect, Instance, InstanceConfig, ScheduledActionId, Unverified,
13};
14
15use super::apply::{apply_stage, discard_stage};
16use super::dispatch::dispatch;
17use super::event::{KernelEvent, ObserverHandle};
18use super::observer::{KernelObserver, ObserverRegistry};
19use super::registry::ActionRegistry;
20use super::stage::StepStage;
21
22use crate::persist::{AuthDecisionAnnotation, Wal, WalWriter};
23
24pub struct Kernel {
36 instances: BTreeMap<InstanceId, Instance>,
37 action_registry: ActionRegistry,
38 observers: ObserverRegistry,
39 next_instance_id: u64,
40 wal: Option<WalWriter>,
41}
42
43#[derive(Debug, Clone, Default, PartialEq, Eq)]
45pub struct StepReport {
46 pub actions_executed: u32,
48 pub effects_applied: u32,
50 pub effects_denied: u32,
52 pub observers_evicted: u32,
54 pub domain_events_emitted: u32,
56}
57
58#[derive(Debug, Clone, Default, PartialEq, Eq)]
60pub struct Stats {
61 pub instance_count: usize,
63 pub scheduled_action_count: usize,
65 pub entity_count: u32,
67 pub component_byte_count: u64,
69 pub observer_count: usize,
71 pub wal_record_count: usize,
73}
74
75impl Default for Kernel {
76 fn default() -> Self {
77 Self::new()
78 }
79}
80
81impl Kernel {
82 pub fn new() -> Self {
87 Self {
88 instances: BTreeMap::new(),
89 action_registry: ActionRegistry::new(),
90 observers: ObserverRegistry::new(),
91 next_instance_id: 0,
92 wal: None,
93 }
94 }
95
96 pub fn new_with_wal(world_id: [u8; 32], manifest_digest: [u8; 32]) -> Self {
99 Self {
100 instances: BTreeMap::new(),
101 action_registry: ActionRegistry::new(),
102 observers: ObserverRegistry::new(),
103 next_instance_id: 0,
104 wal: Some(WalWriter::new(world_id, manifest_digest)),
105 }
106 }
107
108 pub fn new_with_wal_signed(
113 world_id: [u8; 32],
114 manifest_digest: [u8; 32],
115 sig_class: crate::persist::SignatureClass,
116 ) -> Self {
117 Self {
118 instances: BTreeMap::new(),
119 action_registry: ActionRegistry::new(),
120 observers: ObserverRegistry::new(),
121 next_instance_id: 0,
122 wal: Some(WalWriter::with_signature(
123 world_id,
124 manifest_digest,
125 sig_class,
126 )),
127 }
128 }
129
130 pub fn wal_chain_tip(&self) -> Option<[u8; 32]> {
132 self.wal.as_ref().map(|w| w.chain_tip())
133 }
134
135 pub fn wal_record_count(&self) -> Option<usize> {
137 self.wal.as_ref().map(|w| w.record_count())
138 }
139
140 pub fn export_wal(self) -> Option<Wal> {
142 self.wal.map(Wal::from_writer)
143 }
144
145 pub fn register_action<A: Action>(&mut self) {
149 self.action_registry.register::<A>();
150 }
151
152 pub fn register_observer(&mut self, obs: Box<dyn KernelObserver>) -> ObserverHandle {
156 self.observers.register(obs)
157 }
158
159 pub fn register_observer_filtered(
165 &mut self,
166 obs: Box<dyn KernelObserver>,
167 mask: super::event::EventMask,
168 ) -> ObserverHandle {
169 self.observers.register_filtered(obs, mask)
170 }
171
172 pub fn create_instance(&mut self, config: InstanceConfig) -> InstanceId {
175 self.next_instance_id = self.next_instance_id.saturating_add(1);
176 let id = InstanceId::new(self.next_instance_id).expect("instance id > 0");
177 self.instances.insert(id, Instance::new(id, config));
178 id
179 }
180
181 pub fn instances_len(&self) -> usize {
183 self.instances.len()
184 }
185
186 pub fn instance_view(&self, id: InstanceId) -> Option<super::view::InstanceView<'_>> {
190 self.instances
191 .get(&id)
192 .map(|instance| super::view::InstanceView { instance })
193 }
194
195 pub fn snapshot(&self) -> crate::persist::KernelSnapshot {
200 let instances = self
201 .instances
202 .iter()
203 .map(|(id, inst)| (*id, inst.to_snapshot()))
204 .collect();
205 crate::persist::KernelSnapshot::__construct(instances, self.next_instance_id)
206 }
207
208 pub fn from_snapshot(snap: crate::persist::KernelSnapshot) -> Self {
212 let (instances_in, next_instance_id) = snap.__into_parts();
213 let instances = instances_in
214 .into_iter()
215 .map(|(id, s)| (id, Instance::from_snapshot(s)))
216 .collect();
217 Self {
218 instances,
219 action_registry: ActionRegistry::new(),
220 observers: ObserverRegistry::new(),
221 next_instance_id,
222 wal: None,
223 }
224 }
225
226 pub fn stats(&self) -> Stats {
228 let mut scheduled = 0usize;
229 let mut entities = 0u32;
230 let mut bytes = 0u64;
231 for inst in self.instances.values() {
232 scheduled = scheduled.saturating_add(inst.scheduler().len());
233 entities = entities.saturating_add(inst.ledger().total_entities());
234 bytes = bytes.saturating_add(inst.ledger().total_bytes());
235 }
236 Stats {
237 instance_count: self.instances.len(),
238 scheduled_action_count: scheduled,
239 entity_count: entities,
240 component_byte_count: bytes,
241 observer_count: self.observers.len(),
242 wal_record_count: self.wal.as_ref().map(|w| w.record_count()).unwrap_or(0),
243 }
244 }
245
246 pub fn force_unload(
253 &mut self,
254 route_id: crate::abi::RouteId,
255 caps: CapabilityMask,
256 ) -> Result<usize, ArkheError> {
257 if !caps.contains(CapabilityMask::ADMIN_UNLOAD) {
258 return Err(ArkheError::CapabilityDenied);
259 }
260
261 let mut total_live_refs: u32 = 0;
262 for inst in self.instances.values_mut() {
263 if let Some(refs) = inst.inflight_refs_mut().remove(&route_id) {
264 total_live_refs = total_live_refs.saturating_add(refs);
265 }
266 }
267
268 let event = KernelEvent::ModuleForceUnloaded {
269 route_id,
270 live_refs_at_unload: total_live_refs,
271 };
272 let _ = self.observers.deliver(&event);
273
274 Ok(total_live_refs as usize)
275 }
276
277 pub fn submit(
286 &mut self,
287 instance: InstanceId,
288 principal: Principal,
289 actor: Option<EntityId>,
290 at: Tick,
291 action_type_code: TypeCode,
292 action_bytes: Vec<u8>,
293 ) -> Result<ScheduledActionId, ArkheError> {
294 let inst = self
295 .instances
296 .get_mut(&instance)
297 .ok_or(ArkheError::InstanceNotFound)?;
298 let counters = inst.id_counters_mut();
299 counters.next_scheduled = counters.next_scheduled.saturating_add(1);
300 let id = ScheduledActionId::new(counters.next_scheduled).expect("scheduled id > 0");
301 inst.scheduler_mut().schedule_with_id(
302 id,
303 at,
304 actor,
305 principal,
306 action_type_code,
307 action_bytes,
308 );
309 Ok(id)
310 }
311
312 pub fn step(&mut self, now: Tick, caps: CapabilityMask) -> StepReport {
315 let mut report = StepReport::default();
316
317 let instance_ids: Vec<InstanceId> = self.instances.keys().copied().collect();
318 for inst_id in instance_ids {
319 let entry = match self.instances.get_mut(&inst_id) {
320 Some(inst) => inst.scheduler_mut().pop_due(now),
321 None => continue,
322 };
323 let entry = match entry {
324 Some(e) => e,
325 None => continue,
326 };
327 report.actions_executed = report.actions_executed.saturating_add(1);
328
329 let reg = match self.action_registry.get(entry.action_type_code).cloned() {
330 Some(r) => r,
331 None => continue,
332 };
333
334 let action = match (reg.deserializer)(reg.schema_version, &entry.action_bytes) {
335 Ok(a) => a,
336 Err(_) => continue,
337 };
338
339 let inst_ref = self.instances.get(&inst_id).expect("instance present");
340 let ctx = ActionContext::new(entry.actor, now, inst_id, inst_ref);
341 let ops = action.compute_dyn(&ctx);
342
343 let mut stage = StepStage::default();
344 let mut next_scheduled_id = inst_ref.id_counters_snapshot().next_scheduled;
345 let budget = inst_ref.config().memory_budget_bytes;
346 let baseline_bytes: i64 = inst_ref.ledger().total_bytes() as i64;
347 let mut any_denied = false;
348 for op in ops {
349 let principal_clone = match &entry.principal {
350 Principal::Unauthenticated => Principal::Unauthenticated,
351 Principal::External(e) => Principal::External(*e),
352 Principal::System => Principal::System,
353 };
354 let eff: Effect<'_, Unverified> = Effect::new(inst_id, principal_clone, op);
355 match authorize(caps, eff) {
356 Ok(authorized) => {
357 if budget > 0 {
362 let op_size: i64 = match &authorized.op {
363 crate::state::Op::SetComponent { size, .. } => *size as i64,
364 crate::state::Op::RemoveComponent { size, .. } => -(*size as i64),
365 _ => 0,
366 };
367 let projected = baseline_bytes
368 .saturating_add(super::stage::bytes_delta(&stage))
369 .saturating_add(op_size);
370 if projected > budget as i64 {
371 report.effects_denied = report.effects_denied.saturating_add(1);
372 stage.events.push_back(KernelEvent::EffectFailed {
373 instance: inst_id,
374 reason: bytes::Bytes::from_static(b"budget_exceeded"),
375 });
376 continue;
377 }
378 }
379 dispatch(authorized, &mut stage, now, &mut next_scheduled_id);
380 report.effects_applied = report.effects_applied.saturating_add(1);
381 }
382 Err(_) => {
383 report.effects_denied = report.effects_denied.saturating_add(1);
384 any_denied = true;
385 }
386 }
387 }
388
389 if any_denied {
390 let inst_mut = self.instances.get_mut(&inst_id).expect("instance present");
391 discard_stage(inst_mut, stage);
392 continue;
393 }
394
395 let domain_emit_count = stage
398 .events
399 .iter()
400 .filter(|e| matches!(e, KernelEvent::DomainEventEmitted { .. }))
401 .count();
402 report.domain_events_emitted = report
403 .domain_events_emitted
404 .saturating_add(domain_emit_count as u32);
405 let events_to_deliver: Vec<KernelEvent> = stage.events.iter().cloned().collect();
406
407 let wal_stage = if self.wal.is_some() {
409 Some(stage.clone())
410 } else {
411 None
412 };
413 let principal_for_wal = match &entry.principal {
414 Principal::Unauthenticated => Principal::Unauthenticated,
415 Principal::External(e) => Principal::External(*e),
416 Principal::System => Principal::System,
417 };
418 let action_bytes_for_wal = entry.action_bytes.clone();
419 let action_type_for_wal = entry.action_type_code;
420
421 let inst_mut = self.instances.get_mut(&inst_id).expect("instance present");
422 apply_stage(inst_mut, stage);
423
424 if let (Some(wal), Some(s)) = (self.wal.as_mut(), wal_stage) {
425 let _ = wal.append(
426 now,
427 inst_id,
428 principal_for_wal,
429 action_type_for_wal,
430 action_bytes_for_wal,
431 caps.bits(),
432 s,
433 AuthDecisionAnnotation::AllAuthorized,
434 );
435 }
436
437 for event in events_to_deliver {
438 let evicted = self.observers.deliver(&event);
439 report.observers_evicted = report
440 .observers_evicted
441 .saturating_add(evicted.len() as u32);
442 }
443
444 let action_executed = KernelEvent::ActionExecuted {
445 instance: inst_id,
446 action_type: entry.action_type_code,
447 at: now,
448 };
449 let evicted = self.observers.deliver(&action_executed);
450 report.observers_evicted = report
451 .observers_evicted
452 .saturating_add(evicted.len() as u32);
453 }
454
455 report
456 }
457}
458
459#[cfg(test)]
460mod tests {
461 use super::*;
462 use bytes::Bytes;
463 use std::sync::atomic::{AtomicU32, Ordering};
464 use std::sync::Arc;
465
466 use crate::abi::{ExternalId, RouteId};
467 use crate::state::traits::_sealed::Sealed;
468 use crate::state::{ActionCompute, ActionDeriv, Op};
469 use serde::{Deserialize, Serialize};
470
471 #[derive(Serialize, Deserialize)]
473 struct SpawnOneAction;
474 impl Sealed for SpawnOneAction {}
475 impl ActionDeriv for SpawnOneAction {
476 const TYPE_CODE: TypeCode = TypeCode(100);
477 const SCHEMA_VERSION: u32 = 1;
478 }
479 impl ActionCompute for SpawnOneAction {
480 fn compute(&self, _ctx: &ActionContext) -> Vec<Op> {
481 vec![Op::SpawnEntity {
482 id: EntityId::new(42).unwrap(),
483 owner: Principal::System,
484 }]
485 }
486 }
487
488 #[derive(Serialize, Deserialize)]
489 struct EmitAction;
490 impl Sealed for EmitAction {}
491 impl ActionDeriv for EmitAction {
492 const TYPE_CODE: TypeCode = TypeCode(101);
493 const SCHEMA_VERSION: u32 = 1;
494 }
495 impl ActionCompute for EmitAction {
496 fn compute(&self, _ctx: &ActionContext) -> Vec<Op> {
497 vec![Op::EmitEvent {
498 actor: None,
499 event_type_code: TypeCode(7),
500 event_bytes: Bytes::from_static(b"hello"),
501 }]
502 }
503 }
504
505 #[derive(Serialize, Deserialize)]
506 struct SignalAction;
507 impl Sealed for SignalAction {}
508 impl ActionDeriv for SignalAction {
509 const TYPE_CODE: TypeCode = TypeCode(102);
510 const SCHEMA_VERSION: u32 = 1;
511 }
512 impl ActionCompute for SignalAction {
513 fn compute(&self, _ctx: &ActionContext) -> Vec<Op> {
514 vec![Op::SendSignal {
516 target: InstanceId::new(1).unwrap(),
517 route: RouteId(1),
518 payload: Bytes::new(),
519 }]
520 }
521 }
522
523 struct CountingObserver {
524 count: Arc<AtomicU32>,
525 }
526 impl KernelObserver for CountingObserver {
527 fn on_event(&self, _event: &KernelEvent) {
528 self.count.fetch_add(1, Ordering::SeqCst);
529 }
530 }
531
532 struct PanicObserver;
533 impl KernelObserver for PanicObserver {
534 fn on_event(&self, _event: &KernelEvent) {
535 panic!("observer intentional panic");
536 }
537 }
538
539 #[test]
540 fn create_instance_returns_monotonic_ids() {
541 let mut k = Kernel::new();
542 let i1 = k.create_instance(InstanceConfig::default());
543 let i2 = k.create_instance(InstanceConfig::default());
544 let i3 = k.create_instance(InstanceConfig::default());
545 assert!(i1 < i2);
546 assert!(i2 < i3);
547 assert_eq!(i1.get(), 1);
548 assert_eq!(i3.get(), 3);
549 assert_eq!(k.instances_len(), 3);
550 }
551
552 #[test]
553 fn submit_unknown_instance_returns_error() {
554 let mut k = Kernel::new();
555 let bogus = InstanceId::new(99).unwrap();
556 let result = k.submit(
557 bogus,
558 Principal::System,
559 None,
560 Tick(0),
561 TypeCode(100),
562 Vec::new(),
563 );
564 assert!(matches!(result, Err(ArkheError::InstanceNotFound)));
565 }
566
567 #[test]
568 fn submit_then_step_executes_action_and_spawns_entity() {
569 let mut k = Kernel::new();
570 k.register_action::<SpawnOneAction>();
571 let inst = k.create_instance(InstanceConfig::default());
572 k.submit(
573 inst,
574 Principal::System,
575 None,
576 Tick(0),
577 TypeCode(100),
578 Vec::new(),
579 )
580 .unwrap();
581
582 let report = k.step(Tick(5), CapabilityMask::SYSTEM);
583 assert_eq!(report.actions_executed, 1);
584 assert_eq!(report.effects_applied, 1);
585 assert_eq!(report.effects_denied, 0);
586 let inst_ref = k.instances.get(&inst).unwrap();
588 assert_eq!(inst_ref.entities_len(), 1);
589 }
590
591 #[test]
592 fn step_with_unknown_type_code_skips_action() {
593 let mut k = Kernel::new();
594 let inst = k.create_instance(InstanceConfig::default());
596 k.submit(
597 inst,
598 Principal::System,
599 None,
600 Tick(0),
601 TypeCode(999),
602 Vec::new(),
603 )
604 .unwrap();
605
606 let report = k.step(Tick(5), CapabilityMask::SYSTEM);
607 assert_eq!(report.actions_executed, 1);
608 assert_eq!(report.effects_applied, 0);
609 assert_eq!(k.instances.get(&inst).unwrap().entities_len(), 0);
610 }
611
612 #[test]
613 fn observer_receives_action_executed_event() {
614 let mut k = Kernel::new();
615 k.register_action::<SpawnOneAction>();
616 let count = Arc::new(AtomicU32::new(0));
617 let _h = k.register_observer(Box::new(CountingObserver {
618 count: count.clone(),
619 }));
620 let inst = k.create_instance(InstanceConfig::default());
621 k.submit(
622 inst,
623 Principal::System,
624 None,
625 Tick(0),
626 TypeCode(100),
627 Vec::new(),
628 )
629 .unwrap();
630 k.step(Tick(5), CapabilityMask::SYSTEM);
631 assert_eq!(count.load(Ordering::SeqCst), 1);
633 }
634
635 #[test]
636 fn observer_receives_domain_event_emitted() {
637 let mut k = Kernel::new();
638 k.register_action::<EmitAction>();
639 let count = Arc::new(AtomicU32::new(0));
640 k.register_observer(Box::new(CountingObserver {
641 count: count.clone(),
642 }));
643 let inst = k.create_instance(InstanceConfig::default());
644 k.submit(
645 inst,
646 Principal::System,
647 None,
648 Tick(0),
649 TypeCode(101),
650 Vec::new(),
651 )
652 .unwrap();
653 let report = k.step(Tick(5), CapabilityMask::SYSTEM);
654 assert_eq!(report.domain_events_emitted, 1);
655 assert_eq!(count.load(Ordering::SeqCst), 2);
657 }
658
659 #[test]
660 fn panic_observer_evicted_after_first_event() {
661 let mut k = Kernel::new();
662 k.register_action::<SpawnOneAction>();
663 let h = k.register_observer(Box::new(PanicObserver));
664 let inst = k.create_instance(InstanceConfig::default());
665 k.submit(
666 inst,
667 Principal::System,
668 None,
669 Tick(0),
670 TypeCode(100),
671 Vec::new(),
672 )
673 .unwrap();
674 let report = k.step(Tick(5), CapabilityMask::SYSTEM);
675 assert!(report.observers_evicted >= 1);
676 assert!(k.observers.is_evicted(h));
677 }
678
679 #[test]
680 fn unauthenticated_principal_denies_all_effects() {
681 let mut k = Kernel::new();
682 k.register_action::<SpawnOneAction>();
683 let inst = k.create_instance(InstanceConfig::default());
684 k.submit(
685 inst,
686 Principal::Unauthenticated,
687 None,
688 Tick(0),
689 TypeCode(100),
690 Vec::new(),
691 )
692 .unwrap();
693 let report = k.step(Tick(5), CapabilityMask::SYSTEM);
694 assert_eq!(report.effects_denied, 1);
695 assert_eq!(report.effects_applied, 0);
696 assert_eq!(k.instances.get(&inst).unwrap().entities_len(), 0);
698 }
699
700 #[test]
701 fn external_without_system_cap_denies_signal() {
702 let mut k = Kernel::new();
703 k.register_action::<SignalAction>();
704 let inst = k.create_instance(InstanceConfig::default());
705 k.submit(
706 inst,
707 Principal::External(ExternalId(7)),
708 None,
709 Tick(0),
710 TypeCode(102),
711 Vec::new(),
712 )
713 .unwrap();
714 let report = k.step(Tick(5), CapabilityMask::default());
715 assert_eq!(report.effects_denied, 1);
716 assert_eq!(report.effects_applied, 0);
717 }
718
719 #[test]
720 fn wal_attached_kernel_records_committed_step() {
721 let mut k = Kernel::new_with_wal([7u8; 32], [3u8; 32]);
722 k.register_action::<SpawnOneAction>();
723 let inst = k.create_instance(InstanceConfig::default());
724 k.submit(
725 inst,
726 Principal::System,
727 None,
728 Tick(0),
729 TypeCode(100),
730 Vec::new(),
731 )
732 .unwrap();
733 assert_eq!(k.wal_record_count(), Some(0));
734 let pre_tip = k.wal_chain_tip().unwrap();
735 k.step(Tick(5), CapabilityMask::SYSTEM);
736 assert_eq!(k.wal_record_count(), Some(1));
737 let post_tip = k.wal_chain_tip().unwrap();
738 assert_ne!(pre_tip, post_tip);
739 }
740
741 #[test]
742 fn wal_kernel_export_then_verify_chain() {
743 let mut k = Kernel::new_with_wal([11u8; 32], [0u8; 32]);
744 k.register_action::<SpawnOneAction>();
745 let inst = k.create_instance(InstanceConfig::default());
746 for _ in 0..3 {
747 k.submit(
748 inst,
749 Principal::System,
750 None,
751 Tick(0),
752 TypeCode(100),
753 Vec::new(),
754 )
755 .unwrap();
756 k.step(Tick(0), CapabilityMask::SYSTEM);
757 }
758 let wal = k.export_wal().expect("wal attached");
759 assert_eq!(wal.records.len(), 3);
760 wal.verify_chain([11u8; 32]).expect("chain verifies");
761 }
762
763 #[test]
764 fn replay_reconstructs_chain_tip() {
765 use crate::persist::replay_into;
766 let mut k1 = Kernel::new_with_wal([42u8; 32], [0u8; 32]);
768 k1.register_action::<SpawnOneAction>();
769 let i1 = k1.create_instance(InstanceConfig::default());
770 for _ in 0..4 {
771 k1.submit(
772 i1,
773 Principal::System,
774 None,
775 Tick(0),
776 TypeCode(100),
777 Vec::new(),
778 )
779 .unwrap();
780 k1.step(Tick(0), CapabilityMask::SYSTEM);
781 }
782 let original_tip = k1.wal_chain_tip().unwrap();
783 let wal = k1.export_wal().unwrap();
784
785 let mut k2 = Kernel::new_with_wal([42u8; 32], [0u8; 32]);
787 k2.register_action::<SpawnOneAction>();
788 let _i2 = k2.create_instance(InstanceConfig::default());
791 let report = replay_into(&mut k2, &wal).expect("replay ok");
792 assert_eq!(report.records_replayed, 4);
793 let replayed_tip = k2.wal_chain_tip().unwrap();
794 assert_eq!(replayed_tip, original_tip);
795 assert_eq!(report.final_chain_tip, original_tip);
796 }
797
798 #[test]
799 fn step_processes_instances_in_ascending_order() {
800 let mut k = Kernel::new();
804 k.register_action::<SpawnOneAction>();
805 let i1 = k.create_instance(InstanceConfig::default());
806 let i2 = k.create_instance(InstanceConfig::default());
807 k.submit(
808 i2,
809 Principal::System,
810 None,
811 Tick(0),
812 TypeCode(100),
813 Vec::new(),
814 )
815 .unwrap();
816 k.submit(
817 i1,
818 Principal::System,
819 None,
820 Tick(0),
821 TypeCode(100),
822 Vec::new(),
823 )
824 .unwrap();
825 let report = k.step(Tick(5), CapabilityMask::SYSTEM);
826 assert_eq!(report.actions_executed, 2);
827 assert_eq!(report.effects_applied, 2);
828 }
829
830 #[test]
831 fn stats_aggregate_reflects_instances_and_scheduler() {
832 let mut k = Kernel::new();
833 k.register_action::<SpawnOneAction>();
834 assert_eq!(k.stats(), Stats::default());
835
836 let i1 = k.create_instance(InstanceConfig::default());
837 let i2 = k.create_instance(InstanceConfig::default());
838 let stats_pre = k.stats();
839 assert_eq!(stats_pre.instance_count, 2);
840 assert_eq!(stats_pre.scheduled_action_count, 0);
841 assert_eq!(stats_pre.entity_count, 0);
842
843 k.submit(
844 i1,
845 Principal::System,
846 None,
847 Tick(0),
848 TypeCode(100),
849 Vec::new(),
850 )
851 .unwrap();
852 k.submit(
853 i2,
854 Principal::System,
855 None,
856 Tick(0),
857 TypeCode(100),
858 Vec::new(),
859 )
860 .unwrap();
861 let stats_queued = k.stats();
862 assert_eq!(stats_queued.scheduled_action_count, 2);
863
864 let _ = k.step(Tick(1), CapabilityMask::SYSTEM);
865 let stats_post = k.stats();
866 assert_eq!(stats_post.scheduled_action_count, 0);
867 assert_eq!(stats_post.entity_count, 2);
868 }
869
870 #[test]
871 fn stats_counts_observers() {
872 struct NullObs;
873 impl KernelObserver for NullObs {
874 fn on_event(&self, _e: &KernelEvent) {}
875 }
876
877 let mut k = Kernel::new();
878 k.register_observer(Box::new(NullObs));
879 k.register_observer(Box::new(NullObs));
880 assert_eq!(k.stats().observer_count, 2);
881 }
882
883 #[test]
884 fn stats_wal_record_count_reflects_writer() {
885 let mut k = Kernel::new_with_wal([1u8; 32], [0u8; 32]);
886 k.register_action::<SpawnOneAction>();
887 assert_eq!(k.stats().wal_record_count, 0);
888
889 let i = k.create_instance(InstanceConfig::default());
890 k.submit(
891 i,
892 Principal::System,
893 None,
894 Tick(0),
895 TypeCode(100),
896 Vec::new(),
897 )
898 .unwrap();
899 let _ = k.step(Tick(1), CapabilityMask::SYSTEM);
900 assert_eq!(k.stats().wal_record_count, 1);
901 }
902
903 struct ForceUnloadCapture {
907 seen: Arc<std::sync::Mutex<Vec<(RouteId, u32)>>>,
908 }
909 impl KernelObserver for ForceUnloadCapture {
910 fn on_event(&self, event: &KernelEvent) {
911 if let KernelEvent::ModuleForceUnloaded {
912 route_id,
913 live_refs_at_unload,
914 } = event
915 {
916 self.seen
917 .lock()
918 .unwrap()
919 .push((*route_id, *live_refs_at_unload));
920 }
921 }
922 }
923
924 #[test]
925 fn force_unload_without_cap_denied() {
926 let mut k = Kernel::new();
927 let result = k.force_unload(RouteId(1), CapabilityMask::default());
928 assert!(matches!(result, Err(ArkheError::CapabilityDenied)));
929 }
930
931 #[test]
932 fn force_unload_removes_inflight_refs() {
933 let mut k = Kernel::new();
934 k.register_action::<SignalAction>();
935 let inst = k.create_instance(InstanceConfig::default());
936 k.submit(
939 inst,
940 Principal::System,
941 None,
942 Tick(0),
943 TypeCode(102),
944 Vec::new(),
945 )
946 .unwrap();
947 let report = k.step(Tick(0), CapabilityMask::SYSTEM);
948 assert_eq!(report.effects_applied, 1);
949 assert_eq!(
950 k.instances
951 .get(&inst)
952 .unwrap()
953 .inflight_refs_for(RouteId(1)),
954 1
955 );
956
957 let dropped = k
958 .force_unload(RouteId(1), CapabilityMask::ADMIN_UNLOAD)
959 .expect("admin_unload caps");
960 assert_eq!(dropped, 1);
961 assert_eq!(
962 k.instances
963 .get(&inst)
964 .unwrap()
965 .inflight_refs_for(RouteId(1)),
966 0
967 );
968 assert_eq!(k.instances.get(&inst).unwrap().inflight_refs_len(), 0);
969 }
970
971 #[test]
972 fn force_unload_emits_module_unloaded_event() {
973 let mut k = Kernel::new();
974 k.register_action::<SignalAction>();
975 let seen = Arc::new(std::sync::Mutex::new(Vec::new()));
976 k.register_observer(Box::new(ForceUnloadCapture { seen: seen.clone() }));
977 let inst = k.create_instance(InstanceConfig::default());
978 k.submit(
979 inst,
980 Principal::System,
981 None,
982 Tick(0),
983 TypeCode(102),
984 Vec::new(),
985 )
986 .unwrap();
987 let _ = k.step(Tick(0), CapabilityMask::SYSTEM);
988
989 k.force_unload(RouteId(1), CapabilityMask::ADMIN_UNLOAD)
990 .expect("admin_unload caps");
991
992 let captured = seen.lock().unwrap().clone();
993 assert_eq!(captured, vec![(RouteId(1), 1)]);
994 }
995
996 #[test]
997 fn force_unload_no_live_refs_returns_zero() {
998 let mut k = Kernel::new();
999 let _ = k.create_instance(InstanceConfig::default());
1000 let dropped = k
1001 .force_unload(RouteId(99), CapabilityMask::ADMIN_UNLOAD)
1002 .expect("admin_unload caps");
1003 assert_eq!(dropped, 0);
1004 }
1005
1006 #[derive(Serialize, Deserialize)]
1013 struct SetCompAction {
1014 size: u64,
1015 entity_id: u64,
1016 }
1017 impl Sealed for SetCompAction {}
1018 impl ActionDeriv for SetCompAction {
1019 const TYPE_CODE: TypeCode = TypeCode(200);
1020 const SCHEMA_VERSION: u32 = 1;
1021 }
1022 impl ActionCompute for SetCompAction {
1023 fn compute(&self, _ctx: &ActionContext) -> Vec<Op> {
1024 let entity = EntityId::new(self.entity_id).unwrap();
1025 vec![
1026 Op::SpawnEntity {
1027 id: entity,
1028 owner: Principal::System,
1029 },
1030 Op::SetComponent {
1031 entity,
1032 type_code: TypeCode(7),
1033 bytes: Bytes::from(vec![0u8; self.size as usize]),
1034 size: self.size,
1035 },
1036 ]
1037 }
1038 }
1039
1040 #[derive(Serialize, Deserialize)]
1044 struct TwoSetCompAction {
1045 a: u64,
1046 b: u64,
1047 }
1048 impl Sealed for TwoSetCompAction {}
1049 impl ActionDeriv for TwoSetCompAction {
1050 const TYPE_CODE: TypeCode = TypeCode(201);
1051 const SCHEMA_VERSION: u32 = 1;
1052 }
1053 impl ActionCompute for TwoSetCompAction {
1054 fn compute(&self, _ctx: &ActionContext) -> Vec<Op> {
1055 let e1 = EntityId::new(1).unwrap();
1056 let e2 = EntityId::new(2).unwrap();
1057 vec![
1058 Op::SpawnEntity {
1059 id: e1,
1060 owner: Principal::System,
1061 },
1062 Op::SpawnEntity {
1063 id: e2,
1064 owner: Principal::System,
1065 },
1066 Op::SetComponent {
1067 entity: e1,
1068 type_code: TypeCode(7),
1069 bytes: Bytes::from(vec![0u8; self.a as usize]),
1070 size: self.a,
1071 },
1072 Op::SetComponent {
1073 entity: e2,
1074 type_code: TypeCode(7),
1075 bytes: Bytes::from(vec![0u8; self.b as usize]),
1076 size: self.b,
1077 },
1078 ]
1079 }
1080 }
1081
1082 fn cfg_with_budget(budget: u64) -> InstanceConfig {
1083 InstanceConfig {
1084 memory_budget_bytes: budget,
1085 ..Default::default()
1086 }
1087 }
1088
1089 fn submit_set(k: &mut Kernel, inst: InstanceId, size: u64, entity_id: u64) {
1090 let action = SetCompAction { size, entity_id };
1091 let bytes = Action::canonical_bytes(&action);
1092 k.submit(
1093 inst,
1094 Principal::System,
1095 None,
1096 Tick(0),
1097 SetCompAction::TYPE_CODE,
1098 bytes,
1099 )
1100 .expect("submit ok");
1101 }
1102
1103 #[test]
1104 fn budget_zero_allows_unlimited() {
1105 let mut k = Kernel::new();
1108 k.register_action::<SetCompAction>();
1109 let inst = k.create_instance(InstanceConfig::default());
1110 submit_set(&mut k, inst, 1_000_000, 1);
1111 let report = k.step(Tick(0), CapabilityMask::SYSTEM);
1112 assert_eq!(report.effects_applied, 2);
1113 assert_eq!(report.effects_denied, 0);
1114 assert_eq!(k.instances.get(&inst).unwrap().components_len(), 1);
1115 }
1116
1117 #[test]
1118 fn budget_exceeded_denies_op() {
1119 let mut k = Kernel::new();
1122 k.register_action::<SetCompAction>();
1123 let inst = k.create_instance(cfg_with_budget(100));
1124 submit_set(&mut k, inst, 500, 1);
1125 let report = k.step(Tick(0), CapabilityMask::SYSTEM);
1126 assert_eq!(report.effects_applied, 1); assert_eq!(report.effects_denied, 1); assert_eq!(report.actions_executed, 1);
1129 assert_eq!(k.instances.get(&inst).unwrap().entities_len(), 1);
1130 assert_eq!(k.instances.get(&inst).unwrap().components_len(), 0);
1131 }
1132
1133 #[test]
1134 fn budget_at_edge_allows_equal() {
1135 let mut k = Kernel::new();
1138 k.register_action::<SetCompAction>();
1139 let inst = k.create_instance(cfg_with_budget(500));
1140 submit_set(&mut k, inst, 500, 1);
1141 let report = k.step(Tick(0), CapabilityMask::SYSTEM);
1142 assert_eq!(report.effects_applied, 2);
1143 assert_eq!(report.effects_denied, 0);
1144 assert_eq!(k.instances.get(&inst).unwrap().components_len(), 1);
1145 assert_eq!(k.instances.get(&inst).unwrap().ledger().total_bytes(), 500);
1146 }
1147
1148 #[test]
1149 fn multi_op_stage_respects_running_delta() {
1150 let mut k = Kernel::new();
1156 k.register_action::<TwoSetCompAction>();
1157 let inst = k.create_instance(cfg_with_budget(600));
1158 let action = TwoSetCompAction { a: 400, b: 400 };
1159 let bytes = Action::canonical_bytes(&action);
1160 k.submit(
1161 inst,
1162 Principal::System,
1163 None,
1164 Tick(0),
1165 TwoSetCompAction::TYPE_CODE,
1166 bytes,
1167 )
1168 .unwrap();
1169 let report = k.step(Tick(0), CapabilityMask::SYSTEM);
1170 assert_eq!(report.effects_applied, 3);
1171 assert_eq!(report.effects_denied, 1);
1172 assert_eq!(k.instances.get(&inst).unwrap().entities_len(), 2);
1173 assert_eq!(k.instances.get(&inst).unwrap().components_len(), 1);
1174 assert_eq!(k.instances.get(&inst).unwrap().ledger().total_bytes(), 400);
1175 }
1176
1177 struct EffectFailedCapture {
1179 seen: Arc<std::sync::Mutex<Vec<Bytes>>>,
1180 }
1181 impl KernelObserver for EffectFailedCapture {
1182 fn on_event(&self, event: &KernelEvent) {
1183 if let KernelEvent::EffectFailed { reason, .. } = event {
1184 self.seen.lock().unwrap().push(reason.clone());
1185 }
1186 }
1187 }
1188
1189 #[test]
1190 fn effect_failed_event_on_budget_deny() {
1191 let mut k = Kernel::new();
1192 k.register_action::<SetCompAction>();
1193 let seen = Arc::new(std::sync::Mutex::new(Vec::new()));
1194 k.register_observer(Box::new(EffectFailedCapture { seen: seen.clone() }));
1195 let inst = k.create_instance(cfg_with_budget(100));
1196 submit_set(&mut k, inst, 500, 1);
1197 let _ = k.step(Tick(0), CapabilityMask::SYSTEM);
1198
1199 let captured = seen.lock().unwrap().clone();
1200 assert_eq!(captured.len(), 1);
1201 assert_eq!(captured[0].as_ref(), b"budget_exceeded");
1202 }
1203
1204 use crate::runtime::event::EventMask;
1207
1208 #[derive(Default)]
1211 struct VariantCounters {
1212 action_executed: AtomicU32,
1213 action_failed: AtomicU32,
1214 domain_event: AtomicU32,
1215 effect_failed: AtomicU32,
1216 other: AtomicU32,
1217 }
1218
1219 struct VariantTallyObserver {
1220 counters: Arc<VariantCounters>,
1221 }
1222 impl KernelObserver for VariantTallyObserver {
1223 fn on_event(&self, event: &KernelEvent) {
1224 match event {
1225 KernelEvent::ActionExecuted { .. } => {
1226 self.counters.action_executed.fetch_add(1, Ordering::SeqCst);
1227 }
1228 KernelEvent::ActionFailed { .. } => {
1229 self.counters.action_failed.fetch_add(1, Ordering::SeqCst);
1230 }
1231 KernelEvent::DomainEventEmitted { .. } => {
1232 self.counters.domain_event.fetch_add(1, Ordering::SeqCst);
1233 }
1234 KernelEvent::EffectFailed { .. } => {
1235 self.counters.effect_failed.fetch_add(1, Ordering::SeqCst);
1236 }
1237 KernelEvent::ObserverPanic { .. }
1238 | KernelEvent::ObserverEvicted { .. }
1239 | KernelEvent::SignalDropped { .. }
1240 | KernelEvent::ModuleForceUnloaded { .. }
1241 | KernelEvent::ActionDeferredToNextTick { .. }
1242 | KernelEvent::ObserversFlushed { .. } => {
1243 self.counters.other.fetch_add(1, Ordering::SeqCst);
1244 }
1245 }
1246 }
1247 }
1248
1249 #[test]
1250 fn event_mask_default_is_all() {
1251 let m = EventMask::default();
1252 assert_eq!(m, EventMask::ALL);
1253 assert!(m.contains(EventMask::ACTION_EXECUTED));
1254 assert!(m.contains(EventMask::DOMAIN_EVENT_EMITTED));
1255 assert!(m.contains(EventMask::MODULE_FORCE_UNLOADED));
1256 }
1257
1258 #[test]
1259 fn register_observer_backward_compat_receives_all() {
1260 let mut k = Kernel::new();
1263 k.register_action::<EmitAction>();
1264 let counters = Arc::new(VariantCounters::default());
1265 k.register_observer(Box::new(VariantTallyObserver {
1266 counters: counters.clone(),
1267 }));
1268 let inst = k.create_instance(InstanceConfig::default());
1269 k.submit(
1270 inst,
1271 Principal::System,
1272 None,
1273 Tick(0),
1274 TypeCode(101),
1275 Vec::new(),
1276 )
1277 .unwrap();
1278 let _ = k.step(Tick(0), CapabilityMask::SYSTEM);
1279 assert_eq!(counters.action_executed.load(Ordering::SeqCst), 1);
1280 assert_eq!(counters.domain_event.load(Ordering::SeqCst), 1);
1281 }
1282
1283 #[test]
1284 fn filter_only_action_executed() {
1285 let mut k = Kernel::new();
1287 k.register_action::<EmitAction>();
1288 let counters = Arc::new(VariantCounters::default());
1289 k.register_observer_filtered(
1290 Box::new(VariantTallyObserver {
1291 counters: counters.clone(),
1292 }),
1293 EventMask::ACTION_EXECUTED,
1294 );
1295 let inst = k.create_instance(InstanceConfig::default());
1296 k.submit(
1297 inst,
1298 Principal::System,
1299 None,
1300 Tick(0),
1301 TypeCode(101),
1302 Vec::new(),
1303 )
1304 .unwrap();
1305 let _ = k.step(Tick(0), CapabilityMask::SYSTEM);
1306 assert_eq!(counters.action_executed.load(Ordering::SeqCst), 1);
1307 assert_eq!(counters.domain_event.load(Ordering::SeqCst), 0);
1308 }
1309
1310 #[test]
1311 fn filter_domain_event_only() {
1312 let mut k = Kernel::new();
1314 k.register_action::<EmitAction>();
1315 let counters = Arc::new(VariantCounters::default());
1316 k.register_observer_filtered(
1317 Box::new(VariantTallyObserver {
1318 counters: counters.clone(),
1319 }),
1320 EventMask::DOMAIN_EVENT_EMITTED,
1321 );
1322 let inst = k.create_instance(InstanceConfig::default());
1323 k.submit(
1324 inst,
1325 Principal::System,
1326 None,
1327 Tick(0),
1328 TypeCode(101),
1329 Vec::new(),
1330 )
1331 .unwrap();
1332 let _ = k.step(Tick(0), CapabilityMask::SYSTEM);
1333 assert_eq!(counters.action_executed.load(Ordering::SeqCst), 0);
1334 assert_eq!(counters.domain_event.load(Ordering::SeqCst), 1);
1335 }
1336
1337 #[test]
1338 fn multiple_observers_independent_masks() {
1339 let mut k = Kernel::new();
1342 k.register_action::<EmitAction>();
1343 let ca = Arc::new(VariantCounters::default());
1344 let cb = Arc::new(VariantCounters::default());
1345 k.register_observer_filtered(
1346 Box::new(VariantTallyObserver {
1347 counters: ca.clone(),
1348 }),
1349 EventMask::ACTION_EXECUTED,
1350 );
1351 k.register_observer_filtered(
1352 Box::new(VariantTallyObserver {
1353 counters: cb.clone(),
1354 }),
1355 EventMask::DOMAIN_EVENT_EMITTED,
1356 );
1357 let inst = k.create_instance(InstanceConfig::default());
1358 k.submit(
1359 inst,
1360 Principal::System,
1361 None,
1362 Tick(0),
1363 TypeCode(101),
1364 Vec::new(),
1365 )
1366 .unwrap();
1367 let _ = k.step(Tick(0), CapabilityMask::SYSTEM);
1368 assert_eq!(ca.action_executed.load(Ordering::SeqCst), 1);
1369 assert_eq!(ca.domain_event.load(Ordering::SeqCst), 0);
1370 assert_eq!(cb.action_executed.load(Ordering::SeqCst), 0);
1371 assert_eq!(cb.domain_event.load(Ordering::SeqCst), 1);
1372 }
1373
1374 #[test]
1375 fn filter_empty_mask_receives_nothing() {
1376 let mut k = Kernel::new();
1378 k.register_action::<EmitAction>();
1379 let counters = Arc::new(VariantCounters::default());
1380 k.register_observer_filtered(
1381 Box::new(VariantTallyObserver {
1382 counters: counters.clone(),
1383 }),
1384 EventMask::empty(),
1385 );
1386 let inst = k.create_instance(InstanceConfig::default());
1387 k.submit(
1388 inst,
1389 Principal::System,
1390 None,
1391 Tick(0),
1392 TypeCode(101),
1393 Vec::new(),
1394 )
1395 .unwrap();
1396 let _ = k.step(Tick(0), CapabilityMask::SYSTEM);
1397 assert_eq!(counters.action_executed.load(Ordering::SeqCst), 0);
1398 assert_eq!(counters.domain_event.load(Ordering::SeqCst), 0);
1399 assert_eq!(counters.action_failed.load(Ordering::SeqCst), 0);
1400 assert_eq!(counters.effect_failed.load(Ordering::SeqCst), 0);
1401 assert_eq!(counters.other.load(Ordering::SeqCst), 0);
1402 }
1403}