Skip to main content

arkhe_kernel/runtime/
stage.rs

//! `StepStage` — transactional COW staging for one `step()` (10 buckets).
//!
//! Every commit-conditional write the kernel performs during a step is
//! buffered here; on commit, `Instance::apply_stage` drains the buckets in
//! canonical order; on rollback, the stage is dropped without effect.
//!
//! Buckets (10):
//! 1. `state_ops` — entity/component mutations
//! 2. `events` — KernelEvent emissions (kernel-level drain)
//! 3. `schedule_deltas` — scheduler add/cancel
//! 4. `pending_signals` — outbound IPC (kernel routes post-commit)
//! 5. `id_counters` — monotonic ID counter advances
//! 6. `ledger_delta` — ResourceLedger updates
//! 7. `inflight_refs_delta` — drain-refcount per RouteId (signed delta)
//! 8. `wall_remainder_delta` — sub-tick time accumulator advance
//! 9. `local_tick_delta` — logical tick advance
//! 10. `observer_eviction_pending` — observers slated for eviction
18
19use std::collections::{BTreeMap, VecDeque};
20
21use bytes::Bytes;
22use serde::{Deserialize, Serialize};
23
24use crate::abi::{EntityId, InstanceId, Principal, RouteId, TypeCode};
25use crate::state::{EntityMeta, ScheduledActionId, ScheduledEntry};
26
27use super::event::{KernelEvent, ObserverHandle};
28
29#[derive(Debug, Default, Clone, Serialize, Deserialize)]
30pub(crate) struct StepStage {
31    pub state_ops: Vec<StagedStateDelta>,
32    pub events: VecDeque<KernelEvent>,
33    pub schedule_deltas: Vec<ScheduledEntryDelta>,
34    pub pending_signals: Vec<PendingSignal>,
35    pub id_counters: IdCountersDelta,
36    pub ledger_delta: ResourceLedgerDelta,
37    pub inflight_refs_delta: BTreeMap<RouteId, i32>,
38    pub wall_remainder_delta: u128,
39    pub local_tick_delta: u64,
40    pub observer_eviction_pending: Vec<ObserverHandle>,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub(crate) enum StagedStateDelta {
45    SpawnEntity {
46        id: EntityId,
47        meta: EntityMeta,
48    },
49    DespawnEntity {
50        id: EntityId,
51    },
52    SetComponent {
53        entity: EntityId,
54        type_code: TypeCode,
55        bytes: Bytes,
56        size: u64,
57    },
58    RemoveComponent {
59        entity: EntityId,
60        type_code: TypeCode,
61        size: u64,
62    },
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub(crate) enum ScheduledEntryDelta {
67    Add(ScheduledEntry),
68    Remove(ScheduledActionId),
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub(crate) struct PendingSignal {
73    pub target: InstanceId,
74    pub route: RouteId,
75    pub payload: Bytes,
76    pub principal: Principal,
77}
78
79#[derive(Debug, Default, Clone, Serialize, Deserialize)]
80pub(crate) struct IdCountersDelta {
81    pub next_entity_advance: u64,
82    pub next_scheduled_advance: u64,
83    pub next_source_seq_advance: u64,
84}
85
86#[derive(Debug, Default, Clone, Serialize, Deserialize)]
87pub(crate) struct ResourceLedgerDelta {
88    pub ops: Vec<LedgerOp>,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub(crate) enum LedgerOp {
93    AddEntity(EntityId),
94    RemoveEntity(EntityId),
95    AddComponent {
96        entity: EntityId,
97        type_code: TypeCode,
98        size: u64,
99    },
100    RemoveComponent {
101        entity: EntityId,
102        type_code: TypeCode,
103        size: u64,
104    },
105}
106
107/// Net component-byte delta accumulated in `stage.ledger_delta` so far.
108/// Used by `step()` budget enforcement to project the post-commit total
109/// before each Op is dispatched. `RemoveEntity` does not carry per-entity
110/// byte counts at this layer — its bytes are recovered from the ledger
111/// at apply time, so this helper is conservative (under-counts freed
112/// bytes within a single step), favoring false-deny over false-allow.
113pub(crate) fn bytes_delta(stage: &StepStage) -> i64 {
114    let mut d: i64 = 0;
115    for op in &stage.ledger_delta.ops {
116        match op {
117            LedgerOp::AddComponent { size, .. } => {
118                d = d.saturating_add(*size as i64);
119            }
120            LedgerOp::RemoveComponent { size, .. } => {
121                d = d.saturating_sub(*size as i64);
122            }
123            LedgerOp::AddEntity(_) | LedgerOp::RemoveEntity(_) => {}
124        }
125    }
126    d
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use crate::abi::Tick;

    /// A freshly defaulted stage must have every bucket empty / zeroed.
    #[test]
    fn step_stage_default_all_buckets_empty() {
        let s = StepStage::default();
        assert!(s.state_ops.is_empty());
        assert!(s.events.is_empty());
        assert!(s.schedule_deltas.is_empty());
        assert!(s.pending_signals.is_empty());
        assert_eq!(s.id_counters.next_entity_advance, 0);
        assert_eq!(s.id_counters.next_scheduled_advance, 0);
        assert_eq!(s.id_counters.next_source_seq_advance, 0);
        assert!(s.ledger_delta.ops.is_empty());
        assert!(s.inflight_refs_delta.is_empty());
        assert_eq!(s.wall_remainder_delta, 0);
        assert_eq!(s.local_tick_delta, 0);
        assert!(s.observer_eviction_pending.is_empty());
    }

    /// Every `StagedStateDelta` variant can be constructed and cloned.
    #[test]
    fn staged_state_delta_variants_clone() {
        let id = EntityId::new(1).unwrap();
        let meta = EntityMeta {
            owner: Principal::System,
            created: Tick(0),
        };
        let _ = StagedStateDelta::SpawnEntity { id, meta }.clone();
        let _ = StagedStateDelta::DespawnEntity { id }.clone();
        let _ = StagedStateDelta::SetComponent {
            entity: id,
            type_code: TypeCode(1),
            bytes: Bytes::from_static(b"x"),
            size: 1,
        }
        .clone();
        let _ = StagedStateDelta::RemoveComponent {
            entity: id,
            type_code: TypeCode(1),
            size: 1,
        }
        .clone();
    }

    /// Both `ScheduledEntryDelta` variants can be constructed and cloned.
    #[test]
    fn scheduled_entry_delta_variants() {
        let entry = ScheduledEntry {
            id: ScheduledActionId::new(1).unwrap(),
            at: Tick(0),
            actor: None,
            principal: Principal::System,
            action_type_code: TypeCode(0),
            action_bytes: vec![],
        };
        let _ = ScheduledEntryDelta::Add(entry).clone();
        let _ = ScheduledEntryDelta::Remove(ScheduledActionId::new(1).unwrap()).clone();
    }

    /// Every `LedgerOp` variant can be constructed and cloned.
    #[test]
    fn ledger_op_variants() {
        let id = EntityId::new(1).unwrap();
        let _ = LedgerOp::AddEntity(id).clone();
        let _ = LedgerOp::RemoveEntity(id).clone();
        let _ = LedgerOp::AddComponent {
            entity: id,
            type_code: TypeCode(1),
            size: 100,
        }
        .clone();
        let _ = LedgerOp::RemoveComponent {
            entity: id,
            type_code: TypeCode(1),
            size: 100,
        }
        .clone();
    }

    /// A `PendingSignal` keeps the payload it was built with.
    #[test]
    fn pending_signal_construction() {
        let s = PendingSignal {
            target: InstanceId::new(1).unwrap(),
            route: RouteId(1),
            payload: Bytes::from_static(b"hello"),
            principal: Principal::System,
        };
        assert_eq!(s.payload.len(), 5);
    }

    /// With no ledger ops staged, the projected byte delta is zero.
    #[test]
    fn bytes_delta_empty_stage_is_zero() {
        assert_eq!(bytes_delta(&StepStage::default()), 0);
    }

    /// Component adds and removes are netted; entity-level ops contribute
    /// nothing to the byte delta.
    #[test]
    fn bytes_delta_nets_component_ops_and_ignores_entity_ops() {
        let id = EntityId::new(1).unwrap();
        let mut stage = StepStage::default();
        stage.ledger_delta.ops = vec![
            LedgerOp::AddEntity(id),
            LedgerOp::AddComponent {
                entity: id,
                type_code: TypeCode(1),
                size: 100,
            },
            LedgerOp::RemoveComponent {
                entity: id,
                type_code: TypeCode(1),
                size: 40,
            },
            LedgerOp::RemoveEntity(id),
        ];
        assert_eq!(bytes_delta(&stage), 60);
    }

    /// The delta is signed: removals that outweigh additions go negative.
    #[test]
    fn bytes_delta_negative_when_removals_dominate() {
        let id = EntityId::new(1).unwrap();
        let mut stage = StepStage::default();
        stage.ledger_delta.ops = vec![
            LedgerOp::AddComponent {
                entity: id,
                type_code: TypeCode(1),
                size: 10,
            },
            LedgerOp::RemoveComponent {
                entity: id,
                type_code: TypeCode(1),
                size: 25,
            },
        ];
        assert_eq!(bytes_delta(&stage), -15);
    }
}