Skip to main content

plg_runtime/
reactor.rs

1//! Tier-2 reactor ABI for `wasm32-unknown-unknown` (Cloudflare Workers / V8
2//! isolates). No WASI, no stdio/argv — the module *exports* functions a JS
3//! host calls over linear memory (docs/design/done/WASM_TIER2_PLAN.md A3):
4//!
5//!   plg_init                       (emitted by the generated module) → builds
6//!                                  the Machine, hands it to `plg_rt_set_machine`
7//!   plg_rt_alloc(len) → ptr        host writes the query bytes here
8//!   plg_rt_run_query(ptr,len,…) → u64   packed (len<<32 | ptr) of a JSON buffer
9//!   plg_rt_free(ptr,len)           host frees the result (or the query buffer)
10//!
11//! JSON formatting and the query path are NOT duplicated here — both go
12//! through `crate::core`, the single I/O-free core the WASI shell shares.
13//!
14//! ## Concurrency contract (D3 / WASM.md finding #2)
15//!
16//! **One in-flight query per isolate.** The program Machine is a single
17//! `static`; a V8 isolate is single-threaded, but one Worker can interleave
18//! async tasks, so the host must not call `plg_rt_run_query` again before the
19//! prior call returns. This matches typical Worker use (a request maps to a
20//! query) and avoids threading per-request state through the ABI.
21
22use crate::core::{self, QueryResult};
23use crate::machine::{Machine, OutputSink};
24use std::alloc::{Layout, alloc, dealloc};
25use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering};
26
/// Exact-`Layout` allocation keyed by byte length, so the host can free a
/// buffer with just its length. NEVER `Vec::with_capacity`: a `Vec` may
/// over-allocate, and the host frees by *requested* length, so an actual
/// capacity > requested length corrupts the allocator (WASM.md finding #1 —
/// this is the bug that aborted the spike's deep query; the reflexive reach
/// for `Vec` is the trap).
///
/// Returns:
/// - a dangling, non-null sentinel for `len == 0` (nothing is allocated, so
///   nothing must be deallocated for it);
/// - a null pointer when the request cannot be satisfied — either `len`
///   exceeds what a `Layout` may describe (more than `isize::MAX` bytes) or
///   the global allocator reports failure;
/// - otherwise a pointer to exactly `len` uninitialised bytes, alignment 1.
fn raw_alloc(len: usize) -> *mut u8 {
    if len == 0 {
        return std::ptr::NonNull::<u8>::dangling().as_ptr();
    }
    // The checked constructor matters: `len` ultimately comes from the host as
    // a `u32`, which on a 32-bit target can exceed `isize::MAX`. Building that
    // layout with the `_unchecked` variant would be undefined behaviour.
    match Layout::from_size_align(len, 1) {
        // SAFETY: the layout was validated above and has non-zero size.
        Ok(layout) => unsafe { alloc(layout) },
        Err(_) => std::ptr::null_mut(),
    }
}
40
41/// The program Machine, built once by the generated `plg_init` and reused for
42/// every query (cold-start-per-isolate; never freed — a teardown entry point
43/// would only be needed to swap a live isolate's program, WASM.md finding #8).
44/// wasm is single-threaded, so `Relaxed` is sufficient.
45static MACHINE: AtomicPtr<Machine> = AtomicPtr::new(std::ptr::null_mut());
46
/// Module-default step limit, snapshotted from the Machine when it is
/// registered.
///
/// Why a snapshot is needed: the reactor reuses ONE Machine across every
/// request, and `reset_per_query` deliberately leaves the limit fields alone
/// (correct for the CLI, which sets them once). A per-request `0` means "use
/// the module default", so each request must restore the default explicitly;
/// otherwise `0` would inherit the *previous* request's value — a
/// cross-request latch in exactly the reuse scenario this module exists for.
static DEFAULT_STEP_LIMIT: AtomicU64 = AtomicU64::new(0);

