Skip to main content

arkhe_kernel/runtime/
apply.rs

1//! Stage application — runtime-side `apply_stage` / `discard_stage`.
2//!
3//! Lives in `runtime/` so the layer DAG `abi → state → runtime → persist`
4//! is preserved (R4-X) — placing it on `Instance` would force `state` to
5//! import `runtime::StepStage`.
6//!
7//! Application is panic-free (totality contract): every operation
8//! uses saturating arithmetic and `if let Some(...)` style guards.
9
10use super::stage::{LedgerOp, ScheduledEntryDelta, StagedStateDelta, StepStage};
11use crate::state::Instance;
12
13/// Apply a `StepStage` to `instance` in canonical order:
14///
15/// 1. `id_counters`
16/// 2. `state_ops`
17/// 3. `ledger_delta`
18/// 4. `inflight_refs_delta`
19/// 5. `schedule_deltas`
20/// 6. `wall_remainder` and `local_tick`
21///
22/// Kernel-level buckets `pending_signals` / `events` / `observer_eviction`
23/// are drained by Kernel post-commit (chunks 3b/c) — no-op here.
24pub(crate) fn apply_stage(instance: &mut Instance, stage: StepStage) {
25    // 1. id_counters — monotonic; no preconditions.
26    {
27        let c = instance.id_counters_mut();
28        c.next_entity = c
29            .next_entity
30            .saturating_add(stage.id_counters.next_entity_advance);
31        c.next_scheduled = c
32            .next_scheduled
33            .saturating_add(stage.id_counters.next_scheduled_advance);
34        c.next_source_seq = c
35            .next_source_seq
36            .saturating_add(stage.id_counters.next_source_seq_advance);
37    }
38
39    // 2. state_ops — entity/component mutation.
40    for op in stage.state_ops {
41        match op {
42            StagedStateDelta::SpawnEntity { id, meta } => {
43                instance.insert_entity(id, meta);
44            }
45            StagedStateDelta::DespawnEntity { id } => {
46                instance.remove_entity(id);
47            }
48            StagedStateDelta::SetComponent {
49                entity,
50                type_code,
51                bytes,
52                ..
53            } => {
54                instance.insert_component((entity, type_code), bytes);
55            }
56            StagedStateDelta::RemoveComponent {
57                entity, type_code, ..
58            } => {
59                instance.remove_component((entity, type_code));
60            }
61        }
62    }
63
64    // 3. ledger_delta — accounting follows entity/component apply.
65    {
66        let ledger = instance.ledger_mut();
67        for lop in stage.ledger_delta.ops {
68            match lop {
69                LedgerOp::AddEntity(id) => {
70                    let _ = ledger.add_entity(id);
71                }
72                LedgerOp::RemoveEntity(id) => {
73                    let _ = ledger.remove_entity(id);
74                }
75                LedgerOp::AddComponent {
76                    entity,
77                    type_code,
78                    size,
79                } => {
80                    let _ = ledger.add_component(entity, type_code, size);
81                }
82                LedgerOp::RemoveComponent {
83                    entity,
84                    type_code,
85                    size,
86                } => {
87                    let _ = ledger.remove_component(entity, type_code, size);
88                }
89            }
90        }
91    }
92
93    // 4. inflight_refs_delta — i32 deltas applied to u32 table.
94    {
95        let refs = instance.inflight_refs_mut();
96        for (route_id, delta) in stage.inflight_refs_delta {
97            let entry = refs.entry(route_id).or_insert(0);
98            if delta >= 0 {
99                *entry = entry.saturating_add(delta as u32);
100            } else {
101                *entry = entry.saturating_sub(delta.unsigned_abs());
102            }
103            if *entry == 0 {
104                refs.remove(&route_id);
105            }
106        }
107    }
108
109    // 5. schedule_deltas — scheduler mutation. NOTE: the Add path passes
110    // entry data to `Scheduler::schedule` (which assigns a fresh ID); the
111    // staged `entry.id` is *not* preserved at this layer. Pre-assigned
112    // scheduling is reserved (deferred) for when the stage will carry
113    // the canonical ID up-front.
114    {
115        let scheduler = instance.scheduler_mut();
116        for sd in stage.schedule_deltas {
117            match sd {
118                ScheduledEntryDelta::Add(entry) => {
119                    let _ = scheduler.schedule(
120                        entry.at,
121                        entry.actor,
122                        entry.principal,
123                        entry.action_type_code,
124                        entry.action_bytes,
125                    );
126                }
127                ScheduledEntryDelta::Remove(id) => {
128                    let _ = scheduler.cancel(id);
129                }
130            }
131        }
132    }
133
134    // 6/7/9. pending_signals / events / observer_eviction_pending —
135    // kernel-level state; Kernel drains these post-commit (chunks 3b/c).
136    // Drop here so the owned stage releases the resources.
137    let _ = stage.pending_signals;
138    let _ = stage.events;
139    let _ = stage.observer_eviction_pending;
140
141    // 8. wall_remainder + local_tick advance.
142    instance.advance_wall_remainder(stage.wall_remainder_delta);
143    instance.advance_local_tick(stage.local_tick_delta);
144}
145
146/// Drop a `StepStage` without applying any of its deltas (rollback path).
147/// `_instance` is taken as `&mut` to mirror `apply_stage`'s signature so
148/// callers can pivot between the two without lifetime gymnastics.
149#[allow(clippy::needless_pass_by_value)]
150pub(crate) fn discard_stage(_instance: &mut Instance, _stage: StepStage) {
151    // Owned stage drops at function exit. No mutation to instance.
152}
153
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    use crate::abi::{CapabilityMask, EntityId, InstanceId, Principal, RouteId, Tick, TypeCode};
    use crate::runtime::stage::{IdCountersDelta, LedgerOp, ScheduledEntryDelta, StagedStateDelta};
    use crate::state::{
        EntityMeta, Instance, InstanceConfig, QuotaReductionPolicy, ScheduledActionId,
        ScheduledEntry,
    };

    // ---- fixtures -------------------------------------------------------

    /// Instance id `n`; panics if `InstanceId::new` rejects it.
    fn id(n: u64) -> InstanceId {
        InstanceId::new(n).unwrap()
    }

    /// Entity id `n`; panics if `EntityId::new` rejects it.
    fn entity(n: u64) -> EntityId {
        EntityId::new(n).unwrap()
    }

    /// Configuration shared by every test in this module.
    fn cfg() -> InstanceConfig {
        InstanceConfig {
            default_caps: CapabilityMask::default(),
            max_entities: 100,
            max_scheduled: 1000,
            memory_budget_bytes: 1 << 20,
            parent: None,
            quota_reduction: QuotaReductionPolicy::default(),
        }
    }

    /// A brand-new instance with id 1 and the shared test configuration.
    fn new_instance() -> Instance {
        Instance::new(id(1), cfg())
    }

    /// Staged spawn of entity 1, owned by the system principal at tick 0.
    fn spawn_entity_one() -> StagedStateDelta {
        StagedStateDelta::SpawnEntity {
            id: entity(1),
            meta: EntityMeta {
                owner: Principal::System,
                created: Tick(0),
            },
        }
    }

    // ---- apply_stage ----------------------------------------------------

    #[test]
    fn apply_empty_stage_leaves_instance_untouched() {
        let mut instance = new_instance();

        apply_stage(&mut instance, StepStage::default());

        assert_eq!(instance.entities_len(), 0);
        assert_eq!(instance.components_len(), 0);
        assert_eq!(instance.local_tick(), 0);
        assert_eq!(instance.wall_remainder(), 0);
        assert_eq!(instance.ledger().total_entities(), 0);
        assert_eq!(instance.id_counters().next_entity, 0);
    }

    #[test]
    fn apply_spawn_entity() {
        let mut instance = new_instance();
        let spawn = StepStage {
            state_ops: vec![spawn_entity_one()],
            ..Default::default()
        };

        apply_stage(&mut instance, spawn);

        assert_eq!(instance.entities_len(), 1);
    }

    #[test]
    fn apply_despawn_entity() {
        let mut instance = new_instance();

        // First bring the entity into existence...
        let spawn = StepStage {
            state_ops: vec![spawn_entity_one()],
            ..Default::default()
        };
        apply_stage(&mut instance, spawn);
        assert_eq!(instance.entities_len(), 1);

        // ...then remove it again with a second stage.
        let despawn = StepStage {
            state_ops: vec![StagedStateDelta::DespawnEntity { id: entity(1) }],
            ..Default::default()
        };
        apply_stage(&mut instance, despawn);
        assert_eq!(instance.entities_len(), 0);
    }

    #[test]
    fn apply_set_and_remove_component() {
        let mut instance = new_instance();

        let set = StepStage {
            state_ops: vec![StagedStateDelta::SetComponent {
                entity: entity(1),
                type_code: TypeCode(7),
                bytes: Bytes::from_static(b"data"),
                size: 4,
            }],
            ..Default::default()
        };
        apply_stage(&mut instance, set);
        assert_eq!(instance.components_len(), 1);

        let remove = StepStage {
            state_ops: vec![StagedStateDelta::RemoveComponent {
                entity: entity(1),
                type_code: TypeCode(7),
                size: 4,
            }],
            ..Default::default()
        };
        apply_stage(&mut instance, remove);
        assert_eq!(instance.components_len(), 0);
    }

    #[test]
    fn apply_ledger_delta_balanced() {
        let mut instance = new_instance();
        let mut stage = StepStage::default();
        let ledger_ops = [
            LedgerOp::AddEntity(entity(1)),
            LedgerOp::AddComponent {
                entity: entity(1),
                type_code: TypeCode(1),
                size: 100,
            },
        ];
        for ledger_op in ledger_ops {
            stage.ledger_delta.ops.push(ledger_op);
        }

        apply_stage(&mut instance, stage);

        let ledger = instance.ledger();
        assert_eq!(ledger.total_entities(), 1);
        assert_eq!(ledger.total_bytes(), 100);
        assert_eq!(ledger.entity_bytes(entity(1)), 100);
    }

    #[test]
    fn apply_id_counters_advance() {
        let mut instance = new_instance();
        let advance = IdCountersDelta {
            next_entity_advance: 5,
            next_scheduled_advance: 3,
            next_source_seq_advance: 7,
        };
        let stage = StepStage {
            id_counters: advance,
            ..Default::default()
        };

        apply_stage(&mut instance, stage);

        let counters = instance.id_counters();
        assert_eq!(counters.next_entity, 5);
        assert_eq!(counters.next_scheduled, 3);
        assert_eq!(counters.next_source_seq, 7);
    }

    #[test]
    fn apply_inflight_refs_positive_then_negative_to_zero() {
        let mut instance = new_instance();
        let route = RouteId(42);

        let increment = StepStage {
            inflight_refs_delta: std::iter::once((route, 3)).collect(),
            ..Default::default()
        };
        apply_stage(&mut instance, increment);
        assert_eq!(instance.inflight_refs_for(route), 3);

        let decrement = StepStage {
            inflight_refs_delta: std::iter::once((route, -3)).collect(),
            ..Default::default()
        };
        apply_stage(&mut instance, decrement);
        // Back at zero, and the table no longer holds any entry.
        assert_eq!(instance.inflight_refs_for(route), 0);
        assert_eq!(instance.inflight_refs_len(), 0);
    }

    #[test]
    fn apply_wall_and_local_tick_advance() {
        let mut instance = new_instance();
        let stage = StepStage {
            wall_remainder_delta: 12345,
            local_tick_delta: 7,
            ..Default::default()
        };

        apply_stage(&mut instance, stage);

        assert_eq!(instance.wall_remainder(), 12345);
        assert_eq!(instance.local_tick(), 7);
    }

    #[test]
    fn apply_schedule_delta_add_inserts_into_scheduler() {
        let mut instance = new_instance();
        let scheduled = ScheduledEntry {
            id: ScheduledActionId::new(1).unwrap(),
            at: Tick(10),
            actor: None,
            principal: Principal::System,
            action_type_code: TypeCode(0),
            action_bytes: vec![1, 2, 3],
        };
        let stage = StepStage {
            schedule_deltas: vec![ScheduledEntryDelta::Add(scheduled)],
            ..Default::default()
        };

        apply_stage(&mut instance, stage);

        assert_eq!(instance.scheduler().len(), 1);
    }

    // ---- discard_stage --------------------------------------------------

    #[test]
    fn discard_stage_is_noop() {
        let mut instance = new_instance();
        // A stage that would visibly change the instance if it were applied.
        let stage = StepStage {
            state_ops: vec![spawn_entity_one()],
            local_tick_delta: 99,
            ..Default::default()
        };

        discard_stage(&mut instance, stage);

        assert_eq!(instance.entities_len(), 0);
        assert_eq!(instance.local_tick(), 0);
    }
}