Skip to main content

plg_runtime/
reactor.rs

1//! Tier-2 reactor ABI for `wasm32-unknown-unknown` (Cloudflare Workers / V8
2//! isolates). No WASI, no stdio/argv — the module *exports* functions a JS
3//! host calls over linear memory (docs/design/done/WASM_TIER2_PLAN.md A3):
4//!
5//!   plg_init                       (emitted by the generated module) → builds
6//!                                  the Machine, hands it to `plg_rt_set_machine`
7//!   plg_rt_alloc(len) → ptr        host writes the query bytes here
8//!   plg_rt_run_query(ptr,len,…) → u64   packed (len<<32 | ptr) of a bson buffer
9//!   plg_rt_free(ptr,len)           host frees the result (or the query buffer)
10//!   plg_rt_atom_name(id) → u64      packed (name_ptr<<32 | len) — the host
11//!                                  resolves bson term atom ids to names. Reads
12//!                                  the runtime interner (program + query atoms).
13//!
14//! Bson formatting and the query path are NOT duplicated here — both go
15//! through `crate::core` (solve) and `crate::wire` (the bson encoding), the
16//! single I/O-free core the WASI shell shares.
17//!
18//! ## Concurrency contract (D3 / WASM.md finding #2)
19//!
20//! **One in-flight query per isolate.** The program Machine is a single
21//! `static`; a V8 isolate is single-threaded, but one Worker can interleave
22//! async tasks, so the host must not call `plg_rt_run_query` again before the
23//! prior call returns. This matches typical Worker use (a request maps to a
24//! query) and avoids threading per-request state through the ABI.
25
26use crate::core::{self, QueryResult};
27use crate::machine::{Machine, OutputSink};
28use crate::wire::{Envelope, PLG_ENC_BSON, WireError};
29use std::alloc::{Layout, alloc, dealloc};
30use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering};
31
/// Allocate exactly `len` bytes (alignment 1), keyed by byte length so the
/// host can free the buffer knowing only that length. NEVER
/// `Vec::with_capacity`: a `Vec` may over-allocate, and the host frees by
/// *requested* length, so an actual capacity > requested length corrupts the
/// allocator (WASM.md finding #1 — this is the bug that aborted the spike's
/// deep query; the reflexive reach for `Vec` is the trap).
///
/// Returns:
/// - a dangling, non-null sentinel when `len == 0` (nothing is allocated, so
///   the matching free must be a no-op);
/// - null when `len` cannot form a valid `Layout` (it exceeds `isize::MAX`,
///   which a `u32` length can do on a 32-bit target) or when the allocator
///   itself fails;
/// - otherwise a pointer to `len` uninitialised bytes.
fn raw_alloc(len: usize) -> *mut u8 {
    if len == 0 {
        return std::ptr::NonNull::<u8>::dangling().as_ptr();
    }
    // Use the checked constructor: `from_size_align_unchecked` is undefined
    // behaviour for sizes above `isize::MAX`, and `len` is host-controlled.
    match Layout::from_size_align(len, 1) {
        // SAFETY: `len > 0` was checked above, so the layout has non-zero size.
        Ok(layout) => unsafe { alloc(layout) },
        Err(_) => std::ptr::null_mut(),
    }
}
45
/// The program Machine, built once by the generated `plg_init` and reused for
/// every query (cold-start-per-isolate; never freed — a teardown entry point
/// would only be needed to swap a live isolate's program, WASM.md finding #8).
/// Starts out null and stays null until `plg_rt_set_machine` stores the
/// pointer it is given. wasm is single-threaded, so `Relaxed` is sufficient.
static MACHINE: AtomicPtr<Machine> = AtomicPtr::new(std::ptr::null_mut());
51
/// Module-default step limit, captured from the Machine at init. A per-request
/// `0` means "use the module default" — and because the reactor reuses ONE
/// Machine across every request (and `reset_per_query` deliberately leaves the
/// limit fields alone, correct for the CLI's set-once use), we must restore
/// these explicitly each request. Otherwise `0` would inherit the *previous*
/// request's value — a cross-request latch in exactly the reuse scenario this
/// module exists for.
static DEFAULT_STEP_LIMIT: AtomicU64 = AtomicU64::new(0);
/// Module-default metacall depth limit, captured alongside the step limit and
/// restored the same way. The Machine's field is cast to `u64` when stored
/// here and cast back to `usize` when a request restores it.
static DEFAULT_DEPTH_LIMIT: AtomicU64 = AtomicU64::new(0);
61
62/// # Safety
63/// Called once from the generated `plg_init` with the `plg_rt_init` result.
64#[unsafe(no_mangle)]
65pub unsafe extern "C" fn plg_rt_set_machine(m: *mut Machine) {
66    // No stdout in a V8 isolate: capture `write/1` output into the result bson
67    // (D4) instead of streaming it nowhere.
68    unsafe { (*m).output = OutputSink::Capture(String::new()) };
69    // Snapshot the limits codegen/`plg_init` baked in, so a per-request `0`
70    // restores them rather than latching the prior request's override.
71    DEFAULT_STEP_LIMIT.store(unsafe { (*m).step_limit }, Ordering::Relaxed);
72    DEFAULT_DEPTH_LIMIT.store(
73        unsafe { (*m).metacall_depth_limit } as u64,
74        Ordering::Relaxed,
75    );
76    MACHINE.store(m, Ordering::Relaxed);
77}
78
79/// Allocate a host-writable buffer in linear memory (query in / result out).
80#[unsafe(no_mangle)]
81pub extern "C" fn plg_rt_alloc(len: u32) -> *mut u8 {
82    raw_alloc(len as usize)
83}
84
/// Release a buffer previously handed out by `plg_rt_alloc` or returned by
/// `plg_rt_run_query`.
///
/// The call is a no-op when:
/// - `len == 0` — pairs with `raw_alloc(0)`'s dangling sentinel, which was
///   never really allocated (the two halves agree by convention, not by API);
/// - `ptr` is null — the value an allocation failure yields, which must never
///   reach `dealloc`;
/// - `len` cannot form a valid `Layout` (it exceeds `isize::MAX` on a 32-bit
///   target) — no allocation of that size can exist.
///
/// # Safety
/// A non-null `ptr` with non-zero `len` must be exactly a prior
/// `plg_rt_alloc`/`plg_rt_run_query` pair that has not been freed already.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn plg_rt_free(ptr: *mut u8, len: u32) {
    if len == 0 || ptr.is_null() {
        return;
    }
    // Checked constructor: the unchecked one is undefined behaviour for sizes
    // above `isize::MAX`, and `len` comes straight from the host.
    let Ok(layout) = Layout::from_size_align(len as usize, 1) else {
        return;
    };
    // SAFETY: `ptr` is non-null and, per the caller's contract, was allocated
    // with this exact size and alignment 1.
    unsafe { dealloc(ptr, layout) };
}
96
97/// Resolve an atom id to its name, packed `(name_ptr << 32) | byte_len`. The
98/// host reads `len` bytes at `ptr` to get the UTF-8 name. This reads the
99/// **runtime interner**, which holds program atoms AND query-introduced atoms
100/// (ids ≥ the compile-time table) — the static `@plg_atom_strs` table can't
101/// resolve the latter. Returns `0` for an out-of-range id (defensive; should
102/// not occur — bson atom ids always come from this interner).
103///
104/// # Safety
105/// Requires `plg_init`. The returned pointer is into the interner's heap and
106/// is stable for the duration of host decode (the interner is only mutated
107/// during `plg_rt_run_query`, never between queries) — the host must read the
108/// bytes before the next query.
109#[unsafe(no_mangle)]
110pub extern "C" fn plg_rt_atom_name(id: u32) -> u64 {
111    let m = unsafe { &*MACHINE.load(Ordering::Relaxed) };
112    match m.atoms.try_resolve(id) {
113        Some(s) => ((s.as_ptr() as u64) << 32) | (s.len() as u64),
114        None => 0,
115    }
116}
117
118/// Run one query (UTF-8 at `qptr..qptr+qlen`) and return packed
119/// `(len << 32) | ptr` of a bson byte buffer the host reads then frees via
120/// `plg_rt_free`. The packed return assumes **wasm32** (the pointer fits in the
121/// low 32 bits); wasm64 would need a wider/two-value result (WASM.md finding #7).
122///
123/// Per-request limits bound the query before the platform's CPU/wall limit does
124/// (WASM.md finding #5). All three mirror the CLI's knobs:
125/// - `limit`: max solutions; `0` = unbounded.
126/// - `step_limit`: step ceiling (`PLG_MAX_STEPS`); `0` = keep the module default.
127/// - `depth_limit`: metacall depth bound (`PLG_METACALL_DEPTH`); `0` = keep the
128///   default. Depth matters more on wasm: its ~1 MB stack is far smaller than
129///   native's ~8 MB.
130///
131/// # Safety
132/// Requires `plg_init` to have run first; `qptr`/`qlen` a valid buffer. See the
133/// module's single-in-flight concurrency contract.
134#[unsafe(no_mangle)]
135pub unsafe extern "C" fn plg_rt_run_query(
136    qptr: *const u8,
137    qlen: u32,
138    limit: u32,
139    step_limit: u64,
140    depth_limit: u32,
141) -> u64 {
142    let m = unsafe { &mut *MACHINE.load(Ordering::Relaxed) };
143    m.reset_per_query();
144    // Assign all three limits UNCONDITIONALLY: `reset_per_query` doesn't touch
145    // the limit fields (set-once for the CLI), so on the reused reactor Machine
146    // a non-zero arg overrides and `0` restores the module default — never the
147    // previous request's value.
148    m.solution_limit = if limit == 0 {
149        None
150    } else {
151        Some(limit as usize)
152    };
153    m.step_limit = if step_limit != 0 {
154        step_limit
155    } else {
156        DEFAULT_STEP_LIMIT.load(Ordering::Relaxed)
157    };
158    m.metacall_depth_limit = if depth_limit != 0 {
159        depth_limit as usize
160    } else {
161        DEFAULT_DEPTH_LIMIT.load(Ordering::Relaxed) as usize
162    };
163
164    let q = std::str::from_utf8(unsafe { std::slice::from_raw_parts(qptr, qlen as usize) })
165        .unwrap_or("");
166
167    let mut buf = Vec::new();
168    // Writes never fail (a `Vec` sink), so the `io::Result`s are infallible.
169    // The reactor emits bson (the binary machine format); the host glue
170    // decodes bson→JSON for HTTP clients, so the engine itself never
171    // serializes JSON. (docs/design/IO.md.)
172    match core::run_query(m, q) {
173        QueryResult::ParseError(msg) => {
174            let _ = (PLG_ENC_BSON.write_error)(&mut buf, &WireError::Parse(msg));
175        }
176        QueryResult::RuntimeError(msg) => {
177            let _ = (PLG_ENC_BSON.write_error)(&mut buf, &WireError::Runtime(msg));
178        }
179        QueryResult::Solutions => {
180            let exhausted = core::exhausted(m);
181            // Capture mode → `captured_output()` is always `Some` (`""` when
182            // nothing was written), so the result always carries an `output`
183            // field. Intended D4 contract: a stable shape for hosts, present
184            // even when empty.
185            let env = Envelope::from_machine(m, exhausted);
186            let _ = (PLG_ENC_BSON.write_envelope)(&mut buf, m, &env);
187        }
188    }
189
190    // Copy into an exact-Layout buffer so the host frees it with just `len`.
191    let out = raw_alloc(buf.len());
192    unsafe { std::ptr::copy_nonoverlapping(buf.as_ptr(), out, buf.len()) };
193    ((buf.len() as u64) << 32) | (out as u32 as u64)
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use plg_shared::StringInterner;

    /// Create an empty Machine and register it exactly as the generated
    /// `plg_init` would. The Machine (and every result buffer) is leaked on
    /// purpose — acceptable in a test, and since this is the only test that
    /// touches the `MACHINE`/`DEFAULT_*` statics there is no race.
    fn install() -> *mut Machine {
        let boxed = Machine::new(StringInterner::new(), Vec::new());
        let raw = Box::into_raw(boxed);
        // SAFETY: `raw` is a freshly built, uniquely owned Machine, which is
        // what `plg_init` would pass.
        unsafe { plg_rt_set_machine(raw) };
        raw
    }

    /// Fire one query through the reactor ABI, discarding the packed result.
    fn run(q: &str, limit: u32, step: u64, depth: u32) {
        let (ptr, len) = (q.as_ptr(), q.len() as u32);
        // SAFETY: `ptr`/`len` describe `q`'s UTF-8 bytes, and `install` has
        // already registered a Machine.
        let _ = unsafe { plg_rt_run_query(ptr, len, limit, step, depth) };
    }

    /// Read the Machine's current `(step_limit, metacall_depth_limit)`.
    fn limits(m: *mut Machine) -> (u64, usize) {
        // SAFETY: `m` comes from `install` and is never freed.
        unsafe { ((*m).step_limit, (*m).metacall_depth_limit) }
    }

    #[test]
    fn zero_limits_restore_module_default_not_previous_request() {
        let machine = install();
        // What `plg_init`/`Machine::new` baked in as the module defaults.
        let (def_step, def_depth) = limits(machine);

        // First request overrides both limits with explicit non-default values.
        run("x", 0, 5_000, 50);
        let (step_after_override, depth_after_override) = limits(machine);
        assert_eq!(step_after_override, 5_000);
        assert_eq!(depth_after_override, 50);

        // Second request passes `0`: that must mean the module default, NOT
        // the values the first request left behind.
        run("x", 0, 0, 0);
        let (step_after_zero, depth_after_zero) = limits(machine);
        assert_eq!(
            step_after_zero, def_step,
            "step_limit must revert to the module default"
        );
        assert_eq!(
            depth_after_zero, def_depth,
            "metacall_depth_limit must revert to the module default"
        );
    }
}