Skip to main content

arkhe_forge_platform/
dispatcher.rs

//! L2 service layer — drives forge actions through the kernel's
//! authorize → dispatch → WAL append loop.
//!
//! `RuntimeService` wraps a [`Kernel`] (with WAL) and exposes a single
//! `dispatch` method that takes a forge `ArkheAction`, postcard-encodes
//! its canonical bytes, calls [`Kernel::submit`] + [`Kernel::step`] in
//! one shot, and returns the kernel's `StepReport`. The kernel handles
//! the L0 work — authorization, dispatch, WAL append — internally.
//!
//! Forge actions are made kernel-compatible by the
//! `arkhe-forge-macros::ArkheAction` derive: it emits both the
//! forge-side sealed-trait stack **and** the kernel-side `Sealed +
//! ActionDeriv + ActionCompute` stack, with the kernel-side
//! `ActionCompute::compute` body delegating to
//! `arkhe_forge_core::bridge::kernel_compute`. The bridge runs the
//! forge `compute()` on a fresh forge `ActionContext` and returns the
//! drained `Vec<Op>` to the kernel.
//!
//! ## WAL export
//!
//! After one or more `dispatch` calls, the caller may extract the
//! kernel's internal WAL via [`RuntimeService::export_wal`] (consumes
//! the service). Each [`arkhe_kernel::WalRecord`] in the returned
//! [`arkhe_kernel::Wal`] can be streamed into a
//! [`crate::wal_export::BufferedWalSink`] via [`wal_to_sink`] for
//! durable backup; the sink frames each record
//! with the standard magic + length-prefix shape per the firm
//! requirements pinned in `wal_export`.
//!
//! ## Current scope
//!
//! Manifest-driven authz policy, the PG-UNIQUE-INDEX-backed
//! idempotency dedup, and full
//! [`ActorHandleIndex`](arkhe_forge_core::context::ActorHandleIndex)
//! production paths are not yet wired through `RuntimeService` — a
//! forge action's idempotency / actor-handle paths run with the L1
//! defaults (no view, no index). Callers who need those layers attach
//! them through the forge `ActionContext` builder directly while the
//! L2 layer matures.
40
41use arkhe_kernel::abi::{ArkheError, CapabilityMask, InstanceId, Principal, Tick};
42use arkhe_kernel::state::traits::Action;
43use arkhe_kernel::state::InstanceConfig;
44use arkhe_kernel::{Kernel, StepReport, Wal};
45
46use crate::wal_export::{BufferedWalSink, WalExportError, WalRecordSink};
47
48/// Errors surfaced by [`wal_to_sink`].
49#[derive(Debug, thiserror::Error)]
50#[non_exhaustive]
51pub enum WalSinkError {
52    /// `WalRecord` failed postcard encoding (should be unreachable —
53    /// `WalRecord` is `derive(Serialize)` on a stable wire shape).
54    #[error("WalRecord postcard encode failed: {0}")]
55    Encode(#[from] postcard::Error),
56    /// Sink rejected the framed record (length / append-only / overflow).
57    #[error("BufferedWalSink rejected record: {0}")]
58    Sink(#[from] WalExportError),
59}
60
61/// Service-layer wrapper around [`arkhe_kernel::Kernel`]. Builds a
62/// kernel with WAL configured and exposes a forge-shaped dispatch API.
63pub struct RuntimeService {
64    kernel: Kernel,
65}
66
67impl RuntimeService {
68    /// Construct a service backed by a chain-only WAL writer (L0
69    /// `SignatureClass::None`). `world_id` and `manifest_digest` are
70    /// pinned into the WAL header.
71    #[must_use]
72    pub fn new(world_id: [u8; 32], manifest_digest: [u8; 32]) -> Self {
73        Self {
74            kernel: Kernel::new_with_wal(world_id, manifest_digest),
75        }
76    }
77
78    /// Register a forge `ArkheAction` so the kernel will execute it
79    /// when scheduled. Any forge action whose type bears
80    /// `#[derive(ArkheAction)]` automatically satisfies the kernel
81    /// [`Action`] bound through the derive's emitted kernel-side
82    /// stack.
83    pub fn register_action<A: Action>(&mut self) {
84        self.kernel.register_action::<A>();
85    }
86
87    /// Create a fresh kernel instance and return its `InstanceId`.
88    pub fn create_instance(&mut self, config: InstanceConfig) -> InstanceId {
89        self.kernel.create_instance(config)
90    }
91
92    /// Dispatch a forge action — postcard-encode its canonical bytes,
93    /// submit at tick `at`, then step the kernel once with `caps`.
94    /// Returns the kernel's `StepReport` so the caller can inspect
95    /// `actions_executed` / `effects_applied` / `effects_denied`.
96    ///
97    /// # Errors
98    ///
99    /// Returns the kernel's [`ArkheError`] surface verbatim. The
100    /// most common case is `InstanceNotFound` if `instance` is not
101    /// live; capability denial happens inside `step` and is reflected
102    /// in the returned report's `effects_denied` count rather than as
103    /// an `Err` from this function.
104    pub fn dispatch<A>(
105        &mut self,
106        instance: InstanceId,
107        principal: Principal,
108        action: &A,
109        at: Tick,
110        caps: CapabilityMask,
111    ) -> Result<StepReport, ArkheError>
112    where
113        A: Action,
114    {
115        let bytes = action.canonical_bytes();
116        self.kernel
117            .submit(instance, principal, None, at, A::TYPE_CODE, bytes)?;
118        Ok(self.kernel.step(at, caps))
119    }
120
121    /// Drain the kernel's internal WAL (consumes the service so the
122    /// kernel cannot continue stepping after export).
123    #[must_use]
124    pub fn export_wal(self) -> Option<Wal> {
125        self.kernel.export_wal()
126    }
127}
128
129/// Append every record of `wal` into the buffered sink, then flush.
130/// Each record is postcard-serialized via the kernel's stable
131/// [`arkhe_kernel::WalRecord`] wire shape (DO NOT TOUCH #7 —
132/// `seq: u64` first declared field) and the sink frames with the
133/// standard magic + length-prefix per `wal_export`'s firm
134/// requirements.
135///
136/// # Errors
137///
138/// Returns [`WalSinkError::Encode`] if a record fails postcard
139/// serialization (unreachable in practice — `WalRecord` derives
140/// `Serialize` over a stable shape) or [`WalSinkError::Sink`] if the
141/// sink rejects the framed record (length, append-only, overflow).
142pub fn wal_to_sink<W: std::io::Write>(
143    wal: &Wal,
144    sink: &mut BufferedWalSink<W>,
145) -> Result<(), WalSinkError> {
146    for record in &wal.records {
147        let bytes = postcard::to_allocvec(record)?;
148        sink.append_record(&bytes)?;
149    }
150    sink.flush()?;
151    Ok(())
152}
153
154#[cfg(test)]
155#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
156mod tests {
157    use super::*;
158    use arkhe_kernel::abi::{Principal, Tick};
159
160    /// Smoke — `RuntimeService::new` returns a service whose underlying
161    /// kernel reports zero records (the WAL header has been pinned but
162    /// no `step` has fired yet).
163    #[test]
164    fn fresh_service_has_zero_wal_records() {
165        let svc = RuntimeService::new([0x11u8; 32], [0x22u8; 32]);
166        assert_eq!(svc.kernel.wal_record_count(), Some(0));
167    }
168
169    /// `create_instance` increments the kernel's instance count.
170    #[test]
171    fn create_instance_grows_kernel() {
172        let mut svc = RuntimeService::new([0u8; 32], [0u8; 32]);
173        let _id = svc.create_instance(InstanceConfig::default());
174        assert_eq!(svc.kernel.instances_len(), 1);
175    }
176
177    /// `dispatch` returns `InstanceNotFound` for an unregistered
178    /// instance — verifies the `Result` plumbing without needing a
179    /// concrete forge action in the platform-crate test scope (forge
180    /// actions live in forge-core and downstream crates).
181    #[test]
182    fn dispatch_unknown_instance_returns_instance_not_found() {
183        // Use a dummy kernel-Action via the kernel's own derive —
184        // platform crate sees only kernel surface, no forge-core dep
185        // in test scope (avoids cross-crate test churn).
186        use arkhe_kernel::abi::EntityId;
187        use arkhe_kernel::state::{ActionCompute, ActionContext, Op};
188        use arkhe_kernel::ArkheAction;
189        use serde::{Deserialize, Serialize};
190
191        #[derive(Serialize, Deserialize, ArkheAction)]
192        #[arkhe(type_code = 0x0001_5101, schema_version = 1)]
193        struct NoopAction;
194
195        impl ActionCompute for NoopAction {
196            fn compute(&self, _ctx: &ActionContext<'_>) -> Vec<Op> {
197                vec![Op::SpawnEntity {
198                    id: EntityId::new(1).unwrap(),
199                    owner: Principal::System,
200                }]
201            }
202        }
203
204        let mut svc = RuntimeService::new([0u8; 32], [0u8; 32]);
205        svc.register_action::<NoopAction>();
206        // No `create_instance` call — InstanceId(99) is not live.
207        let bogus = InstanceId::new(99).unwrap();
208        let result = svc.dispatch(
209            bogus,
210            Principal::System,
211            &NoopAction,
212            Tick(1),
213            CapabilityMask::SYSTEM,
214        );
215        assert!(matches!(result, Err(ArkheError::InstanceNotFound)));
216    }
217
218    /// Happy-path dispatch — register → create_instance → dispatch
219    /// returns `Ok(StepReport)` with `actions_executed = 1`.
220    #[test]
221    fn dispatch_happy_path_executes_one_action() {
222        use arkhe_kernel::abi::EntityId;
223        use arkhe_kernel::state::{ActionCompute, ActionContext, Op};
224        use arkhe_kernel::ArkheAction;
225        use serde::{Deserialize, Serialize};
226
227        #[derive(Serialize, Deserialize, ArkheAction)]
228        #[arkhe(type_code = 0x0001_5102, schema_version = 1)]
229        struct SpawnOne;
230
231        impl ActionCompute for SpawnOne {
232            fn compute(&self, _ctx: &ActionContext<'_>) -> Vec<Op> {
233                vec![Op::SpawnEntity {
234                    id: EntityId::new(1).unwrap(),
235                    owner: Principal::System,
236                }]
237            }
238        }
239
240        let mut svc = RuntimeService::new([0u8; 32], [0u8; 32]);
241        svc.register_action::<SpawnOne>();
242        let inst = svc.create_instance(InstanceConfig::default());
243        let report = svc
244            .dispatch(
245                inst,
246                Principal::System,
247                &SpawnOne,
248                Tick(0),
249                CapabilityMask::SYSTEM,
250            )
251            .expect("dispatch must succeed for live instance");
252        assert_eq!(report.actions_executed, 1);
253        assert_eq!(report.effects_applied, 1);
254        assert_eq!(report.effects_denied, 0);
255    }
256
257    /// `wal_to_sink` round-trips: dispatch one action, export WAL,
258    /// stream into `BufferedWalSink<Vec<u8>>` — sink buffer ends up
259    /// non-empty + starts with the stream-header magic.
260    #[test]
261    fn wal_to_sink_round_trips_single_record() {
262        use arkhe_kernel::abi::EntityId;
263        use arkhe_kernel::state::{ActionCompute, ActionContext, Op};
264        use arkhe_kernel::ArkheAction;
265        use serde::{Deserialize, Serialize};
266
267        #[derive(Serialize, Deserialize, ArkheAction)]
268        #[arkhe(type_code = 0x0001_5103, schema_version = 1)]
269        struct SpawnOne;
270
271        impl ActionCompute for SpawnOne {
272            fn compute(&self, _ctx: &ActionContext<'_>) -> Vec<Op> {
273                vec![Op::SpawnEntity {
274                    id: EntityId::new(1).unwrap(),
275                    owner: Principal::System,
276                }]
277            }
278        }
279
280        let mut svc = RuntimeService::new([0u8; 32], [0u8; 32]);
281        svc.register_action::<SpawnOne>();
282        let inst = svc.create_instance(InstanceConfig::default());
283        let _ = svc
284            .dispatch(
285                inst,
286                Principal::System,
287                &SpawnOne,
288                Tick(0),
289                CapabilityMask::SYSTEM,
290            )
291            .unwrap();
292
293        let wal = svc.export_wal().expect("WAL is configured");
294        assert_eq!(wal.records.len(), 1);
295
296        let mut buffer: Vec<u8> = Vec::new();
297        let mut sink = BufferedWalSink::new(&mut buffer);
298        wal_to_sink(&wal, &mut sink).expect("wal_to_sink must succeed");
299        // After flush the sink's internal buffer is empty; the writer
300        // (our `&mut buffer`) carries the bytes.
301        assert!(!buffer.is_empty(), "sink writer must hold framed bytes");
302        assert!(
303            buffer.starts_with(&crate::wal_export::STREAM_HEADER_MAGIC),
304            "sink stream must begin with ARKHEXP1 magic",
305        );
306    }
307
308    /// Multi-record dispatch + export: 3 ticks × 1 action each → 3
309    /// WAL records; `wal_to_sink` frames all three.
310    #[test]
311    fn wal_to_sink_handles_multi_record_stream() {
312        use arkhe_kernel::abi::EntityId;
313        use arkhe_kernel::state::{ActionCompute, ActionContext, Op};
314        use arkhe_kernel::ArkheAction;
315        use serde::{Deserialize, Serialize};
316
317        #[derive(Serialize, Deserialize, ArkheAction)]
318        #[arkhe(type_code = 0x0001_5104, schema_version = 1)]
319        struct SpawnAt(u64);
320
321        impl ActionCompute for SpawnAt {
322            fn compute(&self, _ctx: &ActionContext<'_>) -> Vec<Op> {
323                vec![Op::SpawnEntity {
324                    id: EntityId::new(self.0.max(1)).unwrap(),
325                    owner: Principal::System,
326                }]
327            }
328        }
329
330        let mut svc = RuntimeService::new([0u8; 32], [0u8; 32]);
331        svc.register_action::<SpawnAt>();
332        let inst = svc.create_instance(InstanceConfig::default());
333        for i in 1..=3 {
334            svc.dispatch(
335                inst,
336                Principal::System,
337                &SpawnAt(i),
338                Tick(i),
339                CapabilityMask::SYSTEM,
340            )
341            .unwrap();
342        }
343        let wal = svc.export_wal().expect("WAL configured");
344        assert_eq!(wal.records.len(), 3);
345
346        let mut buffer: Vec<u8> = Vec::new();
347        let mut sink = BufferedWalSink::new(&mut buffer);
348        wal_to_sink(&wal, &mut sink).unwrap();
349        assert!(!buffer.is_empty());
350        assert!(buffer.starts_with(&crate::wal_export::STREAM_HEADER_MAGIC));
351    }
352}