1use crate::abi::{Principal, Tick};
12use crate::state::{Authorized, Effect, EntityMeta, Op, ScheduledActionId, ScheduledEntry};
13
14use super::event::KernelEvent;
15use super::stage::{LedgerOp, PendingSignal, ScheduledEntryDelta, StagedStateDelta, StepStage};
16
17pub(crate) fn dispatch<'i>(
24 effect: Effect<'i, Authorized>,
25 stage: &mut StepStage,
26 now: Tick,
27 next_scheduled_id: &mut u64,
28) {
29 let instance_id = effect.instance_id;
30 let effect_principal = match &effect.principal {
31 Principal::Unauthenticated => Principal::Unauthenticated,
32 Principal::External(e) => Principal::External(*e),
33 Principal::System => Principal::System,
34 };
35 match effect.op {
36 Op::SpawnEntity { id, owner } => {
37 stage.state_ops.push(StagedStateDelta::SpawnEntity {
38 id,
39 meta: EntityMeta {
40 owner,
41 created: now,
42 },
43 });
44 stage.ledger_delta.ops.push(LedgerOp::AddEntity(id));
45 stage.id_counters.next_entity_advance =
46 stage.id_counters.next_entity_advance.saturating_add(1);
47 }
48 Op::DespawnEntity { id } => {
49 stage.state_ops.push(StagedStateDelta::DespawnEntity { id });
50 stage.ledger_delta.ops.push(LedgerOp::RemoveEntity(id));
51 }
52 Op::SetComponent {
53 entity,
54 type_code,
55 bytes,
56 size,
57 } => {
58 stage.state_ops.push(StagedStateDelta::SetComponent {
59 entity,
60 type_code,
61 bytes,
62 size,
63 });
64 stage.ledger_delta.ops.push(LedgerOp::AddComponent {
65 entity,
66 type_code,
67 size,
68 });
69 }
70 Op::RemoveComponent {
71 entity,
72 type_code,
73 size,
74 } => {
75 stage.state_ops.push(StagedStateDelta::RemoveComponent {
76 entity,
77 type_code,
78 size,
79 });
80 stage.ledger_delta.ops.push(LedgerOp::RemoveComponent {
81 entity,
82 type_code,
83 size,
84 });
85 }
86 Op::EmitEvent {
87 actor,
88 event_type_code,
89 event_bytes,
90 } => {
91 stage.events.push_back(KernelEvent::DomainEventEmitted {
92 instance: instance_id,
93 actor,
94 event_type_code,
95 bytes: event_bytes,
96 });
97 }
98 Op::ScheduleAction {
99 at,
100 actor,
101 action_type_code,
102 action_bytes,
103 action_principal,
104 } => {
105 *next_scheduled_id = next_scheduled_id.saturating_add(1);
106 let id = ScheduledActionId::new(*next_scheduled_id)
107 .expect("next_scheduled_id incremented before use; non-zero");
108 stage
109 .schedule_deltas
110 .push(ScheduledEntryDelta::Add(ScheduledEntry {
111 id,
112 at,
113 actor,
114 principal: action_principal,
115 action_type_code,
116 action_bytes: action_bytes.to_vec(),
117 }));
118 stage.id_counters.next_scheduled_advance =
119 stage.id_counters.next_scheduled_advance.saturating_add(1);
120 }
121 Op::SendSignal {
122 target,
123 route,
124 payload,
125 } => {
126 stage.pending_signals.push(PendingSignal {
127 target,
128 route,
129 payload,
130 principal: effect_principal,
131 });
132 *stage.inflight_refs_delta.entry(route).or_insert(0) += 1;
133 }
134 }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    use crate::abi::{CapabilityMask, EntityId, InstanceId, Principal, RouteId, Tick, TypeCode};
    use crate::state::authz::authorize;
    use crate::state::{Effect, Op, Unverified};

    /// Instance id used by every effect built in these tests.
    fn inst() -> InstanceId {
        InstanceId::new(1).unwrap()
    }

    /// Entity id with raw value `n`.
    fn ent(n: u64) -> EntityId {
        EntityId::new(n).unwrap()
    }

    /// Wraps `op` in an effect issued by `Principal::System` and authorizes it
    /// against `CapabilityMask::SYSTEM`.
    fn auth_system(op: Op) -> Effect<'static, crate::state::Authorized> {
        let unverified: Effect<'static, Unverified> = Effect::new(inst(), Principal::System, op);
        authorize(CapabilityMask::SYSTEM, unverified).expect("system always authorized")
    }

    /// Dispatches `op` at tick `now` into a default stage with the scheduled-id
    /// counter starting at zero, returning the stage and the final counter.
    fn dispatch_fresh(op: Op, now: Tick) -> (StepStage, u64) {
        let mut stage = StepStage::default();
        let mut next_id: u64 = 0;
        dispatch(auth_system(op), &mut stage, now, &mut next_id);
        (stage, next_id)
    }

    #[test]
    fn dispatch_spawn_entity_pushes_state_op_ledger_and_advances_counter() {
        let op = Op::SpawnEntity {
            id: ent(1),
            owner: Principal::System,
        };
        let (stage, next_id) = dispatch_fresh(op, Tick(5));
        assert_eq!(stage.state_ops.len(), 1);
        assert_eq!(stage.ledger_delta.ops.len(), 1);
        assert_eq!(stage.id_counters.next_entity_advance, 1);
        // Spawning leaves the scheduled-action id counter untouched.
        assert_eq!(next_id, 0);
    }

    #[test]
    fn dispatch_despawn_entity_pushes_state_and_ledger_remove() {
        let (stage, _) = dispatch_fresh(Op::DespawnEntity { id: ent(1) }, Tick(0));
        assert_eq!(stage.state_ops.len(), 1);
        assert_eq!(stage.ledger_delta.ops.len(), 1);
    }

    #[test]
    fn dispatch_set_component_pushes_state_and_ledger_add() {
        let op = Op::SetComponent {
            entity: ent(1),
            type_code: TypeCode(7),
            bytes: Bytes::from_static(b"data"),
            size: 4,
        };
        let (stage, _) = dispatch_fresh(op, Tick(0));
        assert_eq!(stage.state_ops.len(), 1);
        assert_eq!(stage.ledger_delta.ops.len(), 1);
    }

    #[test]
    fn dispatch_remove_component_pushes_state_and_ledger_remove() {
        let op = Op::RemoveComponent {
            entity: ent(1),
            type_code: TypeCode(7),
            size: 4,
        };
        let (stage, _) = dispatch_fresh(op, Tick(0));
        assert_eq!(stage.state_ops.len(), 1);
        assert_eq!(stage.ledger_delta.ops.len(), 1);
    }

    #[test]
    fn dispatch_emit_event_pushes_kernel_event() {
        let op = Op::EmitEvent {
            actor: Some(ent(1)),
            event_type_code: TypeCode(2),
            event_bytes: Bytes::from_static(b"evt"),
        };
        let (stage, _) = dispatch_fresh(op, Tick(0));
        assert_eq!(stage.events.len(), 1);

        let emitted = stage.events.front().unwrap();
        match emitted {
            KernelEvent::DomainEventEmitted {
                event_type_code, ..
            } => {
                assert_eq!(*event_type_code, TypeCode(2));
            }
            _ => panic!("expected DomainEventEmitted"),
        }
    }

    #[test]
    fn dispatch_schedule_action_advances_id_and_pushes_delta() {
        let op = Op::ScheduleAction {
            at: Tick(10),
            actor: None,
            action_type_code: TypeCode(3),
            action_bytes: Bytes::from_static(b"a"),
            action_principal: Principal::System,
        };
        let (stage, next_id) = dispatch_fresh(op, Tick(0));
        assert_eq!(stage.schedule_deltas.len(), 1);
        assert_eq!(next_id, 1);
        assert_eq!(stage.id_counters.next_scheduled_advance, 1);
    }

    #[test]
    fn dispatch_send_signal_pushes_pending_and_increments_inflight_refs() {
        let route = RouteId(42);
        let op = Op::SendSignal {
            target: inst(),
            route,
            payload: Bytes::from_static(b"hello"),
        };
        let (stage, _) = dispatch_fresh(op, Tick(0));
        assert_eq!(stage.pending_signals.len(), 1);
        assert_eq!(stage.inflight_refs_delta.get(&route).copied(), Some(1));
    }

    // Despite the name, three signals are dispatched on the same route; they
    // share one stage so the per-route count accumulates.
    #[test]
    fn dispatch_send_signal_twice_aggregates_inflight_refs() {
        let route = RouteId(42);
        let mut stage = StepStage::default();
        let mut next_id: u64 = 0;
        for _ in 0..3 {
            let signal = auth_system(Op::SendSignal {
                target: inst(),
                route,
                payload: Bytes::new(),
            });
            dispatch(signal, &mut stage, Tick(0), &mut next_id);
        }
        assert_eq!(stage.pending_signals.len(), 3);
        assert_eq!(stage.inflight_refs_delta.get(&route).copied(), Some(3));
    }
}