plg_runtime/reactor.rs
1//! Tier-2 reactor ABI for `wasm32-unknown-unknown` (Cloudflare Workers / V8
2//! isolates). No WASI, no stdio/argv — the module *exports* functions a JS
3//! host calls over linear memory (docs/design/done/WASM_TIER2_PLAN.md A3):
4//!
//! ```text
//! plg_init                      (emitted by the generated module) → builds
//!                               the Machine, hands it to `plg_rt_set_machine`
//! plg_rt_alloc(len) → ptr       host writes the query bytes here
//! plg_rt_run_query(ptr,len,…)   → u64 packed (len<<32 | ptr) of a JSON buffer
//! plg_rt_free(ptr,len)          host frees the result (or the query buffer)
//! ```
10//!
11//! JSON formatting and the query path are NOT duplicated here — both go
12//! through `crate::core`, the single I/O-free core the WASI shell shares.
13//!
14//! ## Concurrency contract (D3 / WASM.md finding #2)
15//!
16//! **One in-flight query per isolate.** The program Machine is a single
17//! `static`; a V8 isolate is single-threaded, but one Worker can interleave
18//! async tasks, so the host must not call `plg_rt_run_query` again before the
19//! prior call returns. This matches typical Worker use (a request maps to a
20//! query) and avoids threading per-request state through the ABI.
21
22use crate::core::{self, QueryResult};
23use crate::machine::{Machine, OutputSink};
24use std::alloc::{Layout, alloc, dealloc};
25use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering};
26
/// Allocates exactly `len` bytes (alignment 1) from the global allocator.
///
/// The `Layout` is keyed purely by byte length, so the host can free a buffer
/// with just its length. NEVER build these buffers with `Vec::with_capacity`:
/// a `Vec` may over-allocate, and the host frees by *requested* length, so an
/// actual capacity > requested length corrupts the allocator (WASM.md finding
/// #1 — this is the bug that aborted the spike's deep query; the reflexive
/// reach for `Vec` is the trap).
///
/// # Returns
/// - `len == 0`: a dangling, non-null sentinel. Nothing is allocated, so the
///   pointer must never be dereferenced or handed to `dealloc`.
/// - otherwise: the new buffer, or null when `len` is not a representable
///   `Layout` size (above `isize::MAX`, reachable on wasm32 from a `u32`
///   length) or when the allocator itself reports failure.
fn raw_alloc(len: usize) -> *mut u8 {
    if len == 0 {
        return std::ptr::NonNull::<u8>::dangling().as_ptr();
    }
    // Checked constructor on purpose: `from_size_align_unchecked` is undefined
    // behaviour for sizes above `isize::MAX`, and `len` is host-controlled.
    let layout = match Layout::from_size_align(len, 1) {
        Ok(layout) => layout,
        Err(_) => return std::ptr::null_mut(),
    };
    // SAFETY: `layout` has a non-zero size (`len > 0` was checked above).
    unsafe { alloc(layout) }
}
40
41/// The program Machine, built once by the generated `plg_init` and reused for
42/// every query (cold-start-per-isolate; never freed — a teardown entry point
43/// would only be needed to swap a live isolate's program, WASM.md finding #8).
44/// wasm is single-threaded, so `Relaxed` is sufficient.
45static MACHINE: AtomicPtr<Machine> = AtomicPtr::new(std::ptr::null_mut());
46
47/// Module-default limits, captured from the Machine at init. A per-request `0`
48/// means "use the module default" — and because the reactor reuses ONE Machine
49/// across every request (and `reset_per_query` deliberately leaves the limit
50/// fields alone, correct for the CLI's set-once use), we must restore these
51/// explicitly each request. Otherwise `0` would inherit the *previous*
52/// request's value — a cross-request latch in exactly the reuse scenario this
53/// module exists for.
54static DEFAULT_STEP_LIMIT: AtomicU64 = AtomicU64::new(0);
55static DEFAULT_DEPTH_LIMIT: AtomicU64 = AtomicU64::new(0);
56
57/// # Safety
58/// Called once from the generated `plg_init` with the `plg_rt_init` result.
59#[unsafe(no_mangle)]
60pub unsafe extern "C" fn plg_rt_set_machine(m: *mut Machine) {
61 // No stdout in a V8 isolate: capture `write/1` output into the result JSON
62 // (D4) instead of streaming it nowhere.
63 unsafe { (*m).output = OutputSink::Capture(String::new()) };
64 // Snapshot the limits codegen/`plg_init` baked in, so a per-request `0`
65 // restores them rather than latching the prior request's override.
66 DEFAULT_STEP_LIMIT.store(unsafe { (*m).step_limit }, Ordering::Relaxed);
67 DEFAULT_DEPTH_LIMIT.store(
68 unsafe { (*m).metacall_depth_limit } as u64,
69 Ordering::Relaxed,
70 );
71 MACHINE.store(m, Ordering::Relaxed);
72}
73
74/// Allocate a host-writable buffer in linear memory (query in / result out).
75#[unsafe(no_mangle)]
76pub extern "C" fn plg_rt_alloc(len: u32) -> *mut u8 {
77 raw_alloc(len as usize)
78}
79
/// Releases a buffer previously handed out by `plg_rt_alloc` or returned by
/// `plg_rt_run_query`.
///
/// The call is a no-op when:
/// - `len == 0` — pairs with `raw_alloc(0)`'s dangling sentinel (which was
///   never really allocated); the two halves agree by convention, not by API;
/// - `ptr` is null — there is nothing to free;
/// - `len` cannot form a valid `Layout` (above `isize::MAX`) — no allocation
///   of that size can exist.
///
/// # Safety
/// For a non-null `ptr` and non-zero `len`, `ptr`/`len` must be exactly a
/// prior `plg_rt_alloc`/`plg_rt_run_query` pair that has not been freed yet.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn plg_rt_free(ptr: *mut u8, len: u32) {
    if len == 0 || ptr.is_null() {
        return;
    }
    // Checked constructor: the unchecked one is UB for sizes > `isize::MAX`,
    // which a `u32` length can reach on wasm32.
    let layout = match Layout::from_size_align(len as usize, 1) {
        Ok(layout) => layout,
        Err(_) => return,
    };
    // SAFETY: per the contract, `ptr` was allocated with exactly this layout
    // (size `len`, align 1) and is still live.
    unsafe { dealloc(ptr, layout) };
}
91
92/// Run one query (UTF-8 at `qptr..qptr+qlen`) and return packed
93/// `(len << 32) | ptr` of a JSON byte buffer the host reads then frees via
94/// `plg_rt_free`. The packed return assumes **wasm32** (the pointer fits in the
95/// low 32 bits); wasm64 would need a wider/two-value result (WASM.md finding #7).
96///
97/// Per-request limits bound the query before the platform's CPU/wall limit does
98/// (WASM.md finding #5). All three mirror the CLI's knobs:
99/// - `limit`: max solutions; `0` = unbounded.
100/// - `step_limit`: step ceiling (`PLG_MAX_STEPS`); `0` = keep the module default.
101/// - `depth_limit`: metacall depth bound (`PLG_METACALL_DEPTH`); `0` = keep the
102/// default. Depth matters more on wasm: its ~1 MB stack is far smaller than
103/// native's ~8 MB.
104///
105/// # Safety
106/// Requires `plg_init` to have run first; `qptr`/`qlen` a valid buffer. See the
107/// module's single-in-flight concurrency contract.
108#[unsafe(no_mangle)]
109pub unsafe extern "C" fn plg_rt_run_query(
110 qptr: *const u8,
111 qlen: u32,
112 limit: u32,
113 step_limit: u64,
114 depth_limit: u32,
115) -> u64 {
116 let m = unsafe { &mut *MACHINE.load(Ordering::Relaxed) };
117 m.reset_per_query();
118 // Assign all three limits UNCONDITIONALLY: `reset_per_query` doesn't touch
119 // the limit fields (set-once for the CLI), so on the reused reactor Machine
120 // a non-zero arg overrides and `0` restores the module default — never the
121 // previous request's value.
122 m.solution_limit = if limit == 0 {
123 None
124 } else {
125 Some(limit as usize)
126 };
127 m.step_limit = if step_limit != 0 {
128 step_limit
129 } else {
130 DEFAULT_STEP_LIMIT.load(Ordering::Relaxed)
131 };
132 m.metacall_depth_limit = if depth_limit != 0 {
133 depth_limit as usize
134 } else {
135 DEFAULT_DEPTH_LIMIT.load(Ordering::Relaxed) as usize
136 };
137
138 let q = std::str::from_utf8(unsafe { std::slice::from_raw_parts(qptr, qlen as usize) })
139 .unwrap_or("");
140
141 let mut buf = Vec::new();
142 // Writes never fail (a `Vec` sink), so the `io::Result`s are infallible.
143 match core::run_query(m, q) {
144 QueryResult::ParseError(msg) | QueryResult::RuntimeError(msg) => {
145 let _ = core::write_error_json(&mut buf, &msg);
146 }
147 QueryResult::Solutions => {
148 let exhausted = core::exhausted(m);
149 // Capture mode → `captured_output()` is always `Some` (`""` when
150 // nothing was written), so the result always carries an `output`
151 // field. Intended D4 contract: a stable shape for hosts, present
152 // even when empty.
153 let _ = core::write_solutions_json(&mut buf, m, exhausted, m.captured_output());
154 }
155 }
156
157 // Copy into an exact-Layout buffer so the host frees it with just `len`.
158 let out = raw_alloc(buf.len());
159 unsafe { std::ptr::copy_nonoverlapping(buf.as_ptr(), out, buf.len()) };
160 ((buf.len() as u64) << 32) | (out as u32 as u64)
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use plg_shared::StringInterner;

    /// Creates a Machine and registers it through `plg_rt_set_machine`, the
    /// same hand-off the generated `plg_init` performs. The Machine (and every
    /// result buffer produced later) is intentionally leaked — acceptable in a
    /// test. Only this test touches the `MACHINE`/`DEFAULT_*` statics, so
    /// there is no race.
    fn install() -> *mut Machine {
        let boxed = Machine::new(StringInterner::new(), Vec::new());
        let raw = Box::into_raw(boxed);
        // SAFETY: mirrors `plg_init` handing us a freshly built Machine.
        unsafe { plg_rt_set_machine(raw) };
        raw
    }

    /// Issues one query with the given per-request limits and discards the
    /// packed result.
    fn run(q: &str, limit: u32, step: u64, depth: u32) {
        let bytes = q.as_bytes();
        let (ptr, len) = (bytes.as_ptr(), bytes.len() as u32);
        // SAFETY: `bytes` is a valid UTF-8 buffer; a Machine is installed
        // before any call to this helper.
        let _packed = unsafe { plg_rt_run_query(ptr, len, limit, step, depth) };
    }

    #[test]
    fn zero_limits_restore_module_default_not_previous_request() {
        let machine = install();
        // Reads the two limit fields straight off the registered Machine.
        // SAFETY (for the closure body): `machine` came from `install` and is
        // never freed.
        let limits = || unsafe { ((*machine).step_limit, (*machine).metacall_depth_limit) };

        // The module defaults `plg_init`/`Machine::new` baked in.
        let (default_step, default_depth) = limits();

        // Request 1: explicit non-default per-request limits.
        run("x", 0, 5_000, 50);
        let (step_after_override, depth_after_override) = limits();
        assert_eq!(step_after_override, 5_000);
        assert_eq!(depth_after_override, 50);

        // Request 2: `0` => module default, NOT request 1's latched values.
        run("x", 0, 0, 0);
        let (step_after_zero, depth_after_zero) = limits();
        assert_eq!(
            step_after_zero, default_step,
            "step_limit must revert to the module default"
        );
        assert_eq!(
            depth_after_zero, default_depth,
            "metacall_depth_limit must revert to the module default"
        );
    }
}