typed_ski/arena.rs
1//! # Arena-Based Memory Management for SKI Expressions (WASM, no_std)
2//!
3//! This module implements a high-performance, thread-safe arena allocator optimized
4//! for SKI combinator calculus evaluation. It supports both single-threaded heap-based
5//! allocation and multi-threaded SharedArrayBuffer (SAB) based allocation for Web Workers.
6//!
7//! ## Core Architecture
8//!
9//! ### Arena Node Types
10//!
11//! The arena uses four distinct node types to represent SKI expressions and evaluation state:
12//!
13//! - **`Terminal`**: Leaf nodes containing SKI combinators (S, K, I)
14//! - `kind = 1`, `sym` contains the combinator symbol
15//!
16//! - **`NonTerm`**: Application nodes (function application)
17//! - `kind = 2`, `left` and `right` point to subexpressions
18//! - Represents expressions of the form `(left right)`
19//!
20//! - **`Continuation`**: Stack frames for iterative reduction (optimization)
21//! - `kind = 3`, `sym` indicates reduction stage, `left`/`right` point to parent stack and node
22//! - Used by the iterative reduction algorithm to avoid recursion stack overflow
23//!
24//! - **`Suspension`**: Paused evaluation state for preemptive multitasking
25//! - `kind = 4`, `sym` contains evaluation mode, `left`/`right` contain current expression and stack
26//! - `hash` field stores remaining reduction steps for resumption
27//! - Enables cooperative multitasking across Web Workers
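//!
//! For instance, the application `(K I)` occupies two `Terminal` nodes and one `NonTerm`
//! node whose `left`/`right` fields reference them (node IDs below are illustrative):
//!
//! ```text
//! id 0: Terminal  sym = K
//! id 1: Terminal  sym = I
//! id 2: NonTerm   left = 0, right = 1   // the application (K I)
//! ```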
28//!
29//! ### Memory Layout (SharedArrayBuffer Mode)
30//!
31//! ```text
32//! +-------------------+ <-- ARENA_BASE_ADDR
33//! | SabHeader |
34//! | - Magic, offsets |
35//! | - Rings, capacity |
36//! | - Atomic counters |
37//! +-------------------+ <-- offset_sq (64-byte aligned)
38//! | Submission Ring |
39//! | (65536 entries) |
40//! +-------------------+ <-- offset_cq (64-byte aligned)
41//! | Completion Ring |
42//! | (65536 entries) |
43//! +-------------------+ <-- offset_kind (64-byte aligned)
44//! | Kind Array | u8[capacity] - Node types
45//! +-------------------+ <-- offset_sym
46//! | Sym Array | u8[capacity] - Symbols/modes
47//! +-------------------+ <-- offset_left_id (64-byte aligned)
48//! | Left Array | u32[capacity] - Left child pointers
49//! +-------------------+ <-- offset_right_id
50//! | Right Array | u32[capacity] - Right child pointers
51//! +-------------------+ <-- offset_hash32
52//! | Hash Array | u32[capacity] - Hash values for deduplication
53//! +-------------------+ <-- offset_next_idx
54//! | Next Array | u32[capacity] - Hash table collision chains
55//! +-------------------+ <-- offset_buckets (64-byte aligned)
56//! | Bucket Array | u32[capacity] - Hash table buckets
57//! +-------------------+ <-- offset_term_cache
58//! | Terminal Cache | u32[4] - Cached S/K/I node IDs
59//! +-------------------+ <-- End of arena
60//! ```
61//!
62//! ### Key Optimizations
63//!
64//! #### 1. Hash-Consing (Structural Sharing)
65//!
66//! - **[Hash consing](https://en.wikipedia.org/wiki/Hash_consing)** dedupes identical subexpressions to prevent redundant allocations
67//! - **Uses avalanche hash** of `(left, right)` pairs for fast lookups
68//! - **Collision resolution** via separate chaining in the bucket array
69//! - **Memory efficiency**: [DAG](https://en.wikipedia.org/wiki/Directed_acyclic_graph) representation instead of tree
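//!
//! A minimal sketch of the intended behaviour, using the `allocTerminal`/`allocCons`
//! exports defined below (not compiled as a doctest):
//!
//! ```ignore
//! let s = allocTerminal(ArenaSym::S as u32);
//! let k = allocTerminal(ArenaSym::K as u32);
//! let a = allocCons(s, k); // allocates a fresh NonTerm node for (S K)
//! let b = allocCons(s, k); // hash-consing finds the existing (left, right) pair
//! assert_eq!(a, b);        // same node ID: the subexpression is shared
//! ```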
70//!
71//! #### 2. Iterative Reduction with Continuations
72//!
73//! - **Avoids recursion stack overflow** on deep expressions
74//! - **Continuation nodes** represent suspended stack frames
75//! - **Two-stage reduction**: left child first, then right child
76//! - **Memory reuse**: Dead continuation frames are recycled
77//!
78//! #### 3. Preemptive Multitasking (Suspensions)
79//!
80//! - **Cooperative yielding** when traversal gas is exhausted
81//! - **Suspension nodes** capture complete evaluation state
82//! - **Worker preemption** prevents starvation in parallel evaluation
83//! - **State resumption** via suspension node deserialization
84//!
85//! #### 4. Lock-Free Ring Buffers (io_uring Style)
86//!
87//! - **Submission Queue (SQ)**: Main thread → Worker communication
88//! - **Completion Queue (CQ)**: Worker → Main thread results
89//! - **Atomic operations** for thread-safe producer/consumer patterns
90//! - **Blocking waits** using WASM atomic wait/notify
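//!
//! A hedged sketch of the host-side flow (the `hostSubmit`/`hostPull` exports are declared
//! in this module, but their WASM-side return conventions are assumed, not shown in this excerpt):
//!
//! ```ignore
//! let expr = allocCons(allocTerminal(ArenaSym::K as u32), allocTerminal(ArenaSym::I as u32));
//! hostSubmit(expr);                  // push an Sqe onto the submission ring
//! loop {
//!     let done = hostPull();         // poll the completion ring
//!     if done != u32::MAX { break; } // assume u32::MAX means "nothing completed yet"
//! }
//! ```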
91//!
92//! #### 5. Concurrent Resizing
93//!
94//! - **[Seqlock](https://en.wikipedia.org/wiki/Seqlock)-style synchronization** for arena growth
95//! - **Stop-the-world pauses** during resize operations
96//! - **Reverse-order copying** to handle overlapping memory regions
97//! - **Poisoning** on OOM to prevent infinite waits
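//!
//! Readers follow a seqlock-style read-verify-retry pattern, sketched here with the
//! `enter_stable`/`check_stable` helpers defined in the WASM module below:
//!
//! ```ignore
//! loop {
//!     let (seq, h) = enter_stable();      // spin while a resize is in progress (odd seq)
//!     if n >= h.capacity { return 0; }
//!     let val = (*kind_ptr().add(n as usize)).load(Ordering::Acquire);
//!     if check_stable(seq) { break val; } // retry if a resize intervened
//! }
//! ```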
98//!
99//! ### Performance Characteristics
100//!
101//! - **O(1) allocation** for new nodes (amortized)
102//! - **O(1) lookup** for existing subexpressions (hash-consing)
103//! - **O(depth) auxiliary space** for reduction: the iterative algorithm keeps continuation frames in the arena instead of the call stack (no stack overflow)
104//! - **Lock-free communication** between main thread and workers
105//! - **Memory efficient**: ~18 bytes of parallel-array data per node (plus 4 bytes/slot of hash-bucket overhead), with structural sharing
106//!
107//! ### Thread Safety
108//!
109//! - **Atomic operations** for all shared state access
110//! - **Seqlock** for resize synchronization
111//! - **Shared arena** in SAB mode: workers attach to the same memory via `connectArena`, so node IDs are valid on every thread
112//! - **Ring buffer fences** prevent data races in communication
113//!
114//! ### Integration with JavaScript
115//!
116//! - **[SharedArrayBuffer](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer)** enables cross-thread memory access
117//! - **Cross-origin isolation** required for SAB support
118//! - **Typed array views** provide efficient memory access from JS
119//! - **Ring polling** in main thread for completion handling
120
121#![allow(dead_code)]
122
123// =============================================================================
124// Public enums (shared with JS)
125// =============================================================================
126
127/// Arena node types supporting SKI evaluation and parallel execution.
128///
129/// See module-level documentation for detailed descriptions of each variant.
130#[repr(u8)]
131#[derive(Clone, Copy, PartialEq, Eq, Debug)]
132pub enum ArenaKind {
133 /// Terminal node containing an SKI combinator (S, K, or I).
134 /// - `sym`: The combinator symbol
135 /// - `left`/`right`: Unused (reserved)
136 Terminal = 1,
137
138 /// Application node representing function application `(left right)`.
139 /// - `left`: Function expression
140 /// - `right`: Argument expression
141 /// - `sym`: Unused (reserved)
142 NonTerm = 2,
143
144 /// Stack frame for iterative reduction algorithm.
145 /// Used to avoid recursion stack overflow on deep expressions.
146 /// - `sym`: Reduction stage (0=left child, 1=right child)
147 /// - `left`: Parent stack frame
148 /// - `right`: Parent expression node
149 Continuation = 3,
150
151 /// Paused evaluation state for preemptive multitasking.
152 /// Captures complete evaluation context for resumption.
153 /// - `sym`: Evaluation mode (0=descend, 1=return)
154 /// - `left`: Current expression being evaluated
155 /// - `right`: Evaluation stack
156 /// - `hash`: Remaining reduction steps
157 Suspension = 4,
158}
159
160/// SKI combinator symbols and evaluation state markers.
161///
162/// Used in both Terminal nodes (for combinators) and Continuation/Suspension
163/// nodes (for evaluation state).
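///
/// As a worked example of the reduction rules below, `S K K x → K x (K x) → x`,
/// so the term `S K K` behaves as the identity combinator on any argument `x`.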
164#[repr(u8)]
165#[derive(Clone, Copy, PartialEq, Eq, Debug)]
166pub enum ArenaSym {
167 /// S combinator: `S x y z → x z (y z)`
168 /// The most complex combinator, enabling arbitrary computation.
169 S = 1,
170
171 /// K combinator: `K x y → x`
172 /// The constant function, discards its second argument.
173 K = 2,
174
175 /// I combinator: `I x → x`
176 /// The identity function, returns its argument unchanged.
177 I = 3,
178}
179
180// =============================================================================
181// Non-WASM stubs (keep TS type-checking happy)
182// =============================================================================
183#[cfg(not(target_arch = "wasm32"))]
184#[no_mangle]
185pub extern "C" fn initArena(_cap: u32) -> u32 { 0 }
186#[cfg(not(target_arch = "wasm32"))]
187#[no_mangle]
188pub extern "C" fn connectArena(_p: u32) -> u32 { 0 }
189#[cfg(not(target_arch = "wasm32"))]
190#[no_mangle]
191pub extern "C" fn allocTerminal(_s: u32) -> u32 { 0 }
192#[cfg(not(target_arch = "wasm32"))]
193#[no_mangle]
194pub extern "C" fn allocCons(_l: u32, _r: u32) -> u32 { 0 }
195#[cfg(not(target_arch = "wasm32"))]
196#[no_mangle]
197pub extern "C" fn kindOf(_n: u32) -> u32 { 0 }
198#[cfg(not(target_arch = "wasm32"))]
199#[no_mangle]
200pub extern "C" fn symOf(_n: u32) -> u32 { 0 }
201#[cfg(not(target_arch = "wasm32"))]
202#[no_mangle]
203pub extern "C" fn leftOf(_n: u32) -> u32 { 0 }
204#[cfg(not(target_arch = "wasm32"))]
205#[no_mangle]
206pub extern "C" fn rightOf(_n: u32) -> u32 { 0 }
207#[cfg(not(target_arch = "wasm32"))]
208#[no_mangle]
209pub extern "C" fn reset() {}
210#[cfg(not(target_arch = "wasm32"))]
211#[no_mangle]
212pub extern "C" fn arenaKernelStep(x: u32) -> u32 { x }
213#[cfg(not(target_arch = "wasm32"))]
214#[no_mangle]
215pub extern "C" fn hostSubmit(_id: u32) -> u32 { 1 }
216#[cfg(not(target_arch = "wasm32"))]
217#[no_mangle]
218pub extern "C" fn hostPull() -> u32 { u32::MAX }
219#[cfg(not(target_arch = "wasm32"))]
220#[no_mangle]
221pub extern "C" fn workerLoop() {}
222#[cfg(not(target_arch = "wasm32"))]
223#[no_mangle]
224pub extern "C" fn debugGetArenaBaseAddr() -> u32 { 0 }
225#[cfg(not(target_arch = "wasm32"))]
226#[no_mangle]
227pub extern "C" fn getArenaMode() -> u32 { 0 }
228#[cfg(not(target_arch = "wasm32"))]
229#[no_mangle]
230pub extern "C" fn debugCalculateArenaSize(_c: u32) -> u32 { 0 }
231#[cfg(not(target_arch = "wasm32"))]
232#[no_mangle]
233pub extern "C" fn debugLockState() -> u32 { 0xffff_ffff }
234
235// =============================================================================
236// WASM implementation
237// =============================================================================
238#[cfg(target_arch = "wasm32")]
239mod wasm {
240 use core::arch::wasm32;
241 use core::cell::UnsafeCell;
242 use core::sync::atomic::{AtomicU32, AtomicU8, Ordering};
243 use crate::{ArenaKind, ArenaSym};
244
245 // -------------------------------------------------------------------------
246 // Constants / helpers
247 // -------------------------------------------------------------------------
248 pub const EMPTY: u32 = 0xffff_ffff;
249 const ARENA_MAGIC: u32 = 0x534B_4941; // "SKIA"
250 const INITIAL_CAP: u32 = 1 << 20;
251 const MAX_CAP: u32 = 1 << 27;
252 const WASM_PAGE_SIZE: usize = 65536;
253 // Size of SQ/CQ rings (power of two).
254 //
255 // Larger rings reduce backpressure + atomic wait/notify churn when many workers
256 // produce results faster than the host can drain them (a common cause of "stuttering"
257 // worker timelines and main-thread saturation in profiles).
258 const RING_ENTRIES: u32 = 1 << 16; // 65536
259
260 #[inline(always)]
261 const fn align64(x: u32) -> u32 {
262 (x + 63) & !63
263 }
264
265 // -------------------------------------------------------------------------
266 // Atomics + wait/notify
267 // -------------------------------------------------------------------------
268 // WASM atomics / CAS safety notes (applies to all compare_exchange* uses below)
269 // -------------------------------------------------------------------------
270 //
271 // This module relies heavily on CAS (compare_exchange / compare_exchange_weak),
272 // which compiles to a single atomic read-modify-write instruction in WebAssembly
273 // (e.g. `i32.atomic.rmw.cmpxchg`). That property prevents the classic "check-then-
274 // store" race where two threads both observe a value and both write the same
275 // update thinking they won; CAS has a single winner and forces losers to retry.
276 //
277 // ABA notes:
278 // - Many CAS patterns are vulnerable to ABA when the *same* value can reappear
279 // (e.g. counters wrapping or pointer reuse). In this codebase, ABA is either
280 // explicitly prevented (ring slot sequence numbers advance by whole cycles)
281 // or is practically irrelevant in context (e.g. seqlock-style counters would
282 // require billions of expensive operations to wrap during one read section).
283 //
284 // WASM platform particularities:
285 // - When wasm threads are enabled, the linear memory is backed by a
286 // `SharedArrayBuffer`, and atomic ops synchronize across Web Workers.
287 // Rust `Ordering::{Acquire,Release,AcqRel,SeqCst}` map onto WASM atomics/fences
288 // to provide the needed visibility guarantees.
289 // - `memory.grow` increases linear memory size but does not "move" addresses
290 // from the module's perspective; what changes is our chosen layout within
291 // that address space (offsets/capacity), guarded by atomics/seqlock.
292 //
293 // External references:
294 // - Seqlock concept: https://en.wikipedia.org/wiki/Seqlock
295 // - SharedArrayBuffer (required for wasm threads): https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer
296 // - WebAssembly features (threads/atomics): https://webassembly.org/features/
297 // - Rust atomic memory orderings: https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html
298 // - WebAssembly `memory.grow`: https://developer.mozilla.org/en-US/docs/WebAssembly/Reference/Memory/Grow
299 //
300 mod sys {
301 use super::*;
302 #[inline(always)]
303 pub fn wait32(ptr: &AtomicU32, expected: u32) {
304 unsafe {
305 let _ = wasm32::memory_atomic_wait32(
306 ptr as *const _ as *mut i32,
307 expected as i32,
308 -1,
309 );
310 }
311 }
312 #[inline(always)]
313 pub fn notify(ptr: &AtomicU32, count: u32) {
314 unsafe {
315 let _ = wasm32::memory_atomic_notify(ptr as *const _ as *mut i32, count);
316 }
317 }
318 }
319
320 // -------------------------------------------------------------------------
321 // Ring Buffer Types (io_uring Style)
322 // -------------------------------------------------------------------------
323
324 /// Submission Queue Entry: Main thread → Worker communication.
325 ///
326 /// Sent from main thread to worker to request evaluation of an expression.
327 /// Workers dequeue these and perform the actual reduction work.
328 #[repr(C)]
329 #[derive(Clone, Copy)]
330 pub struct Sqe {
331 /// Arena node ID of the expression to evaluate.
332 /// This is the root of the expression tree to reduce.
333 pub node_id: u32,
334
335 /// Unique request identifier for correlation.
336 /// Used to match completion queue entries back to the original request.
337 /// Must be unique across all outstanding requests.
338 pub req_id: u32,
339
340 /// Max reduction steps for this specific request.
341 /// Each request carries its own immutable reduction-step budget.
342 pub max_steps: u32,
343 }
344
345 /// Completion Queue Entry: Worker → Main thread results.
346 ///
347 /// Workers enqueue these when they complete (or yield) evaluation work.
348 /// The main thread polls the completion queue to retrieve results.
349 #[repr(C)]
350 #[derive(Clone, Copy)]
351 pub struct Cqe {
352 /// Result node ID or Suspension node ID (for yields).
353 /// - If reduction completed: The fully reduced expression node
354 /// - If evaluation yielded: A Suspension node for resumption
355 pub node_id: u32,
356
357 /// Request identifier matching the original Sqe.req_id.
358 /// Used by main thread to correlate completions with pending requests.
359 pub req_id: u32,
360
361 /// Padding to align Slot<Cqe> to 16 bytes (power of 2) for efficient indexing.
362 pub _pad: u32,
363 }
364
365 /// Ring buffer slot with sequence number for ABA prevention.
366 ///
367 /// Each slot contains a sequence number and payload. The sequence number
368 /// prevents ABA problems in concurrent CAS operations by ensuring that
369 /// a slot can only be reused after a full cycle of the ring.
370 ///
371 /// ## Sequence Number Protocol
372 ///
373 /// - **Initial state**: `seq = slot_index`
374 /// - **Producer**: waits for `seq == tail`, claims the slot by advancing `tail`, writes the payload, then publishes `seq = tail + 1`
375 /// - **Consumer**: waits for `seq == head + 1`, claims the slot by advancing `head`, reads the payload, then releases `seq = head + mask + 1`
376 ///
377 /// This ensures proper ordering and prevents race conditions.
378 #[repr(C)]
379 struct Slot<T> {
380 /// Sequence number for synchronization and ABA prevention.
381 /// Must be atomically updated to maintain memory ordering.
382 seq: AtomicU32,
383
384 /// The actual data payload stored in this slot.
385 /// Access must be synchronized with sequence number updates.
386 payload: UnsafeCell<T>,
387 }
388
389 // SAFETY: Ring<T> ensures proper synchronization of Slot<T> access.
390 // The UnsafeCell is only accessed after proper sequence number validation.
391 unsafe impl<T> Sync for Slot<T> {}
392
393 /// Lock-free bounded ring buffer for inter-thread communication.
394 ///
395 /// Uses [io_uring](https://en.wikipedia.org/wiki/Io_uring)-style producer/consumer pattern with atomic operations.
396 /// Supports both non-blocking (try_*) and blocking (*_blocking) operations.
397 ///
398 /// ## Design Principles
399 ///
400 /// - **Multiple producers and consumers** coordinated by per-slot sequence numbers plus CAS on `head`/`tail`
401 /// - **Power-of-two sizing** for efficient masking operations
402 /// - **Cache-line alignment** (64-byte) to prevent false sharing
403 /// - **Atomic wait/notify** for efficient blocking operations
404 /// - **Sequence numbers** prevent ABA problems in concurrent access
405 ///
406 /// ## Memory Layout
407 ///
408 /// ```text
409 /// +----------------------+ <-- 64-byte aligned
410 /// | head: AtomicU32 | Consumer position
411 /// | not_full: AtomicU32 | Wait/notify for producers
412 /// | _pad1: [u8; 56] | Cache line padding
413 /// +----------------------+ <-- 64-byte aligned
414 /// | tail: AtomicU32 | Producer position
415 /// | not_empty: AtomicU32 | Wait/notify for consumers
416 /// | _pad2: [u8; 56] | Cache line padding
417 /// +----------------------+
418 /// | mask: u32 | entries - 1 (for fast modulo)
419 /// | entries: u32 | Ring capacity (power of 2)
420 /// +----------------------+
421 /// | slots[entries] | Array of Slot<T> entries
422 /// +----------------------+
423 /// ```
424 ///
425 /// ## Thread Safety
426 ///
427 /// - **Producer calls**: `try_enqueue()`, `enqueue_blocking()`
428 /// - **Consumer calls**: `try_dequeue()`, `dequeue_blocking()`
429 /// - **No internal locking**: Uses atomic CAS operations only
430 /// - **Lock-free progress**: `try_*` never blocks; the `*_blocking` variants park on WASM atomic wait/notify (see the usage sketch below)
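    ///
    /// A usage sketch under those assumptions (`sq`/`cq` stand for the rings returned by
    /// `sq_ring()`/`cq_ring()` further down; `expr`, `result`, and the literal values are
    /// illustrative; not compiled as a doctest):
    ///
    /// ```ignore
    /// // Producer side (e.g. the main thread pushing work):
    /// sq.enqueue_blocking(Sqe { node_id: expr, req_id: 42, max_steps: 1_000 });
    ///
    /// // Consumer side (e.g. a worker draining the submission ring):
    /// let sqe = sq.dequeue_blocking();
    /// // ... reduce sqe.node_id, then report back on the completion ring:
    /// cq.enqueue_blocking(Cqe { node_id: result, req_id: sqe.req_id, _pad: 0 });
    /// ```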
431 #[repr(C, align(64))]
432 pub struct Ring<T> {
433 /// Consumer position (head of queue).
434 /// Only modified by consumer thread via CAS.
435 head: AtomicU32,
436
437 /// Producer wait/notify synchronization.
438 /// Used by producers to wait when ring is full.
439 not_full: AtomicU32,
440
441 /// Cache line padding to prevent false sharing with tail.
442 _pad1: [u8; 56],
443
444 /// Producer position (tail of queue).
445 /// Only modified by producer thread via CAS.
446 tail: AtomicU32,
447
448 /// Consumer wait/notify synchronization.
449 /// Used by consumers to wait when ring is empty.
450 not_empty: AtomicU32,
451
452 /// Cache line padding to prevent false sharing with head.
453 _pad2: [u8; 56],
454
455 /// Bitmask for fast modulo: `index & mask` ≡ `index % entries`
456 mask: u32,
457
458 /// Ring capacity (must be power of 2).
459 entries: u32,
460
461 /// Zero-sized type marker for generic parameter.
462 _marker: core::marker::PhantomData<T>,
463 }
464
465 impl<T: Copy> Ring<T> {
466 /// Get pointer to the slots array following the Ring header.
467 #[inline(always)]
468 fn slots_ptr(&self) -> *const Slot<T> {
469 unsafe { (self as *const Ring<T>).add(1) as *const Slot<T> }
470 }
471
472 /// Get reference to slot at the given index (masked for wraparound).
473 #[inline(always)]
474 unsafe fn slot_at(&self, i: u32) -> &Slot<T> {
475 &*self.slots_ptr().add((i & self.mask) as usize)
476 }
477
478 /// Initialize a ring buffer at the given memory location.
479 ///
480 /// # Safety
481 ///
482 /// - `ptr` must point to sufficient memory for the ring and all slots
483 /// - `entries_pow2` must be a power of 2
484 /// - The ring must not be accessed concurrently during initialization
485 #[inline(always)]
486 pub unsafe fn init_at(ptr: *mut u8, entries_pow2: u32) -> &'static Self {
487 let ring = &mut *(ptr as *mut Ring<T>);
488 // Initialize producer/consumer positions
489 ring.head.store(0, Ordering::Relaxed);
490 ring.tail.store(0, Ordering::Relaxed);
491 // Initialize wait/notify counters
492 ring.not_empty.store(0, Ordering::Relaxed);
493 ring.not_full.store(0, Ordering::Relaxed);
494 // Set ring parameters
495 ring.entries = entries_pow2;
496 ring.mask = entries_pow2 - 1;
497 // Initialize slot sequence numbers to their indices
498 for i in 0..entries_pow2 {
499 ring.slot_at(i).seq.store(i, Ordering::Relaxed);
500 }
501 ring
502 }
503
504 /// Attempt to enqueue an item without blocking.
505 ///
506 /// Returns `true` if the item was successfully enqueued, `false` if the ring is full.
507 ///
508 /// ## Algorithm
509 ///
510 /// 1. Load current tail position
511 /// 2. Check if the corresponding slot is available (`seq == tail`)
512 /// 3. Attempt to claim the slot by CAS'ing tail forward
513 /// 4. If successful: write payload, update sequence, notify consumers
514 /// 5. If slot unavailable: check if ring is full or retry
515 ///
516 /// ## Memory Ordering
517 ///
518 /// - `Acquire` load of sequence number ensures payload visibility
519 /// - `Release` store of sequence number publishes payload to consumers
520 /// - `Release` notify ensures consumer sees all prior writes
521 ///
522 /// See [memory barriers](https://en.wikipedia.org/wiki/Memory_barrier) for details.
523 #[inline(always)]
524 pub fn try_enqueue(&self, item: T) -> bool {
525 unsafe {
526 loop {
527 // Load producer position (relaxed: no ordering requirements)
528 let t = self.tail.load(Ordering::Relaxed);
529 let slot = self.slot_at(t);
530
531 // Check if slot is available for us (Acquire: see previous publications)
532 let s = slot.seq.load(Ordering::Acquire);
533 let diff = s.wrapping_sub(t);
534
535 if diff == 0 {
536 // Slot is free. Try to claim it by advancing tail.
537 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
538 if self
539 .tail
540 .compare_exchange_weak(
541 t,
542 t.wrapping_add(1),
543 Ordering::Relaxed, // Success: no special ordering
544 Ordering::Relaxed, // Failure: no special ordering
545 )
546 .is_ok()
547 {
548 // Successfully claimed slot. Write payload and publish.
549 *slot.payload.get() = item;
550 slot.seq.store(t.wrapping_add(1), Ordering::Release);
551 // Notify waiting consumers (Release: publish all prior writes)
552 self.not_empty.fetch_add(1, Ordering::Release);
553 sys::notify(&self.not_empty, 1);
554 return true;
555 }
556 // CAS failed: another producer claimed it, retry
557 } else if (diff as i32) < 0 {
558 // Ring is full: the slot's sequence still lags behind `tail` (its previous value has not been consumed yet)
559 return false;
560 }
561 // Slot not ready yet, retry (another iteration in progress)
562 }
563 }
564 }
565
566 /// Attempt to dequeue an item without blocking.
567 ///
568 /// Returns `Some(item)` if an item was successfully dequeued, `None` if the ring is empty.
569 ///
570 /// ## Algorithm
571 ///
572 /// 1. Load current head position
573 /// 2. Check if the corresponding slot has data (`seq == head + 1`)
574 /// 3. Attempt to claim the slot by CAS'ing head forward
575 /// 4. If successful: read payload, update sequence for reuse, notify producers
576 /// 5. If slot unavailable: check if ring is empty or retry
577 ///
578 /// ## Sequence Number Reuse
579 ///
580 /// After consuming, sequence is set to `head + mask + 1`. Since `mask = entries - 1`,
581 /// this ensures the slot won't be reused until after a full ring cycle, preventing ABA.
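        ///
        /// Worked trace for a tiny ring with `entries = 4` (`mask = 3`), watching slot 0 only:
        ///
        /// ```text
        /// init:       seq = 0
        /// enqueue #1: tail 0 -> 1, publish  seq = 1
        /// dequeue #1: head 0 -> 1, release  seq = 0 + 3 + 1 = 4
        /// enqueue #2: tail 4 -> 5 (second lap; seq == tail again), publish seq = 5
        /// ```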
582 #[inline(always)]
583 pub fn try_dequeue(&self) -> Option<T> {
584 unsafe {
585 loop {
586 // Load consumer position (relaxed: no ordering requirements)
587 let h = self.head.load(Ordering::Relaxed);
588 let slot = self.slot_at(h);
589
590 // Check if slot has data for us (Acquire: see producer's publication)
591 let s = slot.seq.load(Ordering::Acquire);
592 let diff = s.wrapping_sub(h.wrapping_add(1));
593
594 if diff == 0 {
595 // Slot has data. Try to claim it by advancing head.
596 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
597 if self
598 .head
599 .compare_exchange_weak(
600 h,
601 h.wrapping_add(1),
602 Ordering::Relaxed, // Success: no special ordering
603 Ordering::Relaxed, // Failure: no special ordering
604 )
605 .is_ok()
606 {
607 // Successfully claimed slot. Read payload and release for reuse.
608 let item = *slot.payload.get();
609 // Set sequence for next cycle: h + mask + 1 prevents ABA issues
610 slot.seq
611 .store(h.wrapping_add(self.mask).wrapping_add(1), Ordering::Release);
612 // Notify waiting producers (Release: publish slot availability)
613 self.not_full.fetch_add(1, Ordering::Release);
614 sys::notify(&self.not_full, 1);
615 return Some(item);
616 }
617 // CAS failed: another consumer claimed it, retry
618 } else if (diff as i32) < 0 {
619 // Ring is empty: no data available
620 return None;
621 }
622 // Slot not ready yet, retry (another operation in progress)
623 }
624 }
625 }
626
627 /// Enqueue an item, blocking until space is available.
628 ///
629 /// Uses WASM atomic wait/notify for efficient blocking when the ring is full.
630 /// The wait is interruptible and will retry the enqueue operation.
631 #[inline(always)]
632 pub fn enqueue_blocking(&self, item: T) {
633 while !self.try_enqueue(item) {
634 // Load current state and wait for notification
635 let v = self.not_full.load(Ordering::Acquire);
636 // Re-check before sleeping so a wakeup between the failed attempt and the wait is not lost
637 if self.try_enqueue(item) {
638 return;
639 }
640 // Wait for a consumer to notify us that a slot has been freed
641 sys::wait32(&self.not_full, v);
642 }
643 }
644
645 /// Dequeue an item, blocking until data is available.
646 ///
647 /// Uses WASM atomic wait/notify for efficient blocking when the ring is empty.
648 /// The wait is interruptible and will retry the dequeue operation.
649 #[inline(always)]
650 pub fn dequeue_blocking(&self) -> T {
651 loop {
652 if let Some(x) = self.try_dequeue() {
653 return x;
654 }
655 // Load current state and wait for notification
656 let v = self.not_empty.load(Ordering::Acquire);
657 // Re-check before sleeping so a wakeup between the failed attempt and the wait is not lost
658 if let Some(x) = self.try_dequeue() {
659 return x;
660 }
661 // Wait for a producer to notify us that data has been enqueued
662 sys::wait32(&self.not_empty, v);
663 }
664 }
665 }
666
667 #[inline(always)]
668 const fn ring_bytes<T>(entries: u32) -> u32 {
669 let header = core::mem::size_of::<Ring<T>>() as u32;
670 let slot = core::mem::size_of::<Slot<T>>() as u32;
671 align64(header + entries * slot)
672 }
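
    // Worked example with the constants above, assuming the natural `repr(C)` layouts
    // (Ring<T> header = 192 bytes, Slot<Sqe> = 4-byte seq + 12-byte payload = 16 bytes):
    // ring_bytes::<Sqe>(RING_ENTRIES) = align64(192 + 65536 * 16) ≈ 1 MiB per direction.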
673
674 // -------------------------------------------------------------------------
675 // Header layout (fixed offsets)
676 // -------------------------------------------------------------------------
677 #[repr(C, align(64))]
678 struct SabHeader {
679 magic: u32,
680 ring_entries: u32,
681 ring_mask: u32,
682 offset_sq: u32,
683 offset_cq: u32,
684 offset_kind: u32,
685 offset_sym: u32,
686 offset_left_id: u32,
687 offset_right_id: u32,
688 offset_hash32: u32,
689 offset_next_idx: u32,
690 offset_buckets: u32,
691 offset_term_cache: u32,
692 capacity: u32,
693 bucket_mask: u32,
694 resize_seq: AtomicU32,
695 top: AtomicU32,
696 }
697
698 impl SabHeader {
699 fn layout(capacity: u32) -> (u32, u32, u32) {
700 let header_size = core::mem::size_of::<SabHeader>() as u32;
701 let offset_sq = align64(header_size);
702 let offset_cq = align64(offset_sq + ring_bytes::<Sqe>(RING_ENTRIES));
703
704 let offset_kind = align64(offset_cq + ring_bytes::<Cqe>(RING_ENTRIES));
705 let offset_sym = offset_kind + capacity;
706 let offset_left_id = align64(offset_sym + capacity);
707 let offset_right_id = offset_left_id + 4 * capacity;
708 let offset_hash32 = offset_right_id + 4 * capacity;
709 let offset_next_idx = offset_hash32 + 4 * capacity;
710 let offset_buckets = align64(offset_next_idx + 4 * capacity);
711 let offset_term_cache = offset_buckets + 4 * capacity;
712 let total_size = offset_term_cache + 16;
713 (
714 offset_sq,
715 offset_cq,
716 total_size,
717 )
718 }
719 }
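
    // Back-of-the-envelope cost per arena slot implied by the layout above:
    // kind (1) + sym (1) + left (4) + right (4) + hash (4) + next (4) = 18 bytes of node
    // data, plus 4 bytes per slot for the bucket array, i.e. ~22 bytes per capacity entry
    // on top of the fixed header and ring regions.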
720
721 // -------------------------------------------------------------------------
722 // Globals
723 // -------------------------------------------------------------------------
724 static mut ARENA_BASE_ADDR: u32 = 0;
725 static mut ARENA_MODE: u32 = 0;
726
727 #[inline(always)]
728 unsafe fn ensure_arena() {
729 if ARENA_BASE_ADDR != 0 {
730 return;
731 }
732 // If we were supposed to be in SAB mode but have no base, that's fatal.
733 if ARENA_MODE == 1 {
734 wasm32::unreachable();
735 }
736 let ptr = allocate_raw_arena(INITIAL_CAP);
737 if ptr.is_null() {
738 wasm32::unreachable();
739 }
740 // Heap / single-instance mode
741 ARENA_MODE = 0;
742 }
743
744 // -------------------------------------------------------------------------
745 // Helpers to locate structures
746 // -------------------------------------------------------------------------
747 #[inline(always)]
748 unsafe fn header() -> &'static SabHeader {
749 &*(ARENA_BASE_ADDR as *const SabHeader)
750 }
751
752 #[inline(always)]
753 unsafe fn header_mut() -> &'static mut SabHeader {
754 &mut *(ARENA_BASE_ADDR as *mut SabHeader)
755 }
756
757 #[inline(always)]
758 unsafe fn sq_ring() -> &'static Ring<Sqe> {
759 let h = header();
760 &*((ARENA_BASE_ADDR + h.offset_sq) as *const Ring<Sqe>)
761 }
762
763 #[inline(always)]
764 unsafe fn cq_ring() -> &'static Ring<Cqe> {
765 let h = header();
766 &*((ARENA_BASE_ADDR + h.offset_cq) as *const Ring<Cqe>)
767 }
768
769 // Array helpers
770 #[inline(always)]
771 unsafe fn kind_ptr() -> *mut AtomicU8 {
772 (ARENA_BASE_ADDR + header().offset_kind) as *mut AtomicU8
773 }
774 #[inline(always)]
775 unsafe fn sym_ptr() -> *mut AtomicU8 {
776 (ARENA_BASE_ADDR + header().offset_sym) as *mut AtomicU8
777 }
778 #[inline(always)]
779 unsafe fn left_ptr() -> *mut AtomicU32 {
780 (ARENA_BASE_ADDR + header().offset_left_id) as *mut AtomicU32
781 }
782 #[inline(always)]
783 unsafe fn right_ptr() -> *mut AtomicU32 {
784 (ARENA_BASE_ADDR + header().offset_right_id) as *mut AtomicU32
785 }
786 #[inline(always)]
787 unsafe fn hash_ptr() -> *mut AtomicU32 {
788 (ARENA_BASE_ADDR + header().offset_hash32) as *mut AtomicU32
789 }
790 #[inline(always)]
791 unsafe fn next_ptr() -> *mut AtomicU32 {
792 (ARENA_BASE_ADDR + header().offset_next_idx) as *mut AtomicU32
793 }
794 #[inline(always)]
795 unsafe fn buckets_ptr() -> *mut AtomicU32 {
796 (ARENA_BASE_ADDR + header().offset_buckets) as *mut AtomicU32
797 }
798 #[inline(always)]
799 unsafe fn term_cache_ptr() -> *mut AtomicU32 {
800 (ARENA_BASE_ADDR + header().offset_term_cache) as *mut AtomicU32
801 }
802
803 // -------------------------------------------------------------------------
804 // Hashing helpers
805 // -------------------------------------------------------------------------
806 fn avalanche32(mut x: u32) -> u32 {
807 x ^= x >> 16;
808 x = x.wrapping_mul(0x7feb_352d);
809 x ^= x >> 15;
810 x = x.wrapping_mul(0x846c_a68b);
811 x ^= x >> 16;
812 x
813 }
814 const GOLD: u32 = 0x9e37_79b9;
815 fn mix(a: u32, b: u32) -> u32 {
816 avalanche32(a ^ b.wrapping_mul(GOLD))
817 }
818
819 // If a resize fails (OOM / max cap), poison the seqlock so other threads trap
820 // instead of spinning forever on an odd seq.
821 const POISON_SEQ: u32 = 0xffff_ffff;
822
823 // -------------------------------------------------------------------------
824 // Resize guard (seqlock-style)
825 // -------------------------------------------------------------------------
826 // See "WASM atomics / CAS safety notes" above for CAS/ABA discussion.
827 // This specific guard is seqlock-style: even=stable, odd=resize in progress.
828 // For READING existing data. Returns (seq, h) to enable read-verify-retry pattern:
829 // 1. Call enter_stable() to get sequence number
830 // 2. Read the data you need
831 // 3. Call check_stable(seq) to verify sequence didn't change
832 // 4. If changed, retry (resize occurred during read)
833 // Used in: kindOf(), symOf(), leftOf(), rightOf(), hash lookups
834 #[inline(always)]
835 fn enter_stable() -> (u32, &'static SabHeader) {
836 unsafe {
837 let h = header();
838 loop {
839 let seq = h.resize_seq.load(Ordering::Acquire);
840 if seq == POISON_SEQ {
841 core::arch::wasm32::unreachable();
842 }
843 if seq & 1 == 1 {
844 core::hint::spin_loop();
845 continue;
846 }
847 return (seq, h);
848 }
849 }
850 }
851
852 // For WRITING new data. See comment above enter_stable() for distinction.
853 // Simply waits until resize completes (sequence is even).
854 // No verification needed since we're not reading existing data.
855 // Used in: allocTerminal(), allocCons() allocation path, alloc_generic()
856 #[inline(always)]
857 fn wait_resize_stable() {
858 unsafe {
859 let h = header();
860 loop {
861 let seq = h.resize_seq.load(Ordering::Acquire);
862 if seq == POISON_SEQ {
863 core::arch::wasm32::unreachable();
864 }
865 if (seq & 1) == 0 {
866 return;
867 }
868 core::hint::spin_loop();
869 }
870 }
871 }
872
873 #[inline(always)]
874 fn check_stable(seq: u32) -> bool {
875 unsafe { header().resize_seq.load(Ordering::Acquire) == seq }
876 }
877
878 // -------------------------------------------------------------------------
879 // Arena init / connect
880 // -------------------------------------------------------------------------
881 unsafe fn zero_region(start: u32, len: u32) {
882 core::ptr::write_bytes((ARENA_BASE_ADDR + start) as *mut u8, 0, len as usize);
883 }
884
885 unsafe fn init_header(capacity: u32) {
886 let (offset_sq, offset_cq, total_size) = SabHeader::layout(capacity);
887 let h = &mut *(ARENA_BASE_ADDR as *mut SabHeader);
888 h.magic = ARENA_MAGIC;
889 h.ring_entries = RING_ENTRIES;
890 h.ring_mask = RING_ENTRIES - 1;
891 h.offset_sq = offset_sq;
892 h.offset_cq = offset_cq;
893 h.offset_kind = align64(offset_cq + ring_bytes::<Cqe>(RING_ENTRIES));
894 h.offset_sym = h.offset_kind + capacity;
895 h.offset_left_id = align64(h.offset_sym + capacity);
896 h.offset_right_id = h.offset_left_id + 4 * capacity;
897 h.offset_hash32 = h.offset_right_id + 4 * capacity;
898 h.offset_next_idx = h.offset_hash32 + 4 * capacity;
899 h.offset_buckets = align64(h.offset_next_idx + 4 * capacity);
900 h.offset_term_cache = h.offset_buckets + 4 * capacity;
901 h.capacity = capacity;
902 h.bucket_mask = capacity - 1;
903 h.resize_seq.store(0, Ordering::Relaxed);
904 h.top.store(0, Ordering::Relaxed);
905
906 zero_region((core::mem::size_of::<SabHeader>()) as u32, total_size - core::mem::size_of::<SabHeader>() as u32);
907
908 Ring::<Sqe>::init_at((ARENA_BASE_ADDR + h.offset_sq) as *mut u8, RING_ENTRIES);
909 Ring::<Cqe>::init_at((ARENA_BASE_ADDR + h.offset_cq) as *mut u8, RING_ENTRIES);
910
911 // Buckets + cache init
912 let buckets = buckets_ptr();
913 for i in 0..capacity as usize {
914 buckets.add(i).write(AtomicU32::new(EMPTY));
915 }
916 let cache = term_cache_ptr();
917 for i in 0..4 {
918 cache.add(i).write(AtomicU32::new(EMPTY));
919 }
920 }
921
922 unsafe fn allocate_raw_arena(capacity: u32) -> *mut SabHeader {
923 let (_, _, total_size) = SabHeader::layout(capacity);
924 let pages_needed = (total_size as usize + WASM_PAGE_SIZE - 1) / WASM_PAGE_SIZE;
925 let old_pages = wasm32::memory_grow(0, pages_needed);
926 if old_pages == usize::MAX {
927 return core::ptr::null_mut();
928 }
929 let base_addr = (old_pages * WASM_PAGE_SIZE) as u32;
930 ARENA_BASE_ADDR = base_addr;
931 init_header(capacity);
932 base_addr as *mut SabHeader
933 }
934
935 // -------------------------------------------------------------------------
936 // Exports: init/connect/reset
937 // -------------------------------------------------------------------------
938 #[no_mangle]
939 pub extern "C" fn initArena(initial_capacity: u32) -> u32 {
940 if initial_capacity < 1024 || initial_capacity > MAX_CAP || !initial_capacity.is_power_of_two() {
941 return 0;
942 }
943 unsafe {
944 if ARENA_BASE_ADDR != 0 {
945 return ARENA_BASE_ADDR;
946 }
947 let ptr = allocate_raw_arena(initial_capacity);
948 if ptr.is_null() {
949 return 1;
950 }
951 ARENA_MODE = 1;
952 ARENA_BASE_ADDR
953 }
954 }
955
956 #[no_mangle]
957 pub extern "C" fn connectArena(ptr_addr: u32) -> u32 {
958 if ptr_addr == 0 || ptr_addr % 64 != 0 {
959 return 0;
960 }
961 unsafe {
962 ARENA_BASE_ADDR = ptr_addr;
963 ARENA_MODE = 1;
964 let h = header();
965 if h.magic != ARENA_MAGIC {
966 return 5;
967 }
968 1
969 }
970 }
971
972 #[no_mangle]
973 pub extern "C" fn reset() {
974 unsafe {
975 ensure_arena();
976 let h = header_mut();
977 h.top.store(0, Ordering::Release);
978 let buckets = buckets_ptr();
979 for i in 0..h.capacity as usize {
980 (*buckets.add(i)).store(EMPTY, Ordering::Release);
981 }
982 let cache = term_cache_ptr();
983 for i in 0..4 {
984 (*cache.add(i)).store(EMPTY, Ordering::Release);
985 }
986 h.resize_seq.store(h.resize_seq.load(Ordering::Relaxed) & !1, Ordering::Release);
987 }
988 }
989
990 // -------------------------------------------------------------------------
991 // Accessors
992 // -------------------------------------------------------------------------
993 #[no_mangle]
994 pub extern "C" fn kindOf(n: u32) -> u32 {
995 unsafe {
996 ensure_arena();
997 loop {
998 let (seq, h) = enter_stable();
999 let cap = h.capacity;
1000 if n >= cap {
1001 return 0;
1002 }
1003 let val = (*kind_ptr().add(n as usize)).load(Ordering::Acquire) as u32;
1004 core::sync::atomic::fence(Ordering::Acquire);
1005 if check_stable(seq) {
1006 return val;
1007 }
1008 }
1009 }
1010 }
1011
1012 #[no_mangle]
1013 pub extern "C" fn symOf(n: u32) -> u32 {
1014 unsafe {
1015 ensure_arena();
1016 loop {
1017 let (seq, h) = enter_stable();
1018 if n >= h.capacity {
1019 return 0;
1020 }
1021 let val = (*sym_ptr().add(n as usize)).load(Ordering::Acquire) as u32;
1022 core::sync::atomic::fence(Ordering::Acquire);
1023 if check_stable(seq) {
1024 return val;
1025 }
1026 }
1027 }
1028 }
1029
1030 #[no_mangle]
1031 pub extern "C" fn leftOf(n: u32) -> u32 {
1032 unsafe {
1033 ensure_arena();
1034 loop {
1035 let (seq, h) = enter_stable();
1036 if n >= h.capacity {
1037 return 0;
1038 }
1039 let val = (*left_ptr().add(n as usize)).load(Ordering::Acquire);
1040 core::sync::atomic::fence(Ordering::Acquire);
1041 if check_stable(seq) {
1042 return val;
1043 }
1044 }
1045 }
1046 }
1047
1048 #[no_mangle]
1049 pub extern "C" fn rightOf(n: u32) -> u32 {
1050 unsafe {
1051 ensure_arena();
1052 loop {
1053 let (seq, h) = enter_stable();
1054 if n >= h.capacity {
1055 return 0;
1056 }
1057 let val = (*right_ptr().add(n as usize)).load(Ordering::Acquire);
1058 core::sync::atomic::fence(Ordering::Acquire);
1059 if check_stable(seq) {
1060 return val;
1061 }
1062 }
1063 }
1064 }
1065
1066 // -------------------------------------------------------------------------
1067 // Allocation (speculative, lock-free)
1068 // -------------------------------------------------------------------------
1069 #[no_mangle]
1070 pub extern "C" fn allocTerminal(sym: u32) -> u32 {
1071 unsafe {
1072 ensure_arena();
1073 let h = header();
1074
1075 if sym < 4 {
1076 let cached = (*term_cache_ptr().add(sym as usize)).load(Ordering::Acquire);
1077 if cached != EMPTY {
1078 return cached;
1079 }
1080 }
1081
1082 loop {
1083 wait_resize_stable();
1084 let id = h.top.fetch_add(1, Ordering::AcqRel);
1085 if id >= h.capacity {
1086 grow();
1087 continue;
1088 }
1089
1090 (*kind_ptr().add(id as usize)).store(ArenaKind::Terminal as u8, Ordering::Release);
1091 (*sym_ptr().add(id as usize)).store(sym as u8, Ordering::Release);
1092 (*hash_ptr().add(id as usize)).store(sym, Ordering::Release);
1093
1094 if sym < 4 {
1095 (*term_cache_ptr().add(sym as usize)).store(id, Ordering::Release);
1096 }
1097 return id;
1098 }
1099 }
1100 }
1101
1102 #[no_mangle]
1103 /// Allocate a NonTerm (application) node with hash-consing optimization.
1104 ///
1105 /// This is the core optimization that enables structural sharing in the arena.
1106 /// Instead of always creating new nodes, it checks if an identical `(l r)` pair
1107 /// already exists and returns the existing node ID if found.
1108 ///
1109 /// ## Hash-Consing Algorithm
1110 ///
1111 /// 1. **Compute hash**: `mix(hash(l), hash(r))` using avalanche hashing
1112 /// 2. **Lookup in hash table**: Check bucket for existing `(l,r)` pairs
1113 /// 3. **Return existing**: If found, return the shared node ID
1114 /// 4. **Allocate new**: If not found, create new node and add to hash table
1115 ///
1116 /// ## Memory Efficiency
1117 ///
1118 /// - **DAG representation**: Common subexpressions are shared
1119 /// - **Reduced allocations**: Avoids duplicate node creation
1120 /// - **Cache-friendly**: Hash table enables O(1) lookups
1121 ///
1122 /// ## Thread Safety
1123 ///
1124 /// - Uses seqlock to handle concurrent resizing
1125 /// - Atomic operations for hash table consistency
1126 /// - CAS-based insertion prevents race conditions
1127 pub extern "C" fn allocCons(l: u32, r: u32) -> u32 {
1128 unsafe {
1129 ensure_arena();
1130
1131 // Compute the pair hash from the children's stored hashes (the value itself is independent of table layout)
1132 let hl = loop {
1133 let (seq, h) = enter_stable();
1134 if l >= h.capacity {
1135 if !check_stable(seq) { continue; }
1136 return EMPTY; // Invalid left node
1137 }
1138 let val = (*hash_ptr().add(l as usize)).load(Ordering::Acquire);
1139 core::sync::atomic::fence(Ordering::Acquire);
1140 if check_stable(seq) {
1141 break val;
1142 }
1143 };
1144 let hr = loop {
1145 let (seq, h) = enter_stable();
1146 if r >= h.capacity {
1147 if !check_stable(seq) { continue; }
1148 return EMPTY; // Invalid right node
1149 }
1150 let val = (*hash_ptr().add(r as usize)).load(Ordering::Acquire);
1151 core::sync::atomic::fence(Ordering::Acquire);
1152 if check_stable(seq) {
1153 break val;
1154 }
1155 };
1156 let hval = mix(hl, hr);
1157
1158 // Retry loop for stable reads
1159 let _ = loop {
1160 let (seq, h) = enter_stable(); // Wait if resizing
1161 let mask = h.bucket_mask;
1162 let bucket_idx = (hval & mask) as usize;
1163
1164 // Validate bucket index is safe before dereferencing
1165 if bucket_idx >= h.capacity as usize {
1166 // Capacity changed mid-read? Retry.
1167 if !check_stable(seq) { continue; }
1168 // Should be unreachable if logic is correct, but safe fallback
1169 continue;
1170 }
1171
1172 let buckets = buckets_ptr();
1173 let next = next_ptr();
1174
1175 let mut cur = (*buckets.add(bucket_idx)).load(Ordering::Acquire);
1176 let mut found = EMPTY;
1177
1178 while cur != EMPTY {
1179 // Bounds check for safety
1180 if cur >= h.capacity { break; }
1181
1182 let k = (*kind_ptr().add(cur as usize)).load(Ordering::Acquire);
1183 if k == ArenaKind::NonTerm as u8 {
1184 let ch = (*hash_ptr().add(cur as usize)).load(Ordering::Acquire);
1185 if ch == hval {
1186 let cl = (*left_ptr().add(cur as usize)).load(Ordering::Acquire);
1187 let cr = (*right_ptr().add(cur as usize)).load(Ordering::Acquire);
1188 if cl == l && cr == r {
1189 found = cur;
1190 break;
1191 }
1192 }
1193 }
1194 cur = (*next.add(cur as usize)).load(Ordering::Acquire);
1195 }
1196
1197 // If we found it, verify the lock is still valid.
1198 // If lock changed, our read might be garbage -> Retry.
1199 if check_stable(seq) {
1200 if found != EMPTY {
1201 return found;
1202 }
1203 // Not found: fall through to the allocation path (the bucket index is recomputed there in case a resize intervenes)
1204 break bucket_idx;
1205 }
1206 // If lock changed, retry the whole lookup
1207 };
1208
1209 // allocate new
1210 loop {
1211 wait_resize_stable();
1212
1213 // Reload pointers/mask in case they changed during wait_resize_stable()
1214 let h = header();
1215 let buckets = buckets_ptr();
1216 let current_mask = h.bucket_mask;
1217 let b = (hval & current_mask) as usize;
1218
1219 let id = h.top.fetch_add(1, Ordering::AcqRel);
1220 if id >= h.capacity {
1221 grow();
1222 continue;
1223 }
1224
1225 (*kind_ptr().add(id as usize)).store(ArenaKind::NonTerm as u8, Ordering::Release);
1226 (*left_ptr().add(id as usize)).store(l, Ordering::Release);
1227 (*right_ptr().add(id as usize)).store(r, Ordering::Release);
1228 (*hash_ptr().add(id as usize)).store(hval, Ordering::Release);
1229
1230 // insert into bucket with CAS; if we lose, drop id as hole (kind=0)
1231 let next = next_ptr();
1232 loop {
1233 let head = (*buckets.add(b)).load(Ordering::Acquire);
1234 (*next.add(id as usize)).store(head, Ordering::Relaxed);
1235 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
1236 if (*buckets.add(b))
1237 .compare_exchange(head, id, Ordering::Release, Ordering::Relaxed)
1238 .is_ok()
1239 {
1240 return id;
1241 }
1242 // someone inserted; check if it matches now
1243 let mut cur2 = (*buckets.add(b)).load(Ordering::Acquire);
1244 while cur2 != EMPTY {
1245 let ck2 = (*kind_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1246 if ck2 != ArenaKind::NonTerm as u8 {
1247 cur2 = (*next.add(cur2 as usize)).load(Ordering::Acquire);
1248 continue;
1249 }
1250 let ch2 = (*hash_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1251 if ch2 == hval {
1252 let cl2 = (*left_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1253 let cr2 = (*right_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1254 if cl2 == l && cr2 == r {
1255 // mark hole
1256 (*kind_ptr().add(id as usize)).store(0, Ordering::Release);
1257 return cur2;
1258 }
1259 }
1260 cur2 = (*next.add(cur2 as usize)).load(Ordering::Acquire);
1261 }
1262 }
1263 }
1264 }
1265 }
1266
1267 // Generic allocation (non hash-consed; NOT inserted into buckets).
1268 // This is used for reducer continuations/suspensions.
1269 #[inline(always)]
1270 unsafe fn alloc_generic(kind: u8, sym: u8, left: u32, right: u32, hash: u32) -> u32 {
1271 ensure_arena();
1272 let h = header();
1273 loop {
1274 wait_resize_stable();
1275 let id = h.top.fetch_add(1, Ordering::AcqRel);
1276 if id >= h.capacity {
1277 grow();
1278 continue;
1279 }
1280 // Publish payload, then kind last.
1281 (*sym_ptr().add(id as usize)).store(sym, Ordering::Release);
1282 (*left_ptr().add(id as usize)).store(left, Ordering::Release);
1283 (*right_ptr().add(id as usize)).store(right, Ordering::Release);
1284 (*hash_ptr().add(id as usize)).store(hash, Ordering::Release);
1285 (*kind_ptr().add(id as usize)).store(kind, Ordering::Release);
1286 return id;
1287 }
1288 }
1289
1290 // -------------------------------------------------------------------------
1291 // Resize (stop-the-world via odd/even seq)
1292 // -------------------------------------------------------------------------
1293 fn grow() {
1294 unsafe {
1295 let h = header_mut();
1296 let mut expected = h.resize_seq.load(Ordering::Acquire);
1297 loop {
1298 if expected & 1 == 1 {
1299 core::hint::spin_loop();
1300 expected = h.resize_seq.load(Ordering::Acquire);
1301 continue;
1302 }
1303 // Acquire exclusive "writer" by flipping even->odd with CAS.
1304 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
1305 if h.resize_seq.compare_exchange(expected, expected | 1, Ordering::AcqRel, Ordering::Acquire).is_ok() {
1306 break;
1307 }
1308 expected = h.resize_seq.load(Ordering::Acquire);
1309 }
1310
1311 let old_cap = h.capacity;
1312 // Capture OLD offsets before we overwrite the header with new ones.
1313 let old_offset_kind = h.offset_kind;
1314 let old_offset_sym = h.offset_sym;
1315 let old_offset_left = h.offset_left_id;
1316 let old_offset_right = h.offset_right_id;
1317 let old_offset_hash = h.offset_hash32;
1318 let old_offset_next = h.offset_next_idx;
1319 let old_offset_term_cache = h.offset_term_cache;
1320 let old_top = h.top.load(Ordering::Acquire);
1321
1322 if old_cap >= MAX_CAP {
1323 // Poison so other threads trap instead of spinning on odd resize_seq.
1324 h.resize_seq.store(POISON_SEQ, Ordering::Release);
1325 core::arch::wasm32::unreachable();
1326 }
1327 let new_cap = (old_cap * 2).min(MAX_CAP);
1328
1329 let (offset_sq, offset_cq, total_size) = SabHeader::layout(new_cap);
1330 let needed_bytes = ARENA_BASE_ADDR as usize + total_size as usize;
1331 let current_bytes = wasm32::memory_size(0) * WASM_PAGE_SIZE;
1332 if needed_bytes > current_bytes {
1333 let extra = needed_bytes - current_bytes;
1334 let pages = (extra + WASM_PAGE_SIZE - 1) / WASM_PAGE_SIZE;
1335 let res = wasm32::memory_grow(0, pages);
1336 if res == usize::MAX {
1337 // OOM (or denied grow). Poison so other threads trap instead of spinning.
1338 h.resize_seq.store(POISON_SEQ, Ordering::Release);
1339 core::arch::wasm32::unreachable();
1340 }
1341 }
1342
1343 // Compute new offsets (rings stay in place, but compute for completeness).
1344 let offset_kind = align64(offset_cq + ring_bytes::<Cqe>(RING_ENTRIES));
1345 let offset_sym = offset_kind + new_cap;
1346 let offset_left_id = align64(offset_sym + new_cap);
1347 let offset_right_id = offset_left_id + 4 * new_cap;
1348 let offset_hash32 = offset_right_id + 4 * new_cap;
1349 let offset_next_idx = offset_hash32 + 4 * new_cap;
1350 let offset_buckets = align64(offset_next_idx + 4 * new_cap);
1351 let offset_term_cache = offset_buckets + 4 * new_cap;
1352
1353 // Update header (rings untouched)
1354 h.capacity = new_cap;
1355 h.bucket_mask = new_cap - 1;
1356 h.offset_sq = offset_sq;
1357 h.offset_cq = offset_cq;
1358 h.offset_kind = offset_kind;
1359 h.offset_sym = offset_sym;
1360 h.offset_left_id = offset_left_id;
1361 h.offset_right_id = offset_right_id;
1362 h.offset_hash32 = offset_hash32;
1363 h.offset_next_idx = offset_next_idx;
1364 h.offset_buckets = offset_buckets;
1365 h.offset_term_cache = offset_term_cache;
1366
1367 // Preserve top
1368 let count = old_top.min(old_cap);
1369 h.top.store(count, Ordering::Release);
1370
1371 // IMPORTANT: reverse-copy order is mandatory.
1372 // The layout is packed contiguously, and new_cap > old_cap means new regions can overlap
1373 // old regions during migration. Copy from the end back to the front.
1374
1375 // Term cache (16 bytes)
1376 core::ptr::copy(
1377 (ARENA_BASE_ADDR + old_offset_term_cache) as *const u8,
1378 (ARENA_BASE_ADDR + h.offset_term_cache) as *mut u8,
1379 16,
1380 );
1381
1382 // Next (u32)
1383 core::ptr::copy(
1384 (ARENA_BASE_ADDR + old_offset_next) as *const u8,
1385 (ARENA_BASE_ADDR + h.offset_next_idx) as *mut u8,
1386 (count as usize) * 4,
1387 );
1388 if new_cap > old_cap {
1389 zero_region(
1390 h.offset_next_idx + old_cap * 4,
1391 (new_cap - old_cap) * 4,
1392 );
1393 }
1394
1395 // Hash (u32)
1396 core::ptr::copy(
1397 (ARENA_BASE_ADDR + old_offset_hash) as *const u8,
1398 (ARENA_BASE_ADDR + h.offset_hash32) as *mut u8,
1399 (count as usize) * 4,
1400 );
1401 if new_cap > old_cap {
1402 zero_region(
1403 h.offset_hash32 + old_cap * 4,
1404 (new_cap - old_cap) * 4,
1405 );
1406 }
1407
1408 // Right (u32)
1409 core::ptr::copy(
1410 (ARENA_BASE_ADDR + old_offset_right) as *const u8,
1411 (ARENA_BASE_ADDR + h.offset_right_id) as *mut u8,
1412 (count as usize) * 4,
1413 );
1414 if new_cap > old_cap {
1415 zero_region(
1416 h.offset_right_id + old_cap * 4,
1417 (new_cap - old_cap) * 4,
1418 );
1419 }
1420
1421 // Left (u32)
1422 core::ptr::copy(
1423 (ARENA_BASE_ADDR + old_offset_left) as *const u8,
1424 (ARENA_BASE_ADDR + h.offset_left_id) as *mut u8,
1425 (count as usize) * 4,
1426 );
1427 if new_cap > old_cap {
1428 zero_region(
1429 h.offset_left_id + old_cap * 4,
1430 (new_cap - old_cap) * 4,
1431 );
1432 }
1433
1434 // Sym (u8)
1435 core::ptr::copy(
1436 (ARENA_BASE_ADDR + old_offset_sym) as *const u8,
1437 (ARENA_BASE_ADDR + h.offset_sym) as *mut u8,
1438 count as usize,
1439 );
1440 if new_cap > old_cap {
1441 zero_region(
1442 h.offset_sym + old_cap,
1443 new_cap - old_cap,
1444 );
1445 }
1446
1447 // Kind (u8)
1448 core::ptr::copy(
1449 (ARENA_BASE_ADDR + old_offset_kind) as *const u8,
1450 (ARENA_BASE_ADDR + h.offset_kind) as *mut u8,
1451 count as usize,
1452 );
1453 if new_cap > old_cap {
1454 zero_region(
1455 h.offset_kind + old_cap,
1456 new_cap - old_cap,
1457 );
1458 }
1459
1460 // Rebuild buckets (hash-consing table) for NonTerm only.
1461 let buckets = buckets_ptr();
1462 let next = next_ptr();
1463 for i in 0..new_cap as usize {
1464 (*buckets.add(i)).store(EMPTY, Ordering::Release);
1465 }
1466 for i in 0..count {
1467 let k = (*kind_ptr().add(i as usize)).load(Ordering::Acquire);
1468 if k != ArenaKind::NonTerm as u8 {
1469 continue;
1470 }
1471 let hv = (*hash_ptr().add(i as usize)).load(Ordering::Acquire);
1472 let b = (hv & h.bucket_mask) as usize;
1473 loop {
1474 let head = (*buckets.add(b)).load(Ordering::Acquire);
1475 (*next.add(i as usize)).store(head, Ordering::Relaxed);
1476 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
1477 if (*buckets.add(b))
1478 .compare_exchange(head, i, Ordering::Release, Ordering::Relaxed)
1479 .is_ok()
1480 {
1481 break;
1482 }
1483 }
1484 }
1485
1486 h.resize_seq.fetch_add(1, Ordering::Release);
1487 }
1488 }
1489
1490 // -------------------------------------------------------------------------
1491 // Reducer
1492 // -------------------------------------------------------------------------
1493 // Continuation frame: kind=Continuation, sym=stage, left=parent_stack, right=parent_node
1494 const STAGE_LEFT: u8 = 0;
1495 const STAGE_RIGHT: u8 = 1;
1496
1497 // Suspension: kind=Suspension, sym=mode (0=descend, 1=return), left=curr, right=stack, hash=remaining_steps
1498 const MODE_DESCEND: u8 = 0;
1499 const MODE_RETURN: u8 = 1;
1500
1501 #[inline(always)]
1502 unsafe fn alloc_continuation(parent: u32, target: u32, stage: u8) -> u32 {
1503 alloc_generic(ArenaKind::Continuation as u8, stage, parent, target, 0)
1504 }
1505
1506 #[inline(always)]
1507 unsafe fn alloc_suspension(curr: u32, stack: u32, mode: u8, remaining_steps: u32) -> u32 {
1508 alloc_generic(ArenaKind::Suspension as u8, mode, curr, stack, remaining_steps)
1509 }
1510
1511 // --- OPTIMIZATION: Update existing node (Slot Reuse) ---
1512 #[inline(always)]
1513 unsafe fn update_continuation(id: u32, parent: u32, target: u32, stage: u8) {
1514 (*left_ptr().add(id as usize)).store(parent, Ordering::Relaxed);
1515 (*right_ptr().add(id as usize)).store(target, Ordering::Relaxed);
1516 (*sym_ptr().add(id as usize)).store(stage, Ordering::Relaxed);
1517 // Ensure it is marked as Continuation (in case we recycled a Suspension)
1518 (*kind_ptr().add(id as usize)).store(ArenaKind::Continuation as u8, Ordering::Release);
1519 }
1520
1521 #[inline(always)]
1522 fn hash_of_internal(n: u32) -> u32 {
1523 unsafe {
1524 ensure_arena();
1525 loop {
1526 let (seq, h) = enter_stable();
1527 if n >= h.capacity {
1528 return 0;
1529 }
1530 let val = (*hash_ptr().add(n as usize)).load(Ordering::Acquire);
1531 core::sync::atomic::fence(Ordering::Acquire);
1532 if check_stable(seq) {
1533 return val;
1534 }
1535 }
1536 }
1537 }
1538
1539 /// Unwind the continuation stack to reconstruct the full expression tree.
1540 ///
1541 /// When the step limit is exhausted, the worker may be deep in the expression tree
1542 /// with a non-empty stack. This function walks up the stack, rebuilding parent nodes
1543 /// as necessary, to return the root of the expression rather than a sub-expression.
1544 #[inline(always)]
1545 unsafe fn unwind_to_root(mut curr: u32, mut stack: u32) -> u32 {
1546 while stack != EMPTY {
1547 let recycled = stack;
1548 stack = leftOf(recycled);
1549 let parent_node = rightOf(recycled);
1550 let stage = symOf(recycled) as u8;
1551
1552 if stage == STAGE_LEFT {
1553 let orig_left = leftOf(parent_node);
1554 if curr != orig_left {
1555 // Left child changed, must allocate new parent
1556 curr = allocCons(curr, rightOf(parent_node));
1557 } else {
1558 // Left child didn't change, reuse existing parent
1559 curr = parent_node;
1560 }
1561 } else {
1562 // STAGE_RIGHT
1563 let orig_right = rightOf(parent_node);
1564 if curr != orig_right {
1565 // Right child changed, must allocate new parent
1566 curr = allocCons(leftOf(parent_node), curr);
1567 } else {
1568 curr = parent_node;
1569 }
1570 }
1571 }
1572 curr
1573 }
1574
1575 enum StepResult {
1576 Done(u32),
1577 Yield(u32), // Suspension node id
1578 }
1579
    /// Perform one iterative reduction step with preemptive yielding.
    ///
    /// This implements the core SKI reduction algorithm using an iterative approach
    /// with explicit stack management instead of recursion. It can yield mid-step
    /// when traversal gas is exhausted, enabling cooperative multitasking.
    ///
    /// ## Iterative Reduction Algorithm
    ///
    /// Instead of recursive function calls, uses an explicit stack of Continuation nodes:
    ///
    /// - **MODE_DESCEND**: Traverse down the expression tree looking for redexes
    /// - **MODE_RETURN**: Return up the tree after reducing a subexpression
    /// - **Continuation frames**: Represent suspended stack frames as arena nodes
    ///
    /// ## Gas-Based Preemption
    ///
    /// - **Traversal gas**: Limits AST traversal work per call (one unit per loop iteration)
    /// - **Yield on exhaustion**: Returns a Suspension node for later resumption
    /// - **Cooperative multitasking**: Prevents worker starvation in parallel evaluation
    ///
    /// ## Step Counting
    ///
    /// - **Accurate counting**: `remaining_steps` is decremented immediately when a reduction fires
    /// - **Multiple reductions per call**: A single call may perform several reductions before returning or yielding
    /// - **Deterministic**: Every reduction is counted exactly once, regardless of batching
    ///
    /// ## Node Recycling Optimization
    ///
    /// - **Free node reuse**: Dead continuation frames are recycled immediately
    /// - **Memory efficiency**: Reduces allocation pressure during reduction
    /// - **Cache locality**: Reuses recently freed nodes
    ///
    /// ## Parameters
    ///
    /// - `curr`: Current expression node being evaluated
    /// - `stack`: Stack of continuation frames (linked list of nodes)
    /// - `mode`: Current evaluation mode (descend/return)
    /// - `gas`: Remaining traversal gas (mutable, decremented during execution)
    /// - `remaining_steps`: Mutable reference to reduction steps remaining (decremented on each reduction)
    /// - `free_node`: Recyclable node ID from previous operations
    ///
    /// ## Returns
    ///
    /// - `StepResult::Done(node)`: Reduction completed, `node` is the result
    /// - `StepResult::Yield(susp_id)`: Yielded mid-step, `susp_id` is a Suspension node
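    ///
    /// As a worked example of the reduction rules below, the `S x y z -> x z (y z)`
    /// case allocates three application nodes and shares `z` between them
    /// (illustrative sketch, not compiled):
    ///
    /// ```text
    /// // curr = ((S x) y) z   i.e. left = (S x) y, right = z, ll = S x
    /// let x  = rightOf(ll);
    /// let y  = rightOf(left);
    /// let z  = right;
    /// let xz = allocCons(x, z);    // hash-consed: reused if (x, z) already exists
    /// let yz = allocCons(y, z);
    /// curr   = allocCons(xz, yz);  // x z (y z), with z shared in the DAG
    /// ```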
    unsafe fn step_iterative(mut curr: u32, mut stack: u32, mut mode: u8, gas: &mut u32, remaining_steps: &mut u32, mut free_node: u32) -> StepResult {
        loop {
            // Gas exhaustion yield
            if *gas == 0 {
                // If we still hold an unused free_node, it is simply leaked as a dead slot.
                return StepResult::Yield(alloc_suspension(curr, stack, mode, *remaining_steps));
            }
            *gas -= 1;

            if mode == MODE_RETURN {
                if stack == EMPTY {
                    return StepResult::Done(curr);
                }

                // POP FRAME
                let recycled = stack; // <--- This frame is now dead/recyclable
                stack = leftOf(recycled); // Parent
                let parent_node = rightOf(recycled);
                let stage = symOf(recycled) as u8;

                if stage == STAGE_LEFT {
                    let orig_left = leftOf(parent_node);
                    if curr != orig_left {
                        // Rebuild parent
                        curr = allocCons(curr, rightOf(parent_node));
                        // 'recycled' is still free; we keep returning up.
                        free_node = recycled; // Keep for next push or pop
                        mode = MODE_RETURN;
                        continue;
                    }
                    // Left stable, DESCEND RIGHT
                    // Reuse 'recycled' as the new Continuation frame!
                    update_continuation(recycled, stack, parent_node, STAGE_RIGHT);
                    stack = recycled;
                    mode = MODE_DESCEND;
                    curr = rightOf(parent_node);
                    continue;
                } else {
                    let orig_right = rightOf(parent_node);
                    if curr != orig_right {
                        curr = allocCons(leftOf(parent_node), curr);
                        free_node = recycled;
                        mode = MODE_RETURN;
                        continue;
                    }
                    // Both stable
                    curr = parent_node;
                    free_node = recycled;
                    mode = MODE_RETURN;
                    continue;
                }
            }

            // MODE_DESCEND
            let k = kindOf(curr);
            if k != ArenaKind::NonTerm as u32 {
                mode = MODE_RETURN;
                continue;
            }

            // NonTerm: check for I/K/S redex at this node.
            let left = leftOf(curr);
            let right = rightOf(curr);

            // [REDUCTION LOGIC START] -----------------------------------------

            // I x -> x
            if kindOf(left) == ArenaKind::Terminal as u32 && symOf(left) == ArenaSym::I as u32 {
                if *remaining_steps == 0 {
                    return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
                }
                *remaining_steps = remaining_steps.saturating_sub(1);

                curr = right;
                mode = MODE_RETURN;

                // Yield IMMEDIATELY if the limit hit zero.
                // Don't waste gas traversing to the next redex.
                if *remaining_steps == 0 {
                    return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
                }
                continue;
            }

            if kindOf(left) == ArenaKind::NonTerm as u32 {
                let ll = leftOf(left);
                // K x y -> x
                if kindOf(ll) == ArenaKind::Terminal as u32 && symOf(ll) == ArenaSym::K as u32 {
                    if *remaining_steps == 0 {
                        return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
                    }
                    *remaining_steps = remaining_steps.saturating_sub(1);

                    curr = rightOf(left);
                    mode = MODE_RETURN;

                    // Yield IMMEDIATELY
                    if *remaining_steps == 0 {
                        return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
                    }
                    continue;
                }
                // S x y z -> x z (y z)
                if kindOf(ll) == ArenaKind::NonTerm as u32 {
                    let lll = leftOf(ll);
                    if kindOf(lll) == ArenaKind::Terminal as u32 && symOf(lll) == ArenaSym::S as u32 {
                        if *remaining_steps == 0 {
                            return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
                        }
                        // Use saturating_sub for consistency
                        *remaining_steps = remaining_steps.saturating_sub(1);

                        let x = rightOf(ll);
                        let y = rightOf(left);
                        let z = right;
                        let xz = allocCons(x, z);
                        let yz = allocCons(y, z);
                        curr = allocCons(xz, yz);
                        mode = MODE_RETURN;

                        // Yield IMMEDIATELY
                        if *remaining_steps == 0 {
                            return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
                        }
                        continue;
                    }
                }
            }

            // [REDUCTION LOGIC END] -------------------------------------------

            // No redex: PUSH frame to descend left
            if free_node != EMPTY {
                update_continuation(free_node, stack, curr, STAGE_LEFT);
                stack = free_node;
                free_node = EMPTY;
            } else {
                stack = alloc_continuation(stack, curr, STAGE_LEFT);
            }
            curr = left;
            mode = MODE_DESCEND;
        }
    }

    fn step_internal(expr: u32) -> u32 {
        unsafe {
            let mut gas = u32::MAX;
            let mut steps = u32::MAX; // Effectively unlimited: no step budget is enforced for direct calls
            match step_iterative(expr, EMPTY, MODE_DESCEND, &mut gas, &mut steps, EMPTY) {
                StepResult::Done(x) => x,
                StepResult::Yield(_) => expr, // practically unreachable with u32::MAX gas; kept for total safety
            }
        }
    }

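    /// Exported single-step entry point: run one pass of the iterative reducer
    /// over `expr` with an effectively unlimited gas/step budget and return the
    /// resulting node id. If the expression is already in normal form the
    /// original id comes back unchanged, which callers use as the fixpoint test
    /// (see `reduce` below).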
    #[no_mangle]
    pub extern "C" fn arenaKernelStep(expr: u32) -> u32 {
        unsafe { ensure_arena(); }
        step_internal(expr)
    }

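    /// Exported bounded-reduction entry point: repeatedly apply `step_internal`
    /// until a fixpoint is reached or `max` passes have been made
    /// (`0xffff_ffff` means "no limit").
    ///
    /// Illustrative host-side usage (sketch only; building the initial
    /// expression and obtaining its node id is assumed to happen through the
    /// surrounding arena API, not shown here):
    ///
    /// ```text
    /// let root = /* node id of `S K K x` built in the arena */;
    /// let nf   = reduce(root, 0xffff_ffff); // reduce to normal form
    /// // nf now points at the hash-consed normal form, e.g. `x`
    /// ```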
    #[no_mangle]
    pub extern "C" fn reduce(expr: u32, max: u32) -> u32 {
        unsafe { ensure_arena(); }
        let limit = if max == 0xffff_ffff { u32::MAX } else { max };
        let mut cur = expr;
        for _ in 0..limit {
            let next = step_internal(cur);
            if next == cur {
                break;
            }
            cur = next;
        }
        cur
    }

    // -------------------------------------------------------------------------
    // Host/worker ring APIs
    // -------------------------------------------------------------------------
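    /// Drain one completion from the completion ring, if any.
    ///
    /// Returns `-1` when the arena is not connected or the ring is empty;
    /// otherwise the high 32 bits hold `req_id` and the low 32 bits hold
    /// `node_id`. Host-side unpacking looks roughly like this (illustrative
    /// sketch, not compiled):
    ///
    /// ```text
    /// let packed = hostPull();
    /// if packed != -1 {
    ///     let req_id  = (packed as u64 >> 32) as u32;
    ///     let node_id = (packed as u64 & 0xffff_ffff) as u32;
    /// }
    /// ```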
    #[no_mangle]
    pub extern "C" fn hostPull() -> i64 {
        unsafe {
            if ARENA_BASE_ADDR == 0 {
                return -1;
            }
            if let Some(cqe) = cq_ring().try_dequeue() {
                // Pack: high 32 bits = req_id, low 32 bits = node_id
                let packed: u64 = ((cqe.req_id as u64) << 32) | (cqe.node_id as u64);
                packed as i64
            } else {
                -1
            }
        }
    }

    /// Debug/diagnostic helper: expose ring capacity to the host.
    ///
    /// Useful for tests and for sizing stress workloads without duplicating a JS-side constant.
    #[no_mangle]
    pub extern "C" fn debugGetRingEntries() -> u32 {
        RING_ENTRIES
    }

    /// Submit work with explicit correlation id and max reduction steps.
    /// Returns: 0=ok, 1=full, 2=not connected.
    #[no_mangle]
    pub extern "C" fn hostSubmit(node_id: u32, req_id: u32, max_steps: u32) -> u32 {
        unsafe {
            if ARENA_BASE_ADDR == 0 {
                return 2;
            }
            if sq_ring().try_enqueue(Sqe { node_id, req_id, max_steps }) {
                0
            } else {
                1
            }
        }
    }

    /// Main worker loop for parallel SKI evaluation.
    ///
    /// Processes evaluation requests from the submission queue and posts results
    /// to the completion queue. Implements preemptive multitasking using gas-based
    /// yielding to prevent worker starvation in concurrent workloads.
    ///
    /// ## Processing Model
    ///
    /// 1. **Dequeue request** from submission queue (blocking)
    /// 2. **Initialize evaluation state** from request or suspension
    /// 3. **Iterative reduction loop** with gas-based preemption
    /// 4. **Yield or complete** based on gas and step limits
    /// 5. **Enqueue result** to completion queue
    /// 6. **Repeat** for next request
    ///
    /// ## Preemption Strategy
    ///
    /// - **Traversal gas limit**: `20,000` traversal steps per `step_iterative` call
    /// - **Cooperative yielding**: Returns control when gas is exhausted
    /// - **Suspension resumption**: Can restart from any yield point
    /// - **Fair scheduling**: Prevents any single expression from monopolizing CPU
    ///
    /// ## Step Counting Semantics
    ///
    /// - **Step decrement**: `step_iterative` decrements `remaining_steps` in place each time a
    ///   reduction fires, whether the call later yields or returns `Done`.
    /// - **Yield behavior**: A yield stores the current `remaining_steps` in the Suspension node,
    ///   so only reductions that actually completed before the yield have been charged, and
    ///   resumption continues with exactly the leftover budget.
    /// - **Deterministic counting**: Each reduction step is counted exactly once, regardless of
    ///   how many times the work is suspended and resumed.
    ///
    /// ## Memory Management
    ///
    /// - **Node recycling**: Reuses dead continuation frames immediately
    /// - **Shared arena**: All workers access the same memory space
    /// - **Atomic operations**: Safe concurrent access to shared structures
    ///
    /// ## Performance Optimizations
    ///
    /// - **Lock-free queues**: Minimal synchronization overhead
    /// - **Batched processing**: Amortizes dequeue overhead
    /// - **In-place updates**: Modifies nodes directly in shared memory
    /// - **Cache-friendly**: Sequential memory access patterns
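    ///
    /// Illustrative host-side protocol for driving this loop (sketch only, not
    /// compiled; the host normally lives in JS, shown here in Rust-like pseudocode):
    ///
    /// ```text
    /// // 1. submit a job with a correlation id and a step budget
    /// assert_eq!(hostSubmit(root, 42, 1_000_000), 0);
    ///
    /// // 2. poll for completions; a worker running workerLoop() fills the CQ
    /// let packed = hostPull();
    /// if packed != -1 {
    ///     let req_id  = (packed as u64 >> 32) as u32;         // 42
    ///     let node_id = (packed as u64 & 0xffff_ffff) as u32; // result or Suspension
    ///     // 3. if node_id is a Suspension, resubmit it to continue the work
    /// }
    /// ```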
    #[no_mangle]
    pub extern "C" fn workerLoop() {
        unsafe {
            let sq = sq_ring();
            let cq = cq_ring();
            // Traversal gas per step_iterative call (preemption) to prevent starvation.
            // This is NOT the same as the max reduction steps budget.
            let batch_gas: u32 = 20000;
            loop {
                let job = sq.dequeue_blocking();
                let mut curr = job.node_id;
                let mut stack = EMPTY;
                let mut mode = MODE_DESCEND;
                let mut remaining_steps: u32;

                // If resuming a suspension, the suspension node ITSELF is now free to be reused!
                let mut free_node = EMPTY;

                // Resume from suspension?
                if kindOf(curr) == ArenaKind::Suspension as u32 {
                    let susp = curr;
                    curr = leftOf(susp);
                    stack = rightOf(susp);
                    mode = symOf(susp) as u8;
                    // Strict resume: trust the suspension's counter completely.
                    // The suspension node's hash field holds the remaining_steps captured at the
                    // yield point; reductions completed before the yield were already deducted.
                    remaining_steps = hash_of_internal(susp);
                    free_node = susp; // <--- RECYCLE START
                } else {
                    // Set strict limit from the job packet
                    let limit = job.max_steps;
                    remaining_steps = if limit == 0xffff_ffff { u32::MAX } else { limit };
                }

                loop {
                    // Check budget BEFORE trying a step
                    if remaining_steps == 0 {
                        // If we have a stack, we are deep in the tree.
                        // We must unwind to the root to return a valid full expression.
                        if stack != EMPTY {
                            curr = unwind_to_root(curr, stack);
                            stack = EMPTY;
                        }

                        // Step budget exhausted; return (partial) result.
                        cq.enqueue_blocking(Cqe {
                            node_id: curr,
                            req_id: job.req_id,
                            _pad: 0,
                        });
                        break;
                    }

                    let mut gas = batch_gas;

                    match step_iterative(curr, stack, mode, &mut gas, &mut remaining_steps, free_node) {
                        StepResult::Yield(susp_id) => {
                            // Yielded (gas or limit). 'susp_id' carries the correct remaining count
                            // because step_iterative updated 'remaining_steps' in place for each
                            // reduction that occurred before yielding.
                            cq.enqueue_blocking(Cqe {
                                node_id: susp_id,
                                req_id: job.req_id,
                                _pad: 0,
                            });
                            break;
                        }
                        StepResult::Done(next_node) => {
                            if next_node == curr {
                                // Fixpoint reached.
                                cq.enqueue_blocking(Cqe {
                                    node_id: curr,
                                    req_id: job.req_id,
                                    _pad: 0,
                                });
                                break;
                            }

                            // Setup for next iteration
                            curr = next_node;
                            stack = EMPTY;
                            mode = MODE_DESCEND;
                            free_node = EMPTY;

                            // Loop continues; 'remaining_steps' was updated by reference in step_iterative
                        }
                    }
                }
            }
        }
    }

    // -------------------------------------------------------------------------
    // Debug helpers
    // -------------------------------------------------------------------------
    #[no_mangle]
    pub extern "C" fn debugGetArenaBaseAddr() -> u32 {
        unsafe { ARENA_BASE_ADDR }
    }

    #[no_mangle]
    pub extern "C" fn getArenaMode() -> u32 {
        unsafe { ARENA_MODE }
    }

    #[no_mangle]
    pub extern "C" fn debugCalculateArenaSize(capacity: u32) -> u32 {
        let (_, _, total_size) = SabHeader::layout(capacity);
        total_size
    }

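    /// Debug/diagnostic helper: expose the raw resize sequence word to the host,
    /// primarily so tests can assert that the arena is in a stable state
    /// (see the encoding described in the body below).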
    #[no_mangle]
    pub extern "C" fn debugLockState() -> u32 {
        // Full resize sequence word:
        // - even: stable
        // - odd: resize in progress
        // - 0xFFFF_FFFF: poisoned (unrecoverable resize failure)
        unsafe { header().resize_seq.load(Ordering::Relaxed) }
    }
}

// Re-export WASM symbols at crate root
#[cfg(target_arch = "wasm32")]
pub use wasm::*;