plg_runtime/reactor.rs
1//! Tier-2 reactor ABI for `wasm32-unknown-unknown` (Cloudflare Workers / V8
2//! isolates). No WASI, no stdio/argv — the module *exports* functions a JS
3//! host calls over linear memory (docs/design/done/WASM_TIER2_PLAN.md A3):
4//!
//! ```text
//! plg_init                      (emitted by the generated module) → builds
//!                               the Machine, hands it to `plg_rt_set_machine`
//! plg_rt_alloc(len) → ptr       host writes the query bytes here
//! plg_rt_run_query(ptr,len,…)   → u64 packed (len<<32 | ptr) of a bson buffer
//! plg_rt_free(ptr,len)          host frees the result (or the query buffer)
//! plg_rt_atom_name(id)          → u64 packed (name_ptr<<32 | len) — the host
//!                               resolves bson term atom ids to names. Reads
//!                               the runtime interner (program + query atoms).
//! ```
//!
//! Note the two packed returns use OPPOSITE field orders: `plg_rt_run_query`
//! puts the length in the high 32 bits, `plg_rt_atom_name` puts the pointer
//! there. Host glue must unpack each accordingly.
13//!
14//! Bson formatting and the query path are NOT duplicated here — both go
15//! through `crate::core` (solve) and `crate::wire` (the bson encoding), the
16//! single I/O-free core the WASI shell shares.
17//!
18//! ## Concurrency contract (D3 / WASM.md finding #2)
19//!
20//! **One in-flight query per isolate.** The program Machine is a single
21//! `static`; a V8 isolate is single-threaded, but one Worker can interleave
22//! async tasks, so the host must not call `plg_rt_run_query` again before the
23//! prior call returns. This matches typical Worker use (a request maps to a
24//! query) and avoids threading per-request state through the ABI.
25
26use crate::core::{self, QueryResult};
27use crate::machine::{Machine, OutputSink};
28use crate::wire::{Envelope, PLG_ENC_BSON, WireError};
29use std::alloc::{Layout, alloc, dealloc};
30use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering};
31
/// Exact-`Layout` allocation keyed by byte length, so the host can free a
/// buffer with just its length. NEVER `Vec::with_capacity`: a `Vec` may
/// over-allocate, and the host frees by *requested* length, so an actual
/// capacity > requested length corrupts the allocator (WASM.md finding #1 —
/// this is the bug that aborted the spike's deep query; the reflexive reach
/// for `Vec` is the trap).
///
/// Returns:
/// - a dangling, non-null sentinel for `len == 0` (nothing is allocated, so
///   it must never be passed to `dealloc`);
/// - null when `len` cannot form a valid `Layout` (it exceeds `isize::MAX`,
///   reachable on wasm32 because the host supplies a full `u32`) or when the
///   allocator itself fails;
/// - otherwise a pointer to exactly `len` bytes with alignment 1.
fn raw_alloc(len: usize) -> *mut u8 {
    if len == 0 {
        return std::ptr::NonNull::<u8>::dangling().as_ptr();
    }
    // Use the CHECKED constructor: `len` is host-controlled, and a size above
    // `isize::MAX` violates `Layout`'s invariant — building such a layout with
    // `from_size_align_unchecked` would be undefined behaviour.
    let Ok(layout) = Layout::from_size_align(len, 1) else {
        return std::ptr::null_mut();
    };
    // SAFETY: `layout` has non-zero size (`len > 0` was checked above), which
    // is the only precondition of `alloc`.
    unsafe { alloc(layout) }
}
45
/// The program Machine, built once by the generated `plg_init` and reused for
/// every query (cold-start-per-isolate; never freed — a teardown entry point
/// would only be needed to swap a live isolate's program, WASM.md finding #8).
///
/// Starts out null; `plg_rt_set_machine` is the only writer in this module,
/// and the query / atom-name entry points load the pointer to reach the
/// Machine.
/// wasm is single-threaded, so `Relaxed` is sufficient.
static MACHINE: AtomicPtr<Machine> = AtomicPtr::new(std::ptr::null_mut());
51
/// Module-default limits, captured from the Machine at init. A per-request `0`
/// means "use the module default" — and because the reactor reuses ONE Machine
/// across every request (and `reset_per_query` deliberately leaves the limit
/// fields alone, correct for the CLI's set-once use), we must restore these
/// explicitly each request. Otherwise `0` would inherit the *previous*
/// request's value — a cross-request latch in exactly the reuse scenario this
/// module exists for.
///
/// Both hold `0` until `plg_rt_set_machine` overwrites them with the
/// Machine's `step_limit` / `metacall_depth_limit`.
static DEFAULT_STEP_LIMIT: AtomicU64 = AtomicU64::new(0);
// The depth limit is widened with `as u64` when stored and narrowed back with
// `as usize` when `plg_rt_run_query` reloads it.
static DEFAULT_DEPTH_LIMIT: AtomicU64 = AtomicU64::new(0);
61
62/// # Safety
63/// Called once from the generated `plg_init` with the `plg_rt_init` result.
64#[unsafe(no_mangle)]
65pub unsafe extern "C" fn plg_rt_set_machine(m: *mut Machine) {
66 // No stdout in a V8 isolate: capture `write/1` output into the result bson
67 // (D4) instead of streaming it nowhere.
68 unsafe { (*m).output = OutputSink::Capture(String::new()) };
69 // Snapshot the limits codegen/`plg_init` baked in, so a per-request `0`
70 // restores them rather than latching the prior request's override.
71 DEFAULT_STEP_LIMIT.store(unsafe { (*m).step_limit }, Ordering::Relaxed);
72 DEFAULT_DEPTH_LIMIT.store(
73 unsafe { (*m).metacall_depth_limit } as u64,
74 Ordering::Relaxed,
75 );
76 MACHINE.store(m, Ordering::Relaxed);
77}
78
79/// Allocate a host-writable buffer in linear memory (query in / result out).
80#[unsafe(no_mangle)]
81pub extern "C" fn plg_rt_alloc(len: u32) -> *mut u8 {
82 raw_alloc(len as usize)
83}
84
/// Release a buffer previously handed to the host.
///
/// No-ops when `len == 0` (pairing with `raw_alloc(0)`'s dangling sentinel,
/// which was never really allocated; the two halves agree by convention, not
/// by API) and when `ptr` is null (a failed allocation hands the host a null
/// pointer, and passing null to `dealloc` is undefined behaviour).
///
/// # Safety
/// A non-null `ptr` with non-zero `len` must be exactly a prior
/// `plg_rt_alloc`/`plg_rt_run_query` pair that has not been freed already.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn plg_rt_free(ptr: *mut u8, len: u32) {
    if ptr.is_null() || len == 0 {
        return;
    }
    // A length that cannot form a valid `Layout` cannot correspond to any live
    // allocation, so there is nothing to release.
    let Ok(layout) = Layout::from_size_align(len as usize, 1) else {
        return;
    };
    // SAFETY: `ptr` is non-null and, per the contract above, was allocated by
    // this module with exactly this size and alignment 1.
    unsafe { dealloc(ptr, layout) };
}
96
97/// Resolve an atom id to its name, packed `(name_ptr << 32) | byte_len`. The
98/// host reads `len` bytes at `ptr` to get the UTF-8 name. This reads the
99/// **runtime interner**, which holds program atoms AND query-introduced atoms
100/// (ids ≥ the compile-time table) — the static `@plg_atom_strs` table can't
101/// resolve the latter. Returns `0` for an out-of-range id (defensive; should
102/// not occur — bson atom ids always come from this interner).
103///
104/// # Safety
105/// Requires `plg_init`. The returned pointer is into the interner's heap and
106/// is stable for the duration of host decode (the interner is only mutated
107/// during `plg_rt_run_query`, never between queries) — the host must read the
108/// bytes before the next query.
109#[unsafe(no_mangle)]
110pub extern "C" fn plg_rt_atom_name(id: u32) -> u64 {
111 let m = unsafe { &*MACHINE.load(Ordering::Relaxed) };
112 match m.atoms.try_resolve(id) {
113 Some(s) => ((s.as_ptr() as u64) << 32) | (s.len() as u64),
114 None => 0,
115 }
116}
117
118/// Run one query (UTF-8 at `qptr..qptr+qlen`) and return packed
119/// `(len << 32) | ptr` of a bson byte buffer the host reads then frees via
120/// `plg_rt_free`. The packed return assumes **wasm32** (the pointer fits in the
121/// low 32 bits); wasm64 would need a wider/two-value result (WASM.md finding #7).
122///
123/// Per-request limits bound the query before the platform's CPU/wall limit does
124/// (WASM.md finding #5). All three mirror the CLI's knobs:
125/// - `limit`: max solutions; `0` = unbounded.
126/// - `step_limit`: step ceiling (`PLG_MAX_STEPS`); `0` = keep the module default.
127/// - `depth_limit`: metacall depth bound (`PLG_METACALL_DEPTH`); `0` = keep the
128/// default. Depth matters more on wasm: its ~1 MB stack is far smaller than
129/// native's ~8 MB.
130///
131/// # Safety
132/// Requires `plg_init` to have run first; `qptr`/`qlen` a valid buffer. See the
133/// module's single-in-flight concurrency contract.
134#[unsafe(no_mangle)]
135pub unsafe extern "C" fn plg_rt_run_query(
136 qptr: *const u8,
137 qlen: u32,
138 limit: u32,
139 step_limit: u64,
140 depth_limit: u32,
141) -> u64 {
142 let m = unsafe { &mut *MACHINE.load(Ordering::Relaxed) };
143 m.reset_per_query();
144 // Assign all three limits UNCONDITIONALLY: `reset_per_query` doesn't touch
145 // the limit fields (set-once for the CLI), so on the reused reactor Machine
146 // a non-zero arg overrides and `0` restores the module default — never the
147 // previous request's value.
148 m.solution_limit = if limit == 0 {
149 None
150 } else {
151 Some(limit as usize)
152 };
153 m.step_limit = if step_limit != 0 {
154 step_limit
155 } else {
156 DEFAULT_STEP_LIMIT.load(Ordering::Relaxed)
157 };
158 m.metacall_depth_limit = if depth_limit != 0 {
159 depth_limit as usize
160 } else {
161 DEFAULT_DEPTH_LIMIT.load(Ordering::Relaxed) as usize
162 };
163
164 let q = std::str::from_utf8(unsafe { std::slice::from_raw_parts(qptr, qlen as usize) })
165 .unwrap_or("");
166
167 let mut buf = Vec::new();
168 // Writes never fail (a `Vec` sink), so the `io::Result`s are infallible.
169 // The reactor emits bson (the binary machine format); the host glue
170 // decodes bson→JSON for HTTP clients, so the engine itself never
171 // serializes JSON. (docs/design/IO.md.)
172 match core::run_query(m, q) {
173 QueryResult::ParseError(msg) => {
174 let _ = (PLG_ENC_BSON.write_error)(&mut buf, &WireError::Parse(msg));
175 }
176 QueryResult::RuntimeError(msg) => {
177 let _ = (PLG_ENC_BSON.write_error)(&mut buf, &WireError::Runtime(msg));
178 }
179 QueryResult::Solutions => {
180 let exhausted = core::exhausted(m);
181 // Capture mode → `captured_output()` is always `Some` (`""` when
182 // nothing was written), so the result always carries an `output`
183 // field. Intended D4 contract: a stable shape for hosts, present
184 // even when empty.
185 let env = Envelope::from_machine(m, exhausted);
186 let _ = (PLG_ENC_BSON.write_envelope)(&mut buf, m, &env);
187 }
188 }
189
190 // Copy into an exact-Layout buffer so the host frees it with just `len`.
191 let out = raw_alloc(buf.len());
192 unsafe { std::ptr::copy_nonoverlapping(buf.as_ptr(), out, buf.len()) };
193 ((buf.len() as u64) << 32) | (out as u32 as u64)
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use plg_shared::StringInterner;

    /// Register a freshly built Machine the same way the generated `plg_init`
    /// does and return the raw pointer for later inspection.
    ///
    /// The Machine (and every result buffer produced afterwards) is leaked on
    /// purpose — acceptable in a test. No other test touches the
    /// `MACHINE`/`DEFAULT_*` statics, so there is nothing to race with.
    fn install() -> *mut Machine {
        let boxed = Machine::new(StringInterner::new(), Vec::new());
        let raw = Box::into_raw(boxed);
        // SAFETY: `raw` comes straight from `Box::into_raw`, mirroring
        // `plg_init` handing over a freshly built Machine.
        unsafe { plg_rt_set_machine(raw) };
        raw
    }

    /// Fire one query through the reactor entry point, discarding the packed
    /// result (only the side effects on the Machine matter here).
    fn run(q: &str, limit: u32, step: u64, depth: u32) {
        let (ptr, len) = (q.as_ptr(), q.len() as u32);
        // SAFETY: `ptr`/`len` describe a valid UTF-8 buffer, and `install` has
        // already registered a Machine.
        let _packed = unsafe { plg_rt_run_query(ptr, len, limit, step, depth) };
    }

    #[test]
    fn zero_limits_restore_module_default_not_previous_request() {
        let machine = install();
        // Reads the two limit fields straight off the installed Machine.
        let read_limits = || unsafe { ((*machine).step_limit, (*machine).metacall_depth_limit) };

        // Whatever `plg_init`/`Machine::new` baked in is the module default.
        let (def_step, def_depth) = read_limits();

        // First request: explicit, non-default per-request limits take effect.
        run("x", 0, 5_000, 50);
        let (step, depth) = read_limits();
        assert_eq!(step, 5_000);
        assert_eq!(depth, 50);

        // Second request: `0` means "module default" — it must NOT keep the
        // values the first request latched.
        run("x", 0, 0, 0);
        let (step, depth) = read_limits();
        assert_eq!(
            step, def_step,
            "step_limit must revert to the module default"
        );
        assert_eq!(
            depth, def_depth,
            "metacall_depth_limit must revert to the module default"
        );
    }
}