/// Module-default metacall depth limit, snapshotted alongside
/// `DEFAULT_STEP_LIMIT` for the same reason. Stored widened to `u64`.
static DEFAULT_DEPTH_LIMIT: AtomicU64 = AtomicU64::new(0);
56
57/// # Safety
58/// Called once from the generated `plg_init` with the `plg_rt_init` result.
59#[unsafe(no_mangle)]
60pub unsafe extern "C" fn plg_rt_set_machine(m: *mut Machine) {
61    // No stdout in a V8 isolate: capture `write/1` output into the result JSON
62    // (D4) instead of streaming it nowhere.
63    unsafe { (*m).output = OutputSink::Capture(String::new()) };
64    // Snapshot the limits codegen/`plg_init` baked in, so a per-request `0`
65    // restores them rather than latching the prior request's override.
66    DEFAULT_STEP_LIMIT.store(unsafe { (*m).step_limit }, Ordering::Relaxed);
67    DEFAULT_DEPTH_LIMIT.store(
68        unsafe { (*m).metacall_depth_limit } as u64,
69        Ordering::Relaxed,
70    );
71    MACHINE.store(m, Ordering::Relaxed);
72}
73
74/// Allocate a host-writable buffer in linear memory (query in / result out).
75#[unsafe(no_mangle)]
76pub extern "C" fn plg_rt_alloc(len: u32) -> *mut u8 {
77    raw_alloc(len as usize)
78}
79
/// Release a buffer previously handed out by `plg_rt_alloc` or returned by
/// `plg_rt_run_query`.
///
/// The call is a no-op when:
/// - `len == 0` — pairs with `raw_alloc(0)`'s dangling sentinel (which was
///   never really allocated); the two halves agree by convention, not by API;
/// - `ptr` is null — a failed allocation is reported as null, and passing
///   null to the global allocator's `dealloc` is undefined behaviour;
/// - `len` cannot be described by a `Layout` — no allocation of that size can
///   exist, so there is nothing to free.
///
/// # Safety
/// `ptr`/`len` must be exactly a prior `plg_rt_alloc`/`plg_rt_run_query` pair,
/// and the buffer must not have been freed already.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn plg_rt_free(ptr: *mut u8, len: u32) {
    if len == 0 || ptr.is_null() {
        return;
    }
    // Checked constructor: a `u32` length can exceed `isize::MAX` on a 32-bit
    // target, where the `_unchecked` variant would itself be UB.
    let Ok(layout) = Layout::from_size_align(len as usize, 1) else {
        return;
    };
    // SAFETY: `ptr` is non-null and, per the caller contract, was allocated by
    // this module with exactly this size and alignment 1.
    unsafe { dealloc(ptr, layout) };
}
91
92/// Run one query (UTF-8 at `qptr..qptr+qlen`) and return packed
93/// `(len << 32) | ptr` of a JSON byte buffer the host reads then frees via
94/// `plg_rt_free`. The packed return assumes **wasm32** (the pointer fits in the
95/// low 32 bits); wasm64 would need a wider/two-value result (WASM.md finding #7).
96///
97/// Per-request limits bound the query before the platform's CPU/wall limit does
98/// (WASM.md finding #5). All three mirror the CLI's knobs:
99/// - `limit`: max solutions; `0` = unbounded.
100/// - `step_limit`: step ceiling (`PLG_MAX_STEPS`); `0` = keep the module default.
101/// - `depth_limit`: metacall depth bound (`PLG_METACALL_DEPTH`); `0` = keep the
102///   default. Depth matters more on wasm: its ~1 MB stack is far smaller than
103///   native's ~8 MB.
104///
105/// # Safety
106/// Requires `plg_init` to have run first; `qptr`/`qlen` a valid buffer. See the
107/// module's single-in-flight concurrency contract.
108#[unsafe(no_mangle)]
109pub unsafe extern "C" fn plg_rt_run_query(
110    qptr: *const u8,
111    qlen: u32,
112    limit: u32,
113    step_limit: u64,
114    depth_limit: u32,
115) -> u64 {
116    let m = unsafe { &mut *MACHINE.load(Ordering::Relaxed) };
117    m.reset_per_query();
118    // Assign all three limits UNCONDITIONALLY: `reset_per_query` doesn't touch
119    // the limit fields (set-once for the CLI), so on the reused reactor Machine
120    // a non-zero arg overrides and `0` restores the module default — never the
121    // previous request's value.
122    m.solution_limit = if limit == 0 {
123        None
124    } else {
125        Some(limit as usize)
126    };
127    m.step_limit = if step_limit != 0 {
128        step_limit
129    } else {
130        DEFAULT_STEP_LIMIT.load(Ordering::Relaxed)
131    };
132    m.metacall_depth_limit = if depth_limit != 0 {
133        depth_limit as usize
134    } else {
135        DEFAULT_DEPTH_LIMIT.load(Ordering::Relaxed) as usize
136    };
137
138    let q = std::str::from_utf8(unsafe { std::slice::from_raw_parts(qptr, qlen as usize) })
139        .unwrap_or("");
140
141    let mut buf = Vec::new();
142    // Writes never fail (a `Vec` sink), so the `io::Result`s are infallible.
143    match core::run_query(m, q) {
144        QueryResult::ParseError(msg) | QueryResult::RuntimeError(msg) => {
145            let _ = core::write_error_json(&mut buf, &msg);
146        }
147        QueryResult::Solutions => {
148            let exhausted = core::exhausted(m);
149            // Capture mode → `captured_output()` is always `Some` (`""` when
150            // nothing was written), so the result always carries an `output`
151            // field. Intended D4 contract: a stable shape for hosts, present
152            // even when empty.
153            let _ = core::write_solutions_json(&mut buf, m, exhausted, m.captured_output());
154        }
155    }
156
157    // Copy into an exact-Layout buffer so the host frees it with just `len`.
158    let out = raw_alloc(buf.len());
159    unsafe { std::ptr::copy_nonoverlapping(buf.as_ptr(), out, buf.len()) };
160    ((buf.len() as u64) << 32) | (out as u32 as u64)
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use plg_shared::StringInterner;

    /// Construct a fresh Machine and register it the same way the generated
    /// `plg_init` would. The Machine is intentionally leaked (as are result
    /// buffers) — acceptable in a test. This is the only test that touches the
    /// `MACHINE`/`DEFAULT_*` statics, so there is nothing to race with.
    fn install() -> *mut Machine {
        let boxed = Machine::new(StringInterner::new(), Vec::new());
        let raw = Box::into_raw(boxed);
        // SAFETY: mirrors `plg_init` handing us a freshly built Machine.
        unsafe { plg_rt_set_machine(raw) };
        raw
    }

    /// Issue one query through the exported entry point, discarding the packed
    /// result (the result buffer is deliberately leaked).
    fn run(q: &str, limit: u32, step: u64, depth: u32) {
        let (ptr, len) = (q.as_ptr(), q.len() as u32);
        // SAFETY: `ptr`/`len` describe a valid UTF-8 buffer; a Machine has been
        // installed by the caller before any query is run.
        let _packed = unsafe { plg_rt_run_query(ptr, len, limit, step, depth) };
    }

    #[test]
    fn zero_limits_restore_module_default_not_previous_request() {
        let machine = install();
        // Reads the two limit fields straight off the installed Machine.
        // SAFETY: `machine` came from `install` and is never freed.
        let limits = || unsafe { ((*machine).step_limit, (*machine).metacall_depth_limit) };

        // The module defaults `plg_init`/`Machine::new` baked in.
        let (default_steps, default_depth) = limits();

        // First request: explicit, non-default per-request limits take effect.
        run("x", 0, 5_000, 50);
        let (steps, depth) = limits();
        assert_eq!(steps, 5_000);
        assert_eq!(depth, 50);

        // Second request: `0` must mean the module default, NOT whatever the
        // first request left behind.
        run("x", 0, 0, 0);
        let (steps, depth) = limits();
        assert_eq!(
            steps, default_steps,
            "step_limit must revert to the module default"
        );
        assert_eq!(
            depth, default_depth,
            "metacall_depth_limit must revert to the module default"
        );
    }
}