arkhe_forge_platform/dispatcher.rs
1//! L2 service layer — drives forge actions through the kernel's
2//! authorize → dispatch → WAL append loop.
3//!
4//! `RuntimeService` wraps a [`Kernel`] (with WAL) and exposes a single
5//! `dispatch` method that takes a forge `ArkheAction`, postcard-encodes
6//! its canonical bytes, calls [`Kernel::submit`] + [`Kernel::step`] in
7//! one shot, and returns the kernel's `StepReport`. The kernel handles
8//! the L0 work — authorization, dispatch, WAL append — internally.
9//!
10//! Forge actions are made kernel-compatible by the
11//! `arkhe-forge-macros::ArkheAction` derive: it emits both the
12//! forge-side sealed-trait stack **and** the kernel-side `Sealed +
13//! ActionDeriv + ActionCompute` stack, with the kernel-side
14//! `ActionCompute::compute` body delegating to
15//! `arkhe_forge_core::bridge::kernel_compute`. The bridge runs the
16//! forge `compute()` on a fresh forge `ActionContext` and returns the
17//! drained `Vec<Op>` to the kernel.
18//!
19//! ## WAL export
20//!
21//! After one or more `dispatch` calls, the caller may extract the
22//! kernel's internal WAL via [`RuntimeService::export_wal`] (consumes
23//! the service). Each [`arkhe_kernel::WalRecord`] in the returned
24//! [`arkhe_kernel::Wal`] can be streamed into a
25//! [`crate::wal_export::BufferedWalSink`] via [`wal_to_sink`] for
26//! durable backup; the sink frames each record
27//! with the standard magic + length-prefix shape per the firm
28//! requirements pinned in `wal_export`.
29//!
30//! ## Current scope
31//!
32//! Manifest-driven authz policy, the PG-UNIQUE-INDEX-backed
33//! idempotency dedup, and full
34//! [`ActorHandleIndex`](arkhe_forge_core::context::ActorHandleIndex)
35//! production paths are not yet wired through `RuntimeService` — a
36//! forge action's idempotency / actor-handle paths run with the L1
37//! defaults (no view, no index). Callers who need those layers attach
38//! them through the forge `ActionContext` builder directly while the
39//! L2 layer matures.
40
41use arkhe_kernel::abi::{ArkheError, CapabilityMask, InstanceId, Principal, Tick};
42use arkhe_kernel::state::traits::Action;
43use arkhe_kernel::state::InstanceConfig;
44use arkhe_kernel::{Kernel, StepReport, Wal};
45
46use crate::wal_export::{BufferedWalSink, WalExportError, WalRecordSink};
47
48/// Errors surfaced by [`wal_to_sink`].
49#[derive(Debug, thiserror::Error)]
50#[non_exhaustive]
51pub enum WalSinkError {
52 /// `WalRecord` failed postcard encoding (should be unreachable —
53 /// `WalRecord` is `derive(Serialize)` on a stable wire shape).
54 #[error("WalRecord postcard encode failed: {0}")]
55 Encode(#[from] postcard::Error),
56 /// Sink rejected the framed record (length / append-only / overflow).
57 #[error("BufferedWalSink rejected record: {0}")]
58 Sink(#[from] WalExportError),
59}
60
61/// Service-layer wrapper around [`arkhe_kernel::Kernel`]. Builds a
62/// kernel with WAL configured and exposes a forge-shaped dispatch API.
63pub struct RuntimeService {
64 kernel: Kernel,
65}
66
67impl RuntimeService {
68 /// Construct a service backed by a chain-only WAL writer (L0
69 /// `SignatureClass::None`). `world_id` and `manifest_digest` are
70 /// pinned into the WAL header.
71 #[must_use]
72 pub fn new(world_id: [u8; 32], manifest_digest: [u8; 32]) -> Self {
73 Self {
74 kernel: Kernel::new_with_wal(world_id, manifest_digest),
75 }
76 }
77
78 /// Register a forge `ArkheAction` so the kernel will execute it
79 /// when scheduled. Any forge action whose type bears
80 /// `#[derive(ArkheAction)]` automatically satisfies the kernel
81 /// [`Action`] bound through the derive's emitted kernel-side
82 /// stack.
83 pub fn register_action<A: Action>(&mut self) {
84 self.kernel.register_action::<A>();
85 }
86
87 /// Create a fresh kernel instance and return its `InstanceId`.
88 pub fn create_instance(&mut self, config: InstanceConfig) -> InstanceId {
89 self.kernel.create_instance(config)
90 }
91
92 /// Dispatch a forge action — postcard-encode its canonical bytes,
93 /// submit at tick `at`, then step the kernel once with `caps`.
94 /// Returns the kernel's `StepReport` so the caller can inspect
95 /// `actions_executed` / `effects_applied` / `effects_denied`.
96 ///
97 /// # Errors
98 ///
99 /// Returns the kernel's [`ArkheError`] surface verbatim. The
100 /// most common case is `InstanceNotFound` if `instance` is not
101 /// live; capability denial happens inside `step` and is reflected
102 /// in the returned report's `effects_denied` count rather than as
103 /// an `Err` from this function.
104 pub fn dispatch<A>(
105 &mut self,
106 instance: InstanceId,
107 principal: Principal,
108 action: &A,
109 at: Tick,
110 caps: CapabilityMask,
111 ) -> Result<StepReport, ArkheError>
112 where
113 A: Action,
114 {
115 let bytes = action.canonical_bytes();
116 self.kernel
117 .submit(instance, principal, None, at, A::TYPE_CODE, bytes)?;
118 Ok(self.kernel.step(at, caps))
119 }
120
121 /// Drain the kernel's internal WAL (consumes the service so the
122 /// kernel cannot continue stepping after export).
123 #[must_use]
124 pub fn export_wal(self) -> Option<Wal> {
125 self.kernel.export_wal()
126 }
127}
128
129/// Append every record of `wal` into the buffered sink, then flush.
130/// Each record is postcard-serialized via the kernel's stable
131/// [`arkhe_kernel::WalRecord`] wire shape (DO NOT TOUCH #7 —
132/// `seq: u64` first declared field) and the sink frames with the
133/// standard magic + length-prefix per `wal_export`'s firm
134/// requirements.
135///
136/// # Errors
137///
138/// Returns [`WalSinkError::Encode`] if a record fails postcard
139/// serialization (unreachable in practice — `WalRecord` derives
140/// `Serialize` over a stable shape) or [`WalSinkError::Sink`] if the
141/// sink rejects the framed record (length, append-only, overflow).
142pub fn wal_to_sink<W: std::io::Write>(
143 wal: &Wal,
144 sink: &mut BufferedWalSink<W>,
145) -> Result<(), WalSinkError> {
146 for record in &wal.records {
147 let bytes = postcard::to_allocvec(record)?;
148 sink.append_record(&bytes)?;
149 }
150 sink.flush()?;
151 Ok(())
152}
153
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use arkhe_kernel::abi::{EntityId, Principal, Tick};
    use arkhe_kernel::state::{ActionCompute, ActionContext, Op};
    use arkhe_kernel::ArkheAction;
    use serde::{Deserialize, Serialize};

    // The dummy actions below come from the kernel's own derive: the
    // platform crate's tests see only the kernel surface, with no
    // forge-core dependency in test scope (avoids cross-crate test
    // churn).

    /// Service with an all-zero world id and manifest digest, for
    /// tests that do not care about the WAL header contents.
    fn zeroed_service() -> RuntimeService {
        RuntimeService::new([0u8; 32], [0u8; 32])
    }

    /// Single-op payload shared by the dummy actions: spawn entity
    /// `raw_id`, owned by the system principal.
    fn spawn_system_entity(raw_id: u64) -> Vec<Op> {
        let id = EntityId::new(raw_id).unwrap();
        vec![Op::SpawnEntity {
            id,
            owner: Principal::System,
        }]
    }

    /// Smoke — a freshly built service reports zero WAL records (the
    /// header has been pinned but no `step` has fired yet).
    #[test]
    fn fresh_service_has_zero_wal_records() {
        let service = RuntimeService::new([0x11u8; 32], [0x22u8; 32]);
        let record_count = service.kernel.wal_record_count();
        assert_eq!(record_count, Some(0));
    }

    /// One `create_instance` call leaves the kernel with one instance.
    #[test]
    fn create_instance_grows_kernel() {
        let mut service = zeroed_service();
        let _instance = service.create_instance(InstanceConfig::default());
        assert_eq!(service.kernel.instances_len(), 1);
    }

    /// Dispatching to an instance that was never created yields
    /// `InstanceNotFound`. This checks the `Result` plumbing without
    /// requiring a concrete forge action (those live in forge-core
    /// and downstream crates).
    #[test]
    fn dispatch_unknown_instance_returns_instance_not_found() {
        #[derive(Serialize, Deserialize, ArkheAction)]
        #[arkhe(type_code = 0x0001_5101, schema_version = 1)]
        struct NoopAction;

        impl ActionCompute for NoopAction {
            fn compute(&self, _ctx: &ActionContext<'_>) -> Vec<Op> {
                spawn_system_entity(1)
            }
        }

        let mut service = zeroed_service();
        service.register_action::<NoopAction>();
        // Deliberately no `create_instance` — InstanceId(99) is not live.
        let missing = InstanceId::new(99).unwrap();
        let outcome = service.dispatch(
            missing,
            Principal::System,
            &NoopAction,
            Tick(1),
            CapabilityMask::SYSTEM,
        );
        assert!(matches!(outcome, Err(ArkheError::InstanceNotFound)));
    }

    /// Happy path — register, create an instance, dispatch: the
    /// result is `Ok(StepReport)` with exactly one action executed.
    #[test]
    fn dispatch_happy_path_executes_one_action() {
        #[derive(Serialize, Deserialize, ArkheAction)]
        #[arkhe(type_code = 0x0001_5102, schema_version = 1)]
        struct SpawnOne;

        impl ActionCompute for SpawnOne {
            fn compute(&self, _ctx: &ActionContext<'_>) -> Vec<Op> {
                spawn_system_entity(1)
            }
        }

        let mut service = zeroed_service();
        service.register_action::<SpawnOne>();
        let live = service.create_instance(InstanceConfig::default());
        let outcome = service.dispatch(
            live,
            Principal::System,
            &SpawnOne,
            Tick(0),
            CapabilityMask::SYSTEM,
        );
        let report = outcome.expect("dispatch must succeed for live instance");
        assert_eq!(report.actions_executed, 1);
        assert_eq!(report.effects_applied, 1);
        assert_eq!(report.effects_denied, 0);
    }

    /// Dispatch one action, export the WAL, and stream it into a
    /// `BufferedWalSink<Vec<u8>>`: the written bytes are non-empty
    /// and begin with the stream-header magic.
    #[test]
    fn wal_to_sink_round_trips_single_record() {
        #[derive(Serialize, Deserialize, ArkheAction)]
        #[arkhe(type_code = 0x0001_5103, schema_version = 1)]
        struct SpawnOne;

        impl ActionCompute for SpawnOne {
            fn compute(&self, _ctx: &ActionContext<'_>) -> Vec<Op> {
                spawn_system_entity(1)
            }
        }

        let mut service = zeroed_service();
        service.register_action::<SpawnOne>();
        let live = service.create_instance(InstanceConfig::default());
        let _ = service
            .dispatch(
                live,
                Principal::System,
                &SpawnOne,
                Tick(0),
                CapabilityMask::SYSTEM,
            )
            .unwrap();

        let exported = service.export_wal().expect("WAL is configured");
        assert_eq!(exported.records.len(), 1);

        let mut written: Vec<u8> = Vec::new();
        let mut sink = BufferedWalSink::new(&mut written);
        wal_to_sink(&exported, &mut sink).expect("wal_to_sink must succeed");
        // The flush has pushed everything out of the sink's internal
        // buffer; the bytes now live in the writer (`written`).
        assert!(!written.is_empty(), "sink writer must hold framed bytes");
        assert!(
            written.starts_with(&crate::wal_export::STREAM_HEADER_MAGIC),
            "sink stream must begin with ARKHEXP1 magic",
        );
    }

    /// Three ticks with one action each produce three WAL records,
    /// and `wal_to_sink` frames all of them.
    #[test]
    fn wal_to_sink_handles_multi_record_stream() {
        #[derive(Serialize, Deserialize, ArkheAction)]
        #[arkhe(type_code = 0x0001_5104, schema_version = 1)]
        struct SpawnAt(u64);

        impl ActionCompute for SpawnAt {
            fn compute(&self, _ctx: &ActionContext<'_>) -> Vec<Op> {
                spawn_system_entity(self.0.max(1))
            }
        }

        let mut service = zeroed_service();
        service.register_action::<SpawnAt>();
        let live = service.create_instance(InstanceConfig::default());
        for tick in 1..=3 {
            service
                .dispatch(
                    live,
                    Principal::System,
                    &SpawnAt(tick),
                    Tick(tick),
                    CapabilityMask::SYSTEM,
                )
                .unwrap();
        }
        let exported = service.export_wal().expect("WAL configured");
        assert_eq!(exported.records.len(), 3);

        let mut written: Vec<u8> = Vec::new();
        let mut sink = BufferedWalSink::new(&mut written);
        wal_to_sink(&exported, &mut sink).unwrap();
        assert!(!written.is_empty());
        assert!(written.starts_with(&crate::wal_export::STREAM_HEADER_MAGIC));
    }
}