Skip to main content

arkhe_kernel/runtime/
dispatch.rs

//! Effect dispatcher — translate an `Effect<'i, Authorized>` into
//! `StepStage` deltas.
//!
//! Inputs: an authorized Effect (Op + originating principal + branded
//! instance), the running `StepStage`, the current Tick, and the kernel's
//! `next_scheduled_id` counter.
//!
//! Output: side effects on `StepStage`, plus an advance of the caller's
//! `next_scheduled_id` counter for `Op::ScheduleAction`. No Instance
//! mutation happens here; `runtime::apply::apply_stage` commits the stage
//! post-dispatch.
10
11use crate::abi::{Principal, Tick};
12use crate::state::{Authorized, Effect, EntityMeta, Op, ScheduledActionId, ScheduledEntry};
13
14use super::event::KernelEvent;
15use super::stage::{LedgerOp, PendingSignal, ScheduledEntryDelta, StagedStateDelta, StepStage};
16
17/// Translate one authorized Effect into corresponding `StepStage` writes.
18///
19/// `next_scheduled_id` is monotonically advanced for `Op::ScheduleAction`
20/// to keep `ScheduledActionId` allocation deterministic across the step
21/// (id_counters tuple includes scheduled-id; staged so rollback
22/// rewinds it via `id_counters.next_scheduled_advance`).
23pub(crate) fn dispatch<'i>(
24    effect: Effect<'i, Authorized>,
25    stage: &mut StepStage,
26    now: Tick,
27    next_scheduled_id: &mut u64,
28) {
29    let instance_id = effect.instance_id;
30    let effect_principal = match &effect.principal {
31        Principal::Unauthenticated => Principal::Unauthenticated,
32        Principal::External(e) => Principal::External(*e),
33        Principal::System => Principal::System,
34    };
35    match effect.op {
36        Op::SpawnEntity { id, owner } => {
37            stage.state_ops.push(StagedStateDelta::SpawnEntity {
38                id,
39                meta: EntityMeta {
40                    owner,
41                    created: now,
42                },
43            });
44            stage.ledger_delta.ops.push(LedgerOp::AddEntity(id));
45            stage.id_counters.next_entity_advance =
46                stage.id_counters.next_entity_advance.saturating_add(1);
47        }
48        Op::DespawnEntity { id } => {
49            stage.state_ops.push(StagedStateDelta::DespawnEntity { id });
50            stage.ledger_delta.ops.push(LedgerOp::RemoveEntity(id));
51        }
52        Op::SetComponent {
53            entity,
54            type_code,
55            bytes,
56            size,
57        } => {
58            stage.state_ops.push(StagedStateDelta::SetComponent {
59                entity,
60                type_code,
61                bytes,
62                size,
63            });
64            stage.ledger_delta.ops.push(LedgerOp::AddComponent {
65                entity,
66                type_code,
67                size,
68            });
69        }
70        Op::RemoveComponent {
71            entity,
72            type_code,
73            size,
74        } => {
75            stage.state_ops.push(StagedStateDelta::RemoveComponent {
76                entity,
77                type_code,
78                size,
79            });
80            stage.ledger_delta.ops.push(LedgerOp::RemoveComponent {
81                entity,
82                type_code,
83                size,
84            });
85        }
86        Op::EmitEvent {
87            actor,
88            event_type_code,
89            event_bytes,
90        } => {
91            stage.events.push_back(KernelEvent::DomainEventEmitted {
92                instance: instance_id,
93                actor,
94                event_type_code,
95                bytes: event_bytes,
96            });
97        }
98        Op::ScheduleAction {
99            at,
100            actor,
101            action_type_code,
102            action_bytes,
103            action_principal,
104        } => {
105            *next_scheduled_id = next_scheduled_id.saturating_add(1);
106            let id = ScheduledActionId::new(*next_scheduled_id)
107                .expect("next_scheduled_id incremented before use; non-zero");
108            stage
109                .schedule_deltas
110                .push(ScheduledEntryDelta::Add(ScheduledEntry {
111                    id,
112                    at,
113                    actor,
114                    principal: action_principal,
115                    action_type_code,
116                    action_bytes: action_bytes.to_vec(),
117                }));
118            stage.id_counters.next_scheduled_advance =
119                stage.id_counters.next_scheduled_advance.saturating_add(1);
120        }
121        Op::SendSignal {
122            target,
123            route,
124            payload,
125        } => {
126            stage.pending_signals.push(PendingSignal {
127                target,
128                route,
129                payload,
130                principal: effect_principal,
131            });
132            *stage.inflight_refs_delta.entry(route).or_insert(0) += 1;
133        }
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    use crate::abi::{CapabilityMask, EntityId, InstanceId, Principal, RouteId, Tick, TypeCode};
    use crate::state::authz::authorize;
    use crate::state::{Effect, Op, Unverified};

    /// Instance id used by every effect built in these tests.
    fn inst() -> InstanceId {
        InstanceId::new(1).unwrap()
    }

    /// Shorthand for an `EntityId` built from a raw number.
    fn ent(n: u64) -> EntityId {
        EntityId::new(n).unwrap()
    }

    /// Wrap `op` in an effect issued by `Principal::System` and authorize it
    /// under the `SYSTEM` capability mask.
    fn auth_system(op: Op) -> Effect<'static, crate::state::Authorized> {
        let unverified: Effect<'static, Unverified> = Effect::new(inst(), Principal::System, op);
        authorize(CapabilityMask::SYSTEM, unverified).expect("system always authorized")
    }

    /// Dispatch `op` into a default stage with the scheduled-id counter at
    /// zero, returning the stage and the counter's value afterwards.
    fn dispatch_fresh(op: Op, now: Tick) -> (StepStage, u64) {
        let mut stage = StepStage::default();
        let mut next_id: u64 = 0;
        dispatch(auth_system(op), &mut stage, now, &mut next_id);
        (stage, next_id)
    }

    #[test]
    fn dispatch_spawn_entity_pushes_state_op_ledger_and_advances_counter() {
        let op = Op::SpawnEntity {
            id: ent(1),
            owner: Principal::System,
        };
        let (stage, next_id) = dispatch_fresh(op, Tick(5));
        assert_eq!(stage.state_ops.len(), 1);
        assert_eq!(stage.ledger_delta.ops.len(), 1);
        assert_eq!(stage.id_counters.next_entity_advance, 1);
        assert_eq!(next_id, 0); // not advanced (no schedule)
    }

    #[test]
    fn dispatch_despawn_entity_pushes_state_and_ledger_remove() {
        let (stage, _) = dispatch_fresh(Op::DespawnEntity { id: ent(1) }, Tick(0));
        assert_eq!(stage.state_ops.len(), 1);
        assert_eq!(stage.ledger_delta.ops.len(), 1);
    }

    #[test]
    fn dispatch_set_component_pushes_state_and_ledger_add() {
        let op = Op::SetComponent {
            entity: ent(1),
            type_code: TypeCode(7),
            bytes: Bytes::from_static(b"data"),
            size: 4,
        };
        let (stage, _) = dispatch_fresh(op, Tick(0));
        assert_eq!(stage.state_ops.len(), 1);
        assert_eq!(stage.ledger_delta.ops.len(), 1);
    }

    #[test]
    fn dispatch_remove_component_pushes_state_and_ledger_remove() {
        let op = Op::RemoveComponent {
            entity: ent(1),
            type_code: TypeCode(7),
            size: 4,
        };
        let (stage, _) = dispatch_fresh(op, Tick(0));
        assert_eq!(stage.state_ops.len(), 1);
        assert_eq!(stage.ledger_delta.ops.len(), 1);
    }

    #[test]
    fn dispatch_emit_event_pushes_kernel_event() {
        let op = Op::EmitEvent {
            actor: Some(ent(1)),
            event_type_code: TypeCode(2),
            event_bytes: Bytes::from_static(b"evt"),
        };
        let (stage, _) = dispatch_fresh(op, Tick(0));
        assert_eq!(stage.events.len(), 1);
        match stage.events.front() {
            Some(KernelEvent::DomainEventEmitted {
                event_type_code, ..
            }) => {
                assert_eq!(*event_type_code, TypeCode(2));
            }
            _ => panic!("expected DomainEventEmitted"),
        }
    }

    #[test]
    fn dispatch_schedule_action_advances_id_and_pushes_delta() {
        let op = Op::ScheduleAction {
            at: Tick(10),
            actor: None,
            action_type_code: TypeCode(3),
            action_bytes: Bytes::from_static(b"a"),
            action_principal: Principal::System,
        };
        let (stage, next_id) = dispatch_fresh(op, Tick(0));
        assert_eq!(stage.schedule_deltas.len(), 1);
        assert_eq!(next_id, 1);
        assert_eq!(stage.id_counters.next_scheduled_advance, 1);
    }

    #[test]
    fn dispatch_send_signal_pushes_pending_and_increments_inflight_refs() {
        let route = RouteId(42);
        let op = Op::SendSignal {
            target: inst(),
            route,
            payload: Bytes::from_static(b"hello"),
        };
        let (stage, _) = dispatch_fresh(op, Tick(0));
        assert_eq!(stage.pending_signals.len(), 1);
        assert_eq!(stage.inflight_refs_delta.get(&route).copied(), Some(1));
    }

    #[test]
    fn dispatch_send_signal_twice_aggregates_inflight_refs() {
        // All three signals go into the same stage so the per-route count
        // accumulates.
        let mut stage = StepStage::default();
        let mut next_id: u64 = 0;
        let route = RouteId(42);
        for _ in 0..3 {
            let signal = auth_system(Op::SendSignal {
                target: inst(),
                route,
                payload: Bytes::new(),
            });
            dispatch(signal, &mut stage, Tick(0), &mut next_id);
        }
        assert_eq!(stage.pending_signals.len(), 3);
        assert_eq!(stage.inflight_refs_delta.get(&route).copied(), Some(3));
    }
}