typed_ski/arena.rs

1//! # Arena-Based Memory Management for SKI Expressions (WASM, no_std)
2//!
3//! This module implements a high-performance, thread-safe arena allocator optimized
4//! for SKI combinator calculus evaluation. It supports both single-threaded heap-based
5//! allocation and multi-threaded SharedArrayBuffer (SAB) based allocation for Web Workers.
6//!
7//! ## Core Architecture
8//!
9//! ### Arena Node Types
10//!
11//! The arena uses four distinct node types to represent SKI expressions and evaluation state:
12//!
13//! - **`Terminal`**: Leaf nodes containing SKI combinators (S, K, I)
14//!   - `kind = 1`, `sym` contains the combinator symbol
15//!
16//! - **`NonTerm`**: Application nodes (function application)
17//!   - `kind = 2`, `left` and `right` point to subexpressions
18//!   - Represents expressions of the form `(left right)`
19//!
20//! - **`Continuation`**: Stack frames for iterative reduction (optimization)
21//!   - `kind = 3`, `sym` indicates reduction stage, `left`/`right` point to parent stack and node
22//!   - Used by the iterative reduction algorithm to avoid recursion stack overflow
23//!
24//! - **`Suspension`**: Paused evaluation state for preemptive multitasking
25//!   - `kind = 4`, `sym` contains evaluation mode, `left`/`right` contain current expression and stack
26//!   - `hash` field stores remaining reduction steps for resumption
27//!   - Enables cooperative multitasking across Web Workers
28//!
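//! As an illustrative sketch (node IDs are arbitrary and depend on allocation order
//! and hash-consing), the expression `((S K) K)` could be laid out as:
//!
//! ```text
//! id 0: Terminal  sym=S
//! id 1: Terminal  sym=K
//! id 2: NonTerm   left=0, right=1    -- (S K)
//! id 3: NonTerm   left=2, right=1    -- ((S K) K); node 1 is shared
//! ```
//!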
29//! ### Memory Layout (SharedArrayBuffer Mode)
30//!
31//! ```text
32//! +-------------------+ <-- ARENA_BASE_ADDR
33//! | SabHeader         |
34//! | - Magic, offsets  |
35//! | - Rings, capacity |
36//! | - Atomic counters |
37//! +-------------------+ <-- offset_sq (64-byte aligned)
38//! | Submission Ring   |
39//! | (65536 entries)   |
40//! +-------------------+ <-- offset_cq (64-byte aligned)
41//! | Completion Ring   |
42//! | (65536 entries)   |
43//! +-------------------+ <-- offset_stdin (64-byte aligned)
44//! | Stdin Ring (u8)   |
45//! +-------------------+ <-- offset_stdout (64-byte aligned)
46//! | Stdout Ring (u8)  |
47//! +-------------------+ <-- offset_stdin_wait (64-byte aligned)
48//! | Stdin Wait Ring   |
49//! | (u32 node ids)    |
50//! +-------------------+ <-- offset_kind (64-byte aligned)
51//! | Kind Array        |  u8[capacity] - Node types
52//! +-------------------+ <-- offset_sym
53//! | Sym Array         |  u8[capacity] - Symbols/modes
54//! +-------------------+ <-- offset_left_id (64-byte aligned)
55//! | Left Array        | u32[capacity] - Left child pointers
56//! +-------------------+ <-- offset_right_id
57//! | Right Array       | u32[capacity] - Right child pointers
58//! +-------------------+ <-- offset_hash32
59//! | Hash Array        | u32[capacity] - Hash values for deduplication
60//! +-------------------+ <-- offset_next_idx
61//! | Next Array        | u32[capacity] - Hash table collision chains
62//! +-------------------+ <-- offset_buckets (64-byte aligned)
63//! | Bucket Array      | u32[capacity] - Hash table buckets
64//! +-------------------+ <-- offset_term_cache
65//! | Terminal Cache    | u32[6] - Cached terminal node IDs (S, K, I, ReadOne, WriteOne)
66//! +-------------------+ <-- End of arena
67//! ```
68//!
69//! ### Key Optimizations
70//!
71//! #### 1. Hash-Consing (Structural Sharing)
72//!
73//! - **[Hash consing](https://en.wikipedia.org/wiki/Hash_consing)** dedupes identical subexpressions to prevent redundant allocations
74//! - **Uses avalanche hash** of `(left, right)` pairs for fast lookups
75//! - **Collision resolution** via separate chaining in the bucket array
76//! - **Memory efficiency**: [DAG](https://en.wikipedia.org/wiki/Directed_acyclic_graph) representation instead of tree
77//!
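//! A minimal sketch using this module's exported allocators (assuming the arena has
//! already been initialized; symbol values follow `ArenaSym`):
//!
//! ```ignore
//! let k = allocTerminal(ArenaSym::K as u32);
//! let i = allocTerminal(ArenaSym::I as u32);
//! let a = allocCons(k, i);
//! let b = allocCons(k, i);
//! assert_eq!(a, b); // identical (left, right) pairs resolve to the same node id
//! ```
//!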
78//! #### 2. Iterative Reduction with Continuations
79//!
80//! - **Avoids recursion stack overflow** on deep expressions
81//! - **Continuation nodes** represent suspended stack frames
82//! - **Two-stage reduction**: left child first, then right child
83//! - **Memory reuse**: Dead continuation frames are recycled
84//!
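//! Schematically (the reducer itself is outside this excerpt), descending the left
//! spine of `((a b) c)` pushes `Continuation` frames instead of growing the native
//! call stack:
//!
//! ```text
//! focus: ((a b) c)   stack: (empty)
//! focus: (a b)       stack: Cont{stage=0, node=((a b) c)}
//! focus: a           stack: Cont{stage=0, node=(a b)} -> Cont{stage=0, node=((a b) c)}
//! ```
//!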
85//! #### 3. Preemptive Multitasking (Suspensions)
86//!
87//! - **Cooperative yielding** when traversal gas is exhausted
88//! - **Suspension nodes** capture complete evaluation state
89//! - **Worker preemption** prevents starvation in parallel evaluation
90//! - **State resumption** via suspension node deserialization
91//!
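//! For example, a worker whose traversal gas runs out mid-reduction publishes a node
//! shaped like this (field meanings as documented on `ArenaKind::Suspension`):
//!
//! ```text
//! Suspension { kind=4, sym=mode (0=descend, 1=return),
//!              left=current expression, right=evaluation stack,
//!              hash=remaining reduction steps }
//! ```
//!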
92//! #### 4. Lock-Free Ring Buffers (io_uring Style)
93//!
94//! - **Submission Queue (SQ)**: Main thread → Worker communication
95//! - **Completion Queue (CQ)**: Worker → Main thread results
96//! - **Atomic operations** for thread-safe producer/consumer patterns
97//! - **Blocking waits** using WASM atomic wait/notify
98//!
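//! A request/response round trip, sketched with the `Sqe`/`Cqe` types defined below:
//!
//! ```text
//! main thread:  SQ.enqueue( Sqe { node_id, req_id, max_steps } )
//! worker:       sqe = SQ.dequeue(); reduce node sqe.node_id for up to sqe.max_steps
//! worker:       CQ.enqueue( Cqe { node_id: result or suspension, req_id: sqe.req_id } )
//! main thread:  poll CQ and match cqe.req_id back to the pending request
//! ```
//!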
99//! #### 5. Concurrent Resizing
100//!
101//! - **[Seqlock](https://en.wikipedia.org/wiki/Seqlock)-style synchronization** for arena growth
102//! - **Stop-the-world pauses** during resize operations
103//! - **Reverse-order copying** to handle overlapping memory regions
104//! - **Poisoning** on OOM to prevent infinite waits
105//!
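//! The guard states used by this scheme (see `POISON_SEQ` and the seqlock helpers below):
//!
//! ```text
//! resize_seq even         -> stable: readers and allocators may proceed
//! resize_seq odd          -> resize in progress: spin until it becomes even again
//! resize_seq 0xffff_ffff  -> poisoned (OOM / max capacity reached): trap instead of spinning
//! ```
//!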
106//! ### Performance Characteristics
107//!
108//! - **O(1) allocation** for new nodes (amortized)
109//! - **O(1) lookup** for existing subexpressions (hash-consing)
110//! - **O(depth) reduction** with iterative algorithm (no stack overflow)
111//! - **Lock-free communication** between main thread and workers
112//! - **Memory efficient**: ~22 bytes per arena slot across the parallel arrays (see the breakdown below), with structural sharing via hash-consing
113//!
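//! The per-slot figure above breaks down as one entry in each parallel array:
//!
//! ```text
//! kind (u8) + sym (u8) + left (u32) + right (u32) + hash (u32) + next (u32) = 18 bytes
//! + one hash-table bucket slot (u32)                                        =  4 bytes
//! total per arena slot                                                      ~ 22 bytes
//! ```
//!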
114//! ### Thread Safety
115//!
116//! - **Atomic operations** for all shared state access
117//! - **Seqlock** for resize synchronization
118//! - **Shared arena** in SAB mode: workers attach to the same memory via `connectArena` and allocate concurrently
119//! - **Ring buffer fences** prevent data races in communication
120//!
121//! ### Integration with JavaScript
122//!
123//! - **[SharedArrayBuffer](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer)** enables cross-thread memory access
124//! - **Cross-origin isolation** required for SAB support
125//! - **Typed array views** provide efficient memory access from JS
126//! - **Ring polling** in main thread for completion handling
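//!
//! A typical embedding sequence, sketched with the exports defined in this file
//! (the JS glue itself is not shown; `S`/`K` stand for the corresponding `ArenaSym` values):
//!
//! ```text
//! main thread:  base = initArena(capacity)          // create and lay out the arena
//! worker:       connectArena(base); workerLoop()    // attach to the same memory
//! main thread:  id = allocCons(allocTerminal(S), allocTerminal(K)); hostSubmit(id)
//! main thread:  poll hostPull() for completions (EMPTY / u32::MAX while none are ready)
//! ```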
127
128#![allow(dead_code)]
129
130// =============================================================================
131// Public enums (shared with JS)
132// =============================================================================
133
134/// Arena node types supporting SKI evaluation and parallel execution.
135///
136/// See module-level documentation for detailed descriptions of each variant.
137#[repr(u8)]
138#[derive(Clone, Copy, PartialEq, Eq, Debug)]
139pub enum ArenaKind {
140    /// Terminal node containing an SKI combinator (S, K, or I).
141    /// - `sym`: The combinator symbol
142    /// - `left`/`right`: Unused (reserved)
143    Terminal = 1,
144
145    /// Application node representing function application `(left right)`.
146    /// - `left`: Function expression
147    /// - `right`: Argument expression
148    /// - `sym`: Unused (reserved)
149    NonTerm = 2,
150
151    /// Stack frame for iterative reduction algorithm.
152    /// Used to avoid recursion stack overflow on deep expressions.
153    /// - `sym`: Reduction stage (0=left child, 1=right child)
154    /// - `left`: Parent stack frame
155    /// - `right`: Parent expression node
156    Continuation = 3,
157
158    /// Paused evaluation state for preemptive multitasking.
159    /// Captures complete evaluation context for resumption.
160    /// - `sym`: Evaluation mode (0=descend, 1=return)
161    /// - `left`: Current expression being evaluated
162    /// - `right`: Evaluation stack
163    /// - `hash`: Remaining reduction steps
164    Suspension = 4,
165}
166
167/// SKI combinator symbols and evaluation state markers.
168///
169/// Used in both Terminal nodes (for combinators) and Continuation/Suspension
170/// nodes (for evaluation state).
171#[repr(u8)]
172#[derive(Clone, Copy, PartialEq, Eq, Debug)]
173pub enum ArenaSym {
174    /// S combinator: `S x y z → x z (y z)`
175    /// The most complex combinator, enabling arbitrary computation.
176    S = 1,
177
178    /// K combinator: `K x y → x`
179    /// The constant function, discards its second argument.
180    K = 2,
181
182    /// I combinator: `I x → x`
183    /// The identity function, returns its argument unchanged.
184    I = 3,
185
186    /// readOne terminal: consumes one byte from stdin, passes Church numeral to continuation.
187    ReadOne = 4,
188
189    /// writeOne terminal: writes one byte to stdout, returns its argument.
190    WriteOne = 5,
191
192    // Internal-only primitives for Church decoding (not surfaced in the language).
193    Inc = 6,
194    Num = 7,
195}
196
197// =============================================================================
198// Non-WASM stubs (keep TS type-checking happy)
199// =============================================================================
200#[cfg(not(target_arch = "wasm32"))]
201#[no_mangle]
202pub extern "C" fn initArena(_cap: u32) -> u32 { 0 }
203#[cfg(not(target_arch = "wasm32"))]
204#[no_mangle]
205pub extern "C" fn connectArena(_p: u32) -> u32 { 0 }
206#[cfg(not(target_arch = "wasm32"))]
207#[no_mangle]
208pub extern "C" fn allocTerminal(_s: u32) -> u32 { 0 }
209#[cfg(not(target_arch = "wasm32"))]
210#[no_mangle]
211pub extern "C" fn allocCons(_l: u32, _r: u32) -> u32 { 0 }
212#[cfg(not(target_arch = "wasm32"))]
213#[no_mangle]
214pub extern "C" fn kindOf(_n: u32) -> u32 { 0 }
215#[cfg(not(target_arch = "wasm32"))]
216#[no_mangle]
217pub extern "C" fn symOf(_n: u32) -> u32 { 0 }
218#[cfg(not(target_arch = "wasm32"))]
219#[no_mangle]
220pub extern "C" fn leftOf(_n: u32) -> u32 { 0 }
221#[cfg(not(target_arch = "wasm32"))]
222#[no_mangle]
223pub extern "C" fn rightOf(_n: u32) -> u32 { 0 }
224#[cfg(not(target_arch = "wasm32"))]
225#[no_mangle]
226pub extern "C" fn reset() {}
227#[cfg(not(target_arch = "wasm32"))]
228#[no_mangle]
229pub extern "C" fn arenaKernelStep(x: u32) -> u32 { x }
230#[cfg(not(target_arch = "wasm32"))]
231#[no_mangle]
232pub extern "C" fn hostSubmit(_id: u32) -> u32 { 1 }
233#[cfg(not(target_arch = "wasm32"))]
234#[no_mangle]
235pub extern "C" fn hostPull() -> u32 { u32::MAX }
236#[cfg(not(target_arch = "wasm32"))]
237#[no_mangle]
238pub extern "C" fn workerLoop() {}
239#[cfg(not(target_arch = "wasm32"))]
240#[no_mangle]
241pub extern "C" fn debugGetArenaBaseAddr() -> u32 { 0 }
242#[cfg(not(target_arch = "wasm32"))]
243#[no_mangle]
244pub extern "C" fn getArenaMode() -> u32 { 0 }
245#[cfg(not(target_arch = "wasm32"))]
246#[no_mangle]
247pub extern "C" fn debugCalculateArenaSize(_c: u32) -> u32 { 0 }
248#[cfg(not(target_arch = "wasm32"))]
249#[no_mangle]
250pub extern "C" fn debugLockState() -> u32 { 0xffff_ffff }
251
252// =============================================================================
253// WASM implementation
254// =============================================================================
255#[cfg(target_arch = "wasm32")]
256mod wasm {
257    use core::arch::wasm32;
258    use core::cell::UnsafeCell;
259    use core::sync::atomic::{AtomicU32, AtomicU8, Ordering};
260    use crate::{ArenaKind, ArenaSym};
261
262    // -------------------------------------------------------------------------
263    // Constants / helpers
264    // -------------------------------------------------------------------------
265    pub const EMPTY: u32 = 0xffff_ffff;
266    const ARENA_MAGIC: u32 = 0x534B_4941; // "SKIA"
267    const INITIAL_CAP: u32 = 1 << 20;
268    const MAX_CAP: u32 = 1 << 27;
269    const WASM_PAGE_SIZE: usize = 65536;
270    // Size of SQ/CQ rings (power of two).
271    //
272    // Larger rings reduce backpressure + atomic wait/notify churn when many workers
273    // produce results faster than the host can drain them (a common cause of "stuttering"
274    // worker timelines and main-thread saturation in profiles).
275    const RING_ENTRIES: u32 = 1 << 16; // 65536
276    const TERM_CACHE_LEN: usize = 6; // 0..=WriteOne
277
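    // For example: align64(1) == 64, align64(64) == 64, align64(65) == 128.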
278    #[inline(always)]
279    const fn align64(x: u32) -> u32 {
280        (x + 63) & !63
281    }
282
283    // -------------------------------------------------------------------------
284    // Atomics + wait/notify
285    // -------------------------------------------------------------------------
286    // WASM atomics / CAS safety notes (applies to all compare_exchange* uses below)
287    // -------------------------------------------------------------------------
288    //
289    // This module relies heavily on CAS (compare_exchange / compare_exchange_weak),
290    // which compiles to a single atomic read-modify-write instruction in WebAssembly
291    // (e.g. `i32.atomic.rmw.cmpxchg`). That property prevents the classic "check-then-
292    // store" race where two threads both observe a value and both write the same
293    // update thinking they won; CAS has a single winner and forces losers to retry.
294    //
295    // ABA notes:
296    // - Many CAS patterns are vulnerable to ABA when the *same* value can reappear
297    //   (e.g. counters wrapping or pointer reuse). In this codebase, ABA is either
298    //   explicitly prevented (ring slot sequence numbers advance by whole cycles)
299    //   or is practically irrelevant in context (e.g. seqlock-style counters would
300    //   require billions of expensive operations to wrap during one read section).
301    //
302    // WASM platform particularities:
303    // - When wasm threads are enabled, the linear memory is backed by a
304    //   `SharedArrayBuffer`, and atomic ops synchronize across Web Workers.
305    //   Rust `Ordering::{Acquire,Release,AcqRel,SeqCst}` map onto WASM atomics/fences
306    //   to provide the needed visibility guarantees.
307    // - `memory.grow` increases linear memory size but does not "move" addresses
308    //   from the module's perspective; what changes is our chosen layout within
309    //   that address space (offsets/capacity), guarded by atomics/seqlock.
310    //
311    // External references:
312    // - Seqlock concept: https://en.wikipedia.org/wiki/Seqlock
313    // - SharedArrayBuffer (required for wasm threads): https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer
314    // - WebAssembly features (threads/atomics): https://webassembly.org/features/
315    // - Rust atomic memory orderings: https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html
316    // - WebAssembly `memory.grow`: https://developer.mozilla.org/en-US/docs/WebAssembly/Reference/Memory/Grow
317    //
318    mod sys {
319        use super::*;
320        #[inline(always)]
321        pub fn wait32(ptr: &AtomicU32, expected: u32) {
322            unsafe {
323                let _ = wasm32::memory_atomic_wait32(
324                    ptr as *const _ as *mut i32,
325                    expected as i32,
326                    -1,
327                );
328            }
329        }
330        #[inline(always)]
331        pub fn notify(ptr: &AtomicU32, count: u32) {
332            unsafe {
333                let _ = wasm32::memory_atomic_notify(ptr as *const _ as *mut i32, count);
334            }
335        }
336    }
337
338    // -------------------------------------------------------------------------
339    // Ring Buffer Types (io_uring Style)
340    // -------------------------------------------------------------------------
341
342    /// Submission Queue Entry: Main thread → Worker communication.
343    ///
344    /// Sent from main thread to worker to request evaluation of an expression.
345    /// Workers dequeue these and perform the actual reduction work.
346    #[repr(C)]
347    #[derive(Clone, Copy)]
348    pub struct Sqe {
349        /// Arena node ID of the expression to evaluate.
350        /// This is the root of the expression tree to reduce.
351        pub node_id: u32,
352
353        /// Unique request identifier for correlation.
354        /// Used to match completion queue entries back to the original request.
355        /// Must be unique across all outstanding requests.
356        pub req_id: u32,
357
358        /// Max reduction steps for this specific request.
359        /// Replaces previous _pad field. Each request carries its own immutable limit.
360        pub max_steps: u32,
361    }
362
363    /// Completion Queue Entry: Worker → Main thread results.
364    ///
365    /// Workers enqueue these when they complete (or yield) evaluation work.
366    /// The main thread polls the completion queue to retrieve results.
367    #[repr(C)]
368    #[derive(Clone, Copy)]
369    pub struct Cqe {
370        /// Result node ID or Suspension node ID (for yields).
371        /// - If reduction completed: The fully reduced expression node
372        /// - If evaluation yielded: A Suspension node for resumption
373        pub node_id: u32,
374
375        /// Request identifier matching the original Sqe.req_id.
376        /// Used by main thread to correlate completions with pending requests.
377        pub req_id: u32,
378
379        /// Padding to align Slot<Cqe> to 16 bytes (power of 2) for efficient indexing.
380        pub _pad: u32,
381    }
382
383    /// Ring buffer slot with sequence number for ABA prevention.
384    ///
385    /// Each slot contains a sequence number and payload. The sequence number
386    /// prevents ABA problems in concurrent CAS operations by ensuring that
387    /// a slot can only be reused after a full cycle of the ring.
388    ///
389    /// ## Sequence Number Protocol
390    ///
391    /// - **Initial state**: `seq = slot_index`
392    /// - **Producer**: waits until `seq == tail`, claims the slot by advancing `tail`, writes the payload, then publishes `seq = tail + 1`
393    /// - **Consumer**: waits until `seq == head + 1`, claims the slot by advancing `head`, reads the payload, then releases it with `seq = head + mask + 1`
394    ///
395    /// This ensures proper ordering and prevents race conditions.
396    #[repr(C)]
397    struct Slot<T> {
398        /// Sequence number for synchronization and ABA prevention.
399        /// Must be atomically updated to maintain memory ordering.
400        seq: AtomicU32,
401
402        /// The actual data payload stored in this slot.
403        /// Access must be synchronized with sequence number updates.
404        payload: UnsafeCell<T>,
405    }
406
407    // SAFETY: Ring<T> ensures proper synchronization of Slot<T> access.
408    // The UnsafeCell is only accessed after proper sequence number validation.
409    unsafe impl<T> Sync for Slot<T> {}
410
411    /// Lock-free ring buffer for inter-thread communication.
412    ///
413    /// Uses [io_uring](https://en.wikipedia.org/wiki/Io_uring)-style producer/consumer pattern with atomic operations.
414    /// Supports both non-blocking (try_*) and blocking (*_blocking) operations.
415    ///
416    /// ## Design Principles
417    ///
418    /// - **Multi-producer, multi-consumer** capable: per-slot sequence numbers arbitrate concurrent claims
419    /// - **Power-of-two sizing** for efficient masking operations
420    /// - **Cache-line alignment** (64-byte) to prevent false sharing
421    /// - **Atomic wait/notify** for efficient blocking operations
422    /// - **Sequence numbers** prevent ABA problems in concurrent access
423    ///
424    /// ## Memory Layout
425    ///
426    /// ```text
427    /// +----------------------+ <-- 64-byte aligned
428    /// | head: AtomicU32      |     Consumer position
429    /// | not_full: AtomicU32  |     Wait/notify for producers
430    /// | _pad1: [u8; 56]      |     Cache line padding
431    /// +----------------------+ <-- 64-byte aligned
432    /// | tail: AtomicU32      |     Producer position
433    /// | not_empty: AtomicU32 |     Wait/notify for consumers
434    /// | _pad2: [u8; 56]      |     Cache line padding
435    /// +----------------------+
436    /// | mask: u32            |     entries - 1 (for fast modulo)
437    /// | entries: u32         |     Ring capacity (power of 2)
438    /// +----------------------+
439    /// | slots[entries]       |     Array of Slot<T> entries
440    /// +----------------------+
441    /// ```
442    ///
443    /// ## Thread Safety
444    ///
445    /// - **Producer calls**: `try_enqueue()`, `enqueue_blocking()`
446    /// - **Consumer calls**: `try_dequeue()`, `dequeue_blocking()`
447    /// - **No internal locking**: Uses atomic CAS operations only
448    /// - **Lock-free progress**: a failed CAS means another thread succeeded; `*_blocking` calls park on atomic wait/notify
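    ///
    /// ## Example (sketch)
    ///
    /// Assuming a ring already initialized in shared memory via `init_at`
    /// (`handle` is a placeholder for caller code):
    ///
    /// ```ignore
    /// // producer side, e.g. the main thread filling the submission queue
    /// if !ring.try_enqueue(item) {
    ///     ring.enqueue_blocking(item); // parks on atomic wait until space frees up
    /// }
    ///
    /// // consumer side, e.g. a worker draining the submission queue
    /// while let Some(item) = ring.try_dequeue() {
    ///     handle(item);
    /// }
    /// ```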
449    #[repr(C, align(64))]
450    pub struct Ring<T> {
451        /// Consumer position (head of queue).
452        /// Only modified by consumer thread via CAS.
453        head: AtomicU32,
454
455        /// Producer wait/notify synchronization.
456        /// Used by producers to wait when ring is full.
457        not_full: AtomicU32,
458
459        /// Cache line padding to prevent false sharing with tail.
460        _pad1: [u8; 56],
461
462        /// Producer position (tail of queue).
463        /// Only modified by producer thread via CAS.
464        tail: AtomicU32,
465
466        /// Consumer wait/notify synchronization.
467        /// Used by consumers to wait when ring is empty.
468        not_empty: AtomicU32,
469
470        /// Cache line padding to prevent false sharing with head.
471        _pad2: [u8; 56],
472
473        /// Bitmask for fast modulo: `index & mask` ≡ `index % entries`
474        mask: u32,
475
476        /// Ring capacity (must be power of 2).
477        entries: u32,
478
479        /// Zero-sized type marker for generic parameter.
480        _marker: core::marker::PhantomData<T>,
481    }
482
483    impl<T: Copy> Ring<T> {
484        /// Get pointer to the slots array following the Ring header.
485        #[inline(always)]
486        fn slots_ptr(&self) -> *const Slot<T> {
487            unsafe { (self as *const Ring<T>).add(1) as *const Slot<T> }
488        }
489
490        /// Get reference to slot at the given index (masked for wraparound).
491        #[inline(always)]
492        unsafe fn slot_at(&self, i: u32) -> &Slot<T> {
493            &*self.slots_ptr().add((i & self.mask) as usize)
494        }
495
496        /// Initialize a ring buffer at the given memory location.
497        ///
498        /// # Safety
499        ///
500        /// - `ptr` must point to sufficient memory for the ring and all slots
501        /// - `entries_pow2` must be a power of 2
502        /// - The ring must not be accessed concurrently during initialization
503        #[inline(always)]
504        pub unsafe fn init_at(ptr: *mut u8, entries_pow2: u32) -> &'static Self {
505            let ring = &mut *(ptr as *mut Ring<T>);
506            // Initialize producer/consumer positions
507            ring.head.store(0, Ordering::Relaxed);
508            ring.tail.store(0, Ordering::Relaxed);
509            // Initialize wait/notify counters
510            ring.not_empty.store(0, Ordering::Relaxed);
511            ring.not_full.store(0, Ordering::Relaxed);
512            // Set ring parameters
513            ring.entries = entries_pow2;
514            ring.mask = entries_pow2 - 1;
515            // Initialize slot sequence numbers to their indices
516            for i in 0..entries_pow2 {
517                ring.slot_at(i).seq.store(i, Ordering::Relaxed);
518            }
519            ring
520        }
521
522        /// Attempt to enqueue an item without blocking.
523        ///
524        /// Returns `true` if the item was successfully enqueued, `false` if the ring is full.
525        ///
526        /// ## Algorithm
527        ///
528        /// 1. Load current tail position
529        /// 2. Check if the corresponding slot is available (`seq == tail`)
530        /// 3. Attempt to claim the slot by CAS'ing tail forward
531        /// 4. If successful: write payload, update sequence, notify consumers
532        /// 5. If slot unavailable: check if ring is full or retry
533        ///
534        /// ## Memory Ordering
535        ///
536        /// - `Acquire` load of sequence number ensures payload visibility
537        /// - `Release` store of sequence number publishes payload to consumers
538        /// - `Release` notify ensures consumer sees all prior writes
539        ///
540        /// See [memory barriers](https://en.wikipedia.org/wiki/Memory_barrier) for details.
541        #[inline(always)]
542        pub fn try_enqueue(&self, item: T) -> bool {
543            unsafe {
544                loop {
545                    // Load producer position (relaxed: no ordering requirements)
546                    let t = self.tail.load(Ordering::Relaxed);
547                    let slot = self.slot_at(t);
548
549                    // Check if slot is available for us (Acquire: see previous publications)
550                    let s = slot.seq.load(Ordering::Acquire);
551                    let diff = s.wrapping_sub(t);
552
553                    if diff == 0 {
554                        // Slot is free. Try to claim it by advancing tail.
555                        // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
556                        if self
557                            .tail
558                            .compare_exchange_weak(
559                                t,
560                                t.wrapping_add(1),
561                                Ordering::Relaxed, // Success: no special ordering
562                                Ordering::Relaxed, // Failure: no special ordering
563                            )
564                            .is_ok()
565                        {
566                            // Successfully claimed slot. Write payload and publish.
567                            *slot.payload.get() = item;
568                            slot.seq.store(t.wrapping_add(1), Ordering::Release);
569                            // Notify waiting consumers (Release: publish all prior writes)
570                            self.not_empty.fetch_add(1, Ordering::Release);
571                            sys::notify(&self.not_empty, 1);
572                            return true;
573                        }
574                        // CAS failed: another producer claimed it, retry
575                    } else if (diff as i32) < 0 {
576                        // Ring is full: slot sequence is too far ahead
577                        return false;
578                    }
579                    // Slot not ready yet, retry (another iteration in progress)
580                }
581            }
582        }
583
584        /// Attempt to dequeue an item without blocking.
585        ///
586        /// Returns `Some(item)` if an item was successfully dequeued, `None` if the ring is empty.
587        ///
588        /// ## Algorithm
589        ///
590        /// 1. Load current head position
591        /// 2. Check if the corresponding slot has data (`seq == head + 1`)
592        /// 3. Attempt to claim the slot by CAS'ing head forward
593        /// 4. If successful: read payload, update sequence for reuse, notify producers
594        /// 5. If slot unavailable: check if ring is empty or retry
595        ///
596        /// ## Sequence Number Reuse
597        ///
598        /// After consuming, sequence is set to `head + mask + 1`. Since `mask = entries - 1`,
599        /// this ensures the slot won't be reused until after a full ring cycle, preventing ABA.
600        #[inline(always)]
601        pub fn try_dequeue(&self) -> Option<T> {
602            unsafe {
603                loop {
604                    // Load consumer position (relaxed: no ordering requirements)
605                    let h = self.head.load(Ordering::Relaxed);
606                    let slot = self.slot_at(h);
607
608                    // Check if slot has data for us (Acquire: see producer's publication)
609                    let s = slot.seq.load(Ordering::Acquire);
610                    let diff = s.wrapping_sub(h.wrapping_add(1));
611
612                    if diff == 0 {
613                        // Slot has data. Try to claim it by advancing head.
614                        // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
615                        if self
616                            .head
617                            .compare_exchange_weak(
618                                h,
619                                h.wrapping_add(1),
620                                Ordering::Relaxed, // Success: no special ordering
621                                Ordering::Relaxed, // Failure: no special ordering
622                            )
623                            .is_ok()
624                        {
625                            // Successfully claimed slot. Read payload and release for reuse.
626                            let item = *slot.payload.get();
627                            // Set sequence for next cycle: h + mask + 1 prevents ABA issues
628                            slot.seq
629                                .store(h.wrapping_add(self.mask).wrapping_add(1), Ordering::Release);
630                            // Notify waiting producers (Release: publish slot availability)
631                            self.not_full.fetch_add(1, Ordering::Release);
632                            sys::notify(&self.not_full, 1);
633                            return Some(item);
634                        }
635                        // CAS failed: another consumer claimed it, retry
636                    } else if (diff as i32) < 0 {
637                        // Ring is empty: no data available
638                        return None;
639                    }
640                    // Slot not ready yet, retry (another operation in progress)
641                }
642            }
643        }
644
645        /// Enqueue an item, blocking until space is available.
646        ///
647        /// Uses WASM atomic wait/notify for efficient blocking when the ring is full.
648        /// The wait is interruptible and will retry the enqueue operation.
649        #[inline(always)]
650        pub fn enqueue_blocking(&self, item: T) {
651            while !self.try_enqueue(item) {
652                // Load current state and wait for notification
653                let v = self.not_full.load(Ordering::Acquire);
654                // Re-check after capturing the counter (closes the missed-notify window)
655                if self.try_enqueue(item) {
656                    return;
657                }
658                // Wait for a consumer to notify us that space has been freed
659                sys::wait32(&self.not_full, v);
660            }
661        }
662
663        /// Dequeue an item, blocking until data is available.
664        ///
665        /// Uses WASM atomic wait/notify for efficient blocking when the ring is empty.
666        /// The wait is interruptible and will retry the dequeue operation.
667        #[inline(always)]
668        pub fn dequeue_blocking(&self) -> T {
669            loop {
670                if let Some(x) = self.try_dequeue() {
671                    return x;
672                }
673                // Load current state and wait for notification
674                let v = self.not_empty.load(Ordering::Acquire);
675                // Re-check after capturing the counter (closes the missed-notify window)
676                if let Some(x) = self.try_dequeue() {
677                    return x;
678                }
679                // Wait for a producer to notify us that data is available
680                sys::wait32(&self.not_empty, v);
681            }
682        }
683    }
684
685    #[inline(always)]
686    const fn ring_bytes<T>(entries: u32) -> u32 {
687        let header = core::mem::size_of::<Ring<T>>() as u32;
688        let slot = core::mem::size_of::<Slot<T>>() as u32;
689        align64(header + entries * slot)
690    }
691
692    // -------------------------------------------------------------------------
693    // Header layout (fixed offsets)
694    // -------------------------------------------------------------------------
695    struct SabLayout {
696        offset_sq: u32,
697        offset_cq: u32,
698        offset_stdin: u32,
699        offset_stdout: u32,
700        offset_stdin_wait: u32,
701        offset_kind: u32,
702        offset_sym: u32,
703        offset_left_id: u32,
704        offset_right_id: u32,
705        offset_hash32: u32,
706        offset_next_idx: u32,
707        offset_buckets: u32,
708        offset_term_cache: u32,
709        total_size: u32,
710    }
711
712    #[repr(C, align(64))]
713    struct SabHeader {
714        magic: u32,
715        ring_entries: u32,
716        ring_mask: u32,
717        offset_sq: u32,
718        offset_cq: u32,
719        offset_stdin: u32,
720        offset_stdout: u32,
721        offset_stdin_wait: u32,
722        offset_kind: u32,
723        offset_sym: u32,
724        offset_left_id: u32,
725        offset_right_id: u32,
726        offset_hash32: u32,
727        offset_next_idx: u32,
728        offset_buckets: u32,
729        offset_term_cache: u32,
730        capacity: u32,
731        bucket_mask: u32,
732        resize_seq: AtomicU32,
733        top: AtomicU32,
734    }
735
736    impl SabHeader {
737        fn layout(capacity: u32) -> SabLayout {
738            let header_size = core::mem::size_of::<SabHeader>() as u32;
739            let offset_sq = align64(header_size);
740            let offset_cq = align64(offset_sq + ring_bytes::<Sqe>(RING_ENTRIES));
741            let offset_stdin = align64(offset_cq + ring_bytes::<Cqe>(RING_ENTRIES));
742            let offset_stdout = align64(offset_stdin + ring_bytes::<u8>(RING_ENTRIES));
743            let offset_stdin_wait = align64(offset_stdout + ring_bytes::<u8>(RING_ENTRIES));
744
745            let offset_kind = align64(offset_stdin_wait + ring_bytes::<u32>(RING_ENTRIES));
746            let offset_sym = offset_kind + capacity;
747            let offset_left_id = align64(offset_sym + capacity);
748            let offset_right_id = offset_left_id + 4 * capacity;
749            let offset_hash32 = offset_right_id + 4 * capacity;
750            let offset_next_idx = offset_hash32 + 4 * capacity;
751            let offset_buckets = align64(offset_next_idx + 4 * capacity);
752            let offset_term_cache = offset_buckets + 4 * capacity;
753            let total_size = offset_term_cache + (TERM_CACHE_LEN as u32) * 4;
754            SabLayout {
755                offset_sq,
756                offset_cq,
757                offset_stdin,
758                offset_stdout,
759                offset_stdin_wait,
760                offset_kind,
761                offset_sym,
762                offset_left_id,
763                offset_right_id,
764                offset_hash32,
765                offset_next_idx,
766                offset_buckets,
767                offset_term_cache,
768                total_size,
769            }
770        }
771    }
772
773    // -------------------------------------------------------------------------
774    // Globals
775    // -------------------------------------------------------------------------
776    static mut ARENA_BASE_ADDR: u32 = 0;
777    static mut ARENA_MODE: u32 = 0;
778
779    #[inline(always)]
780    unsafe fn ensure_arena() {
781        if ARENA_BASE_ADDR != 0 {
782            return;
783        }
784        // If we were supposed to be in SAB mode but have no base, that's fatal.
785        if ARENA_MODE == 1 {
786            wasm32::unreachable();
787        }
788        let ptr = allocate_raw_arena(INITIAL_CAP);
789        if ptr.is_null() {
790            wasm32::unreachable();
791        }
792        // Heap / single-instance mode
793        ARENA_MODE = 0;
794    }
795
796    // -------------------------------------------------------------------------
797    // Helpers to locate structures
798    // -------------------------------------------------------------------------
799    #[inline(always)]
800    unsafe fn header() -> &'static SabHeader {
801        &*(ARENA_BASE_ADDR as *const SabHeader)
802    }
803
804    #[inline(always)]
805    unsafe fn header_mut() -> &'static mut SabHeader {
806        &mut *(ARENA_BASE_ADDR as *mut SabHeader)
807    }
808
809    #[inline(always)]
810    unsafe fn sq_ring() -> &'static Ring<Sqe> {
811        let h = header();
812        &*((ARENA_BASE_ADDR + h.offset_sq) as *const Ring<Sqe>)
813    }
814
815    #[inline(always)]
816    unsafe fn cq_ring() -> &'static Ring<Cqe> {
817        let h = header();
818        &*((ARENA_BASE_ADDR + h.offset_cq) as *const Ring<Cqe>)
819    }
820
821    #[inline(always)]
822    unsafe fn stdin_ring() -> &'static Ring<u8> {
823        let h = header();
824        &*((ARENA_BASE_ADDR + h.offset_stdin) as *const Ring<u8>)
825    }
826
827    #[inline(always)]
828    unsafe fn stdout_ring() -> &'static Ring<u8> {
829        let h = header();
830        &*((ARENA_BASE_ADDR + h.offset_stdout) as *const Ring<u8>)
831    }
832
833    #[inline(always)]
834    unsafe fn stdin_wait_ring() -> &'static Ring<u32> {
835        let h = header();
836        &*((ARENA_BASE_ADDR + h.offset_stdin_wait) as *const Ring<u32>)
837    }
838
839    // Array helpers
840    #[inline(always)]
841    unsafe fn kind_ptr() -> *mut AtomicU8 {
842        (ARENA_BASE_ADDR + header().offset_kind) as *mut AtomicU8
843    }
844    #[inline(always)]
845    unsafe fn sym_ptr() -> *mut AtomicU8 {
846        (ARENA_BASE_ADDR + header().offset_sym) as *mut AtomicU8
847    }
848    #[inline(always)]
849    unsafe fn left_ptr() -> *mut AtomicU32 {
850        (ARENA_BASE_ADDR + header().offset_left_id) as *mut AtomicU32
851    }
852    #[inline(always)]
853    unsafe fn right_ptr() -> *mut AtomicU32 {
854        (ARENA_BASE_ADDR + header().offset_right_id) as *mut AtomicU32
855    }
856    #[inline(always)]
857    unsafe fn hash_ptr() -> *mut AtomicU32 {
858        (ARENA_BASE_ADDR + header().offset_hash32) as *mut AtomicU32
859    }
860    #[inline(always)]
861    unsafe fn next_ptr() -> *mut AtomicU32 {
862        (ARENA_BASE_ADDR + header().offset_next_idx) as *mut AtomicU32
863    }
864    #[inline(always)]
865    unsafe fn buckets_ptr() -> *mut AtomicU32 {
866        (ARENA_BASE_ADDR + header().offset_buckets) as *mut AtomicU32
867    }
868    #[inline(always)]
869    unsafe fn term_cache_ptr() -> *mut AtomicU32 {
870        (ARENA_BASE_ADDR + header().offset_term_cache) as *mut AtomicU32
871    }
872
873    // -------------------------------------------------------------------------
874    // Hashing helpers
875    // -------------------------------------------------------------------------
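    // Note: the multiply/xorshift constants below match widely circulated 32-bit
    // mixer ("hash prospector" style) finalizers, and GOLD is the 32-bit golden-ratio
    // constant. The goal is avalanche: flipping one input bit should flip roughly
    // half of the output bits, which keeps bucket selection well distributed.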
876    fn avalanche32(mut x: u32) -> u32 {
877        x ^= x >> 16;
878        x = x.wrapping_mul(0x7feb_352d);
879        x ^= x >> 15;
880        x = x.wrapping_mul(0x846c_a68b);
881        x ^= x >> 16;
882        x
883    }
884    const GOLD: u32 = 0x9e37_79b9;
885    fn mix(a: u32, b: u32) -> u32 {
886        avalanche32(a ^ b.wrapping_mul(GOLD))
887    }
888
889    // If a resize fails (OOM / max cap), poison the seqlock so other threads trap
890    // instead of spinning forever on an odd seq.
891    const POISON_SEQ: u32 = 0xffff_ffff;
892
893    // -------------------------------------------------------------------------
894    // Resize guard (seqlock-style)
895    // -------------------------------------------------------------------------
896    // See "WASM atomics / CAS safety notes" above for CAS/ABA discussion.
897    // This specific guard is seqlock-style: even=stable, odd=resize in progress.
898    // For READING existing data. Returns (seq, h) to enable read-verify-retry pattern:
899    // 1. Call enter_stable() to get sequence number
900    // 2. Read the data you need
901    // 3. Call check_stable(seq) to verify sequence didn't change
902    // 4. If changed, retry (resize occurred during read)
903    // Used in: kindOf(), symOf(), leftOf(), rightOf(), hash lookups
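    // Sketch of the resulting read loop (this is exactly what the accessors below do):
    //
    //     loop {
    //         let (seq, h) = enter_stable();
    //         let val = /* read from the node arrays */;
    //         if check_stable(seq) { break val; }
    //         // otherwise a resize raced with the read; retry
    //     }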
904    #[inline(always)]
905    fn enter_stable() -> (u32, &'static SabHeader) {
906        unsafe {
907            let h = header();
908            loop {
909                let seq = h.resize_seq.load(Ordering::Acquire);
910                if seq == POISON_SEQ {
911                    core::arch::wasm32::unreachable();
912                }
913                if seq & 1 == 1 {
914                    core::hint::spin_loop();
915                    continue;
916                }
917                return (seq, h);
918            }
919        }
920    }
921
922    // For WRITING new data. See comment above enter_stable() for distinction.
923    // Simply waits until resize completes (sequence is even).
924    // No verification needed since we're not reading existing data.
925    // Used in: allocTerminal(), allocCons() allocation path, alloc_generic()
926    #[inline(always)]
927    fn wait_resize_stable() {
928        unsafe {
929            let h = header();
930            loop {
931                let seq = h.resize_seq.load(Ordering::Acquire);
932                if seq == POISON_SEQ {
933                    core::arch::wasm32::unreachable();
934                }
935                if (seq & 1) == 0 {
936                    return;
937                }
938                core::hint::spin_loop();
939            }
940        }
941    }
942
943    #[inline(always)]
944    fn check_stable(seq: u32) -> bool {
945        unsafe { header().resize_seq.load(Ordering::Acquire) == seq }
946    }
947
948    // -------------------------------------------------------------------------
949    // Arena init / connect
950    // -------------------------------------------------------------------------
951    unsafe fn zero_region(start: u32, len: u32) {
952        core::ptr::write_bytes((ARENA_BASE_ADDR + start) as *mut u8, 0, len as usize);
953    }
954
955    unsafe fn init_header(capacity: u32) {
956        let layout = SabHeader::layout(capacity);
957        let h = &mut *(ARENA_BASE_ADDR as *mut SabHeader);
958        h.magic = ARENA_MAGIC;
959        h.ring_entries = RING_ENTRIES;
960        h.ring_mask = RING_ENTRIES - 1;
961        h.offset_sq = layout.offset_sq;
962        h.offset_cq = layout.offset_cq;
963        h.offset_stdin = layout.offset_stdin;
964        h.offset_stdout = layout.offset_stdout;
965        h.offset_stdin_wait = layout.offset_stdin_wait;
966        h.offset_kind = layout.offset_kind;
967        h.offset_sym = layout.offset_sym;
968        h.offset_left_id = layout.offset_left_id;
969        h.offset_right_id = layout.offset_right_id;
970        h.offset_hash32 = layout.offset_hash32;
971        h.offset_next_idx = layout.offset_next_idx;
972        h.offset_buckets = layout.offset_buckets;
973        h.offset_term_cache = layout.offset_term_cache;
974        h.capacity = capacity;
975        h.bucket_mask = capacity - 1;
976        h.resize_seq.store(0, Ordering::Relaxed);
977        h.top.store(0, Ordering::Relaxed);
978
979        zero_region(
980            (core::mem::size_of::<SabHeader>()) as u32,
981            layout.total_size - core::mem::size_of::<SabHeader>() as u32,
982        );
983
984        Ring::<Sqe>::init_at((ARENA_BASE_ADDR + h.offset_sq) as *mut u8, RING_ENTRIES);
985        Ring::<Cqe>::init_at((ARENA_BASE_ADDR + h.offset_cq) as *mut u8, RING_ENTRIES);
986        Ring::<u8>::init_at((ARENA_BASE_ADDR + h.offset_stdin) as *mut u8, RING_ENTRIES);
987        Ring::<u8>::init_at((ARENA_BASE_ADDR + h.offset_stdout) as *mut u8, RING_ENTRIES);
988        Ring::<u32>::init_at((ARENA_BASE_ADDR + h.offset_stdin_wait) as *mut u8, RING_ENTRIES);
989
990        // Buckets + cache init
991        let buckets = buckets_ptr();
992        for i in 0..capacity as usize {
993            buckets.add(i).write(AtomicU32::new(EMPTY));
994        }
995        let cache = term_cache_ptr();
996        for i in 0..TERM_CACHE_LEN {
997            cache.add(i).write(AtomicU32::new(EMPTY));
998        }
999    }
1000
1001    unsafe fn allocate_raw_arena(capacity: u32) -> *mut SabHeader {
1002        let layout = SabHeader::layout(capacity);
1003        let pages_needed = (layout.total_size as usize + WASM_PAGE_SIZE - 1) / WASM_PAGE_SIZE;
1004        let old_pages = wasm32::memory_grow(0, pages_needed);
1005        if old_pages == usize::MAX {
1006            return core::ptr::null_mut();
1007        }
1008        let base_addr = (old_pages * WASM_PAGE_SIZE) as u32;
1009        ARENA_BASE_ADDR = base_addr;
1010        init_header(capacity);
1011        base_addr as *mut SabHeader
1012    }
1013
1014    // -------------------------------------------------------------------------
1015    // Exports: init/connect/reset
1016    // -------------------------------------------------------------------------
1017    #[no_mangle]
1018    pub extern "C" fn initArena(initial_capacity: u32) -> u32 {
1019        if initial_capacity < 1024 || initial_capacity > MAX_CAP || !initial_capacity.is_power_of_two() {
1020            return 0;
1021        }
1022        unsafe {
1023            if ARENA_BASE_ADDR != 0 {
1024                return ARENA_BASE_ADDR;
1025            }
1026            let ptr = allocate_raw_arena(initial_capacity);
1027            if ptr.is_null() {
1028                return 1;
1029            }
1030            ARENA_MODE = 1;
1031            ARENA_BASE_ADDR
1032        }
1033    }
1034
1035    #[no_mangle]
1036    pub extern "C" fn connectArena(ptr_addr: u32) -> u32 {
1037        if ptr_addr == 0 || ptr_addr % 64 != 0 {
1038            return 0;
1039        }
1040        unsafe {
1041            ARENA_BASE_ADDR = ptr_addr;
1042            ARENA_MODE = 1;
1043            let h = header();
1044            if h.magic != ARENA_MAGIC {
1045                return 5;
1046            }
1047            1
1048        }
1049    }
1050
1051    #[no_mangle]
1052    pub extern "C" fn reset() {
1053        unsafe {
1054            ensure_arena();
1055            let h = header_mut();
1056            h.top.store(0, Ordering::Release);
1057            let buckets = buckets_ptr();
1058            for i in 0..h.capacity as usize {
1059                (*buckets.add(i)).store(EMPTY, Ordering::Release);
1060            }
1061            let cache = term_cache_ptr();
1062            for i in 0..TERM_CACHE_LEN {
1063                (*cache.add(i)).store(EMPTY, Ordering::Release);
1064            }
1065            h.resize_seq.store(h.resize_seq.load(Ordering::Relaxed) & !1, Ordering::Release);
1066        }
1067    }
1068
1069    // -------------------------------------------------------------------------
1070    // Accessors
1071    // -------------------------------------------------------------------------
1072    #[no_mangle]
1073    pub extern "C" fn kindOf(n: u32) -> u32 {
1074        unsafe {
1075            ensure_arena();
1076            loop {
1077                let (seq, h) = enter_stable();
1078                let cap = h.capacity;
1079                if n >= cap {
1080                    return 0;
1081                }
1082                let val = (*kind_ptr().add(n as usize)).load(Ordering::Acquire) as u32;
1083                core::sync::atomic::fence(Ordering::Acquire);
1084                if check_stable(seq) {
1085                    return val;
1086                }
1087            }
1088        }
1089    }
1090
1091    #[no_mangle]
1092    pub extern "C" fn symOf(n: u32) -> u32 {
1093        unsafe {
1094            ensure_arena();
1095            loop {
1096                let (seq, h) = enter_stable();
1097                if n >= h.capacity {
1098                    return 0;
1099                }
1100                let val = (*sym_ptr().add(n as usize)).load(Ordering::Acquire) as u32;
1101                core::sync::atomic::fence(Ordering::Acquire);
1102                if check_stable(seq) {
1103                    return val;
1104                }
1105            }
1106        }
1107    }
1108
1109    #[no_mangle]
1110    pub extern "C" fn leftOf(n: u32) -> u32 {
1111        unsafe {
1112            ensure_arena();
1113            loop {
1114                let (seq, h) = enter_stable();
1115                if n >= h.capacity {
1116                    return 0;
1117                }
1118                let val = (*left_ptr().add(n as usize)).load(Ordering::Acquire);
1119                core::sync::atomic::fence(Ordering::Acquire);
1120                if check_stable(seq) {
1121                    return val;
1122                }
1123            }
1124        }
1125    }
1126
1127    #[no_mangle]
1128    pub extern "C" fn rightOf(n: u32) -> u32 {
1129        unsafe {
1130            ensure_arena();
1131            loop {
1132                let (seq, h) = enter_stable();
1133                if n >= h.capacity {
1134                    return 0;
1135                }
1136                let val = (*right_ptr().add(n as usize)).load(Ordering::Acquire);
1137                core::sync::atomic::fence(Ordering::Acquire);
1138                if check_stable(seq) {
1139                    return val;
1140                }
1141            }
1142        }
1143    }
1144
1145    // -------------------------------------------------------------------------
1146    // Allocation (speculative, lock-free)
1147    // -------------------------------------------------------------------------
1148    #[no_mangle]
1149    pub extern "C" fn allocTerminal(sym: u32) -> u32 {
1150        unsafe {
1151            ensure_arena();
1152            let h = header();
1153
1154            if sym < TERM_CACHE_LEN as u32 {
1155                let cached = (*term_cache_ptr().add(sym as usize)).load(Ordering::Acquire);
1156                if cached != EMPTY {
1157                    return cached;
1158                }
1159            }
1160
1161            loop {
1162                wait_resize_stable();
1163                let id = h.top.fetch_add(1, Ordering::AcqRel);
1164                if id >= h.capacity {
1165                    grow();
1166                    continue;
1167                }
1168
1169                (*kind_ptr().add(id as usize)).store(ArenaKind::Terminal as u8, Ordering::Release);
1170                (*sym_ptr().add(id as usize)).store(sym as u8, Ordering::Release);
1171                (*hash_ptr().add(id as usize)).store(sym, Ordering::Release);
1172
1173                if sym < TERM_CACHE_LEN as u32 {
1174                    (*term_cache_ptr().add(sym as usize)).store(id, Ordering::Release);
1175                }
1176                return id;
1177            }
1178        }
1179    }
1180
1181    #[no_mangle]
1182    /// Allocate a NonTerm (application) node with hash-consing optimization.
1183    ///
1184    /// This is the core optimization that enables structural sharing in the arena.
1185    /// Instead of always creating new nodes, it checks if an identical `(l r)` pair
1186    /// already exists and returns the existing node ID if found.
1187    ///
1188    /// ## Hash-Consing Algorithm
1189    ///
1190    /// 1. **Compute hash**: `mix(hash(l), hash(r))` using avalanche hashing
1191    /// 2. **Lookup in hash table**: Check bucket for existing `(l,r)` pairs
1192    /// 3. **Return existing**: If found, return the shared node ID
1193    /// 4. **Allocate new**: If not found, create new node and add to hash table
1194    ///
1195    /// ## Memory Efficiency
1196    ///
1197    /// - **DAG representation**: Common subexpressions are shared
1198    /// - **Reduced allocations**: Avoids duplicate node creation
1199    /// - **Cache-friendly**: Hash table enables O(1) lookups
1200    ///
1201    /// ## Thread Safety
1202    ///
1203    /// - Uses seqlock to handle concurrent resizing
1204    /// - Atomic operations for hash table consistency
1205    /// - CAS-based insertion prevents race conditions
1206    pub extern "C" fn allocCons(l: u32, r: u32) -> u32 {
1207        unsafe {
1208            ensure_arena();
1209
1210            // Compute hash value (doesn't depend on header state)
1211            let hl = loop {
1212                let (seq, h) = enter_stable();
1213                if l >= h.capacity {
1214                    if !check_stable(seq) { continue; }
1215                    return EMPTY; // Invalid left node
1216                }
1217                let val = (*hash_ptr().add(l as usize)).load(Ordering::Acquire);
1218                core::sync::atomic::fence(Ordering::Acquire);
1219                if check_stable(seq) {
1220                    break val;
1221                }
1222            };
1223            let hr = loop {
1224                let (seq, h) = enter_stable();
1225                if r >= h.capacity {
1226                    if !check_stable(seq) { continue; }
1227                    return EMPTY; // Invalid right node
1228                }
1229                let val = (*hash_ptr().add(r as usize)).load(Ordering::Acquire);
1230                core::sync::atomic::fence(Ordering::Acquire);
1231                if check_stable(seq) {
1232                    break val;
1233                }
1234            };
1235            let hval = mix(hl, hr);
1236
1237            // Retry loop for stable reads
1238            let _ = loop {
1239                let (seq, h) = enter_stable(); // Wait if resizing
1240                let mask = h.bucket_mask;
1241                let bucket_idx = (hval & mask) as usize;
1242
1243                // Validate bucket index is safe before dereferencing
1244                if bucket_idx >= h.capacity as usize {
1245                    // Capacity changed mid-read? Retry.
1246                    if !check_stable(seq) { continue; }
1247                    // Should be unreachable if logic is correct, but safe fallback
1248                    continue;
1249                }
1250
1251                let buckets = buckets_ptr();
1252                let next = next_ptr();
1253
1254                let mut cur = (*buckets.add(bucket_idx)).load(Ordering::Acquire);
1255                let mut found = EMPTY;
1256
1257                while cur != EMPTY {
1258                    // Bounds check for safety
1259                    if cur >= h.capacity { break; }
1260
1261                    let k = (*kind_ptr().add(cur as usize)).load(Ordering::Acquire);
1262                    if k == ArenaKind::NonTerm as u8 {
1263                        let ch = (*hash_ptr().add(cur as usize)).load(Ordering::Acquire);
1264                        if ch == hval {
1265                            let cl = (*left_ptr().add(cur as usize)).load(Ordering::Acquire);
1266                            let cr = (*right_ptr().add(cur as usize)).load(Ordering::Acquire);
1267                            if cl == l && cr == r {
1268                                found = cur;
1269                                break;
1270                            }
1271                        }
1272                    }
1273                    cur = (*next.add(cur as usize)).load(Ordering::Acquire);
1274                }
1275
1276                // If we found it, verify the lock is still valid.
1277                // If lock changed, our read might be garbage -> Retry.
1278                if check_stable(seq) {
1279                    if found != EMPTY {
1280                        return found;
1281                    }
1282                    // Not found: fall through to the allocation path below (the bucket
1282                    // index is recomputed there because a resize may change the mask)
1283                    break bucket_idx;
1284                }
1285                // If lock changed, retry the whole lookup
1286            };
1287
1288            // allocate new
1289            loop {
1290                wait_resize_stable();
1291
1292                // Reload pointers/mask in case they changed during wait_resize_stable()
1293                let h = header();
1294                let buckets = buckets_ptr();
1295                let current_mask = h.bucket_mask;
1296                let b = (hval & current_mask) as usize;
1297
1298                let id = h.top.fetch_add(1, Ordering::AcqRel);
1299                if id >= h.capacity {
1300                    grow();
1301                    continue;
1302                }
1303
1304                (*kind_ptr().add(id as usize)).store(ArenaKind::NonTerm as u8, Ordering::Release);
1305                (*left_ptr().add(id as usize)).store(l, Ordering::Release);
1306                (*right_ptr().add(id as usize)).store(r, Ordering::Release);
1307                (*hash_ptr().add(id as usize)).store(hval, Ordering::Release);
1308
1309                // insert into bucket with CAS; if we lose, drop id as hole (kind=0)
1310                let next = next_ptr();
1311                loop {
1312                    let head = (*buckets.add(b)).load(Ordering::Acquire);
1313                    (*next.add(id as usize)).store(head, Ordering::Relaxed);
1314                    // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
1315                    if (*buckets.add(b))
1316                        .compare_exchange(head, id, Ordering::Release, Ordering::Relaxed)
1317                        .is_ok()
1318                    {
1319                        return id;
1320                    }
1321                    // someone inserted; check if it matches now
1322                    let mut cur2 = (*buckets.add(b)).load(Ordering::Acquire);
1323                    while cur2 != EMPTY {
1324                        let ck2 = (*kind_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1325                        if ck2 != ArenaKind::NonTerm as u8 {
1326                            cur2 = (*next.add(cur2 as usize)).load(Ordering::Acquire);
1327                            continue;
1328                        }
1329                        let ch2 = (*hash_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1330                        if ch2 == hval {
1331                            let cl2 = (*left_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1332                            let cr2 = (*right_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1333                            if cl2 == l && cr2 == r {
1334                                // mark hole
1335                                (*kind_ptr().add(id as usize)).store(0, Ordering::Release);
1336                                return cur2;
1337                            }
1338                        }
1339                        cur2 = (*next.add(cur2 as usize)).load(Ordering::Acquire);
1340                    }
1341                }
1342            }
1343        }
1344    }
1345
1346    // Generic allocation (non hash-consed; NOT inserted into buckets).
1347    // This is used for reducer continuations/suspensions.
1348    #[inline(always)]
1349    unsafe fn alloc_generic(kind: u8, sym: u8, left: u32, right: u32, hash: u32) -> u32 {
1350        ensure_arena();
1351        let h = header();
1352        loop {
1353            wait_resize_stable();
1354            let id = h.top.fetch_add(1, Ordering::AcqRel);
1355            if id >= h.capacity {
1356                grow();
1357                continue;
1358            }
1359            // Publish payload, then kind last.
1360            (*sym_ptr().add(id as usize)).store(sym, Ordering::Release);
1361            (*left_ptr().add(id as usize)).store(left, Ordering::Release);
1362            (*right_ptr().add(id as usize)).store(right, Ordering::Release);
1363            (*hash_ptr().add(id as usize)).store(hash, Ordering::Release);
1364            (*kind_ptr().add(id as usize)).store(kind, Ordering::Release);
1365            return id;
1366        }
1367    }
1368
1369    // -------------------------------------------------------------------------
1370    // Resize (stop-the-world via odd/even seq)
1371    // -------------------------------------------------------------------------
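    //
    // The resize sequence word doubles as a seqlock: readers snapshot it (even = stable),
    // perform their racy reads, and re-check it afterwards, while grow() flips it
    // even -> odd with a CAS before migrating and bumps it back to even when finished.
    //
    // Reader-side sketch (illustrative; `enter_stable` / `check_stable` are the same
    // helpers used by hash_of_internal and the lookup paths above, `n` is some node id):
    //
    //     let value = loop {
    //         let (seq, h) = enter_stable();                        // spins while resize_seq is odd
    //         if n >= h.capacity { break 0; }
    //         let v = (*hash_ptr().add(n as usize)).load(Ordering::Acquire);
    //         if check_stable(seq) { break v; }                     // seq unchanged => consistent read
    //     };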
1372    fn grow() {
1373        unsafe {
1374            let h = header_mut();
1375            let mut expected = h.resize_seq.load(Ordering::Acquire);
1376            loop {
1377                if expected & 1 == 1 {
1378                    core::hint::spin_loop();
1379                    expected = h.resize_seq.load(Ordering::Acquire);
1380                    continue;
1381                }
1382                // Acquire exclusive "writer" by flipping even->odd with CAS.
1383                // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
1384                if h.resize_seq.compare_exchange(expected, expected | 1, Ordering::AcqRel, Ordering::Acquire).is_ok() {
1385                    break;
1386                }
1387                expected = h.resize_seq.load(Ordering::Acquire);
1388            }
1389
1390            let old_cap = h.capacity;
1391            // Capture OLD offsets before we overwrite the header with new ones.
1392            let old_offset_kind = h.offset_kind;
1393            let old_offset_sym = h.offset_sym;
1394            let old_offset_left = h.offset_left_id;
1395            let old_offset_right = h.offset_right_id;
1396            let old_offset_hash = h.offset_hash32;
1397            let old_offset_next = h.offset_next_idx;
1398            let old_offset_term_cache = h.offset_term_cache;
1399            let old_top = h.top.load(Ordering::Acquire);
1400
1401            if old_cap >= MAX_CAP {
1402                // Poison so other threads trap instead of spinning on odd resize_seq.
1403                h.resize_seq.store(POISON_SEQ, Ordering::Release);
1404                core::arch::wasm32::unreachable();
1405            }
1406            let new_cap = (old_cap * 2).min(MAX_CAP);
1407
1408            let layout = SabHeader::layout(new_cap);
1409            let needed_bytes = ARENA_BASE_ADDR as usize + layout.total_size as usize;
1410            let current_bytes = wasm32::memory_size(0) * WASM_PAGE_SIZE;
1411            if needed_bytes > current_bytes {
1412                let extra = needed_bytes - current_bytes;
1413                let pages = (extra + WASM_PAGE_SIZE - 1) / WASM_PAGE_SIZE;
1414                let res = wasm32::memory_grow(0, pages);
1415                if res == usize::MAX {
1416                    // OOM (or denied grow). Poison so other threads trap instead of spinning.
1417                    h.resize_seq.store(POISON_SEQ, Ordering::Release);
1418                    core::arch::wasm32::unreachable();
1419                }
1420            }
1421
1422            // Update header (rings untouched)
1423            h.capacity = new_cap;
1424            h.bucket_mask = new_cap - 1;
1425            h.offset_sq = layout.offset_sq;
1426            h.offset_cq = layout.offset_cq;
1427            h.offset_stdin = layout.offset_stdin;
1428            h.offset_stdout = layout.offset_stdout;
1429            h.offset_stdin_wait = layout.offset_stdin_wait;
1430            h.offset_kind = layout.offset_kind;
1431            h.offset_sym = layout.offset_sym;
1432            h.offset_left_id = layout.offset_left_id;
1433            h.offset_right_id = layout.offset_right_id;
1434            h.offset_hash32 = layout.offset_hash32;
1435            h.offset_next_idx = layout.offset_next_idx;
1436            h.offset_buckets = layout.offset_buckets;
1437            h.offset_term_cache = layout.offset_term_cache;
1438
1439            // Preserve top
1440            let count = old_top.min(old_cap);
1441            h.top.store(count, Ordering::Release);
1442
1443            // IMPORTANT: reverse-copy order is mandatory.
1444            // The layout is packed contiguously, and new_cap > old_cap means new regions can overlap
1445            // old regions during migration. Copy from the end back to the front.
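            //
            // Concretely (illustrative): the ring regions ahead of the node arrays are untouched,
            // so offset_kind does not move, but every array after it shifts to a higher address,
            // and the shift accumulates the further down the layout you go. Copying an early
            // region (say sym) forward first would overwrite old left/right bytes that have not
            // been migrated yet, which is why term_cache is copied first and kind last.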
1446
1447            // Term cache
1448            core::ptr::copy(
1449                (ARENA_BASE_ADDR + old_offset_term_cache) as *const u8,
1450                (ARENA_BASE_ADDR + h.offset_term_cache) as *mut u8,
1451                (TERM_CACHE_LEN * 4) as usize,
1452            );
1453
1454            // Next (u32)
1455            core::ptr::copy(
1456                (ARENA_BASE_ADDR + old_offset_next) as *const u8,
1457                (ARENA_BASE_ADDR + h.offset_next_idx) as *mut u8,
1458                (count as usize) * 4,
1459            );
1460            if new_cap > old_cap {
1461                zero_region(
1462                    h.offset_next_idx + old_cap * 4,
1463                    (new_cap - old_cap) * 4,
1464                );
1465            }
1466
1467            // Hash (u32)
1468            core::ptr::copy(
1469                (ARENA_BASE_ADDR + old_offset_hash) as *const u8,
1470                (ARENA_BASE_ADDR + h.offset_hash32) as *mut u8,
1471                (count as usize) * 4,
1472            );
1473            if new_cap > old_cap {
1474                zero_region(
1475                    h.offset_hash32 + old_cap * 4,
1476                    (new_cap - old_cap) * 4,
1477                );
1478            }
1479
1480            // Right (u32)
1481            core::ptr::copy(
1482                (ARENA_BASE_ADDR + old_offset_right) as *const u8,
1483                (ARENA_BASE_ADDR + h.offset_right_id) as *mut u8,
1484                (count as usize) * 4,
1485            );
1486            if new_cap > old_cap {
1487                zero_region(
1488                    h.offset_right_id + old_cap * 4,
1489                    (new_cap - old_cap) * 4,
1490                );
1491            }
1492
1493            // Left (u32)
1494            core::ptr::copy(
1495                (ARENA_BASE_ADDR + old_offset_left) as *const u8,
1496                (ARENA_BASE_ADDR + h.offset_left_id) as *mut u8,
1497                (count as usize) * 4,
1498            );
1499            if new_cap > old_cap {
1500                zero_region(
1501                    h.offset_left_id + old_cap * 4,
1502                    (new_cap - old_cap) * 4,
1503                );
1504            }
1505
1506            // Sym (u8)
1507            core::ptr::copy(
1508                (ARENA_BASE_ADDR + old_offset_sym) as *const u8,
1509                (ARENA_BASE_ADDR + h.offset_sym) as *mut u8,
1510                count as usize,
1511            );
1512            if new_cap > old_cap {
1513                zero_region(
1514                    h.offset_sym + old_cap,
1515                    new_cap - old_cap,
1516                );
1517            }
1518
1519            // Kind (u8)
1520            core::ptr::copy(
1521                (ARENA_BASE_ADDR + old_offset_kind) as *const u8,
1522                (ARENA_BASE_ADDR + h.offset_kind) as *mut u8,
1523                count as usize,
1524            );
1525            if new_cap > old_cap {
1526                zero_region(
1527                    h.offset_kind + old_cap,
1528                    new_cap - old_cap,
1529                );
1530            }
1531
1532            // Rebuild buckets (hash-consing table) for NonTerm only.
1533            let buckets = buckets_ptr();
1534            let next = next_ptr();
1535            for i in 0..new_cap as usize {
1536                (*buckets.add(i)).store(EMPTY, Ordering::Release);
1537            }
1538            for i in 0..count {
1539                let k = (*kind_ptr().add(i as usize)).load(Ordering::Acquire);
1540                if k != ArenaKind::NonTerm as u8 {
1541                    continue;
1542                }
1543                let hv = (*hash_ptr().add(i as usize)).load(Ordering::Acquire);
1544                let b = (hv & h.bucket_mask) as usize;
1545                loop {
1546                    let head = (*buckets.add(b)).load(Ordering::Acquire);
1547                    (*next.add(i as usize)).store(head, Ordering::Relaxed);
1548                    // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
1549                    if (*buckets.add(b))
1550                        .compare_exchange(head, i, Ordering::Release, Ordering::Relaxed)
1551                        .is_ok()
1552                    {
1553                        break;
1554                    }
1555                }
1556            }
1557
1558            h.resize_seq.fetch_add(1, Ordering::Release);
1559        }
1560    }
1561
1562    // -------------------------------------------------------------------------
1563    // Reducer
1564    // -------------------------------------------------------------------------
1565    // Continuation frame: kind=Continuation, sym=stage, left=parent_stack, right=parent_node
1566    const STAGE_LEFT: u8 = 0;
1567    const STAGE_RIGHT: u8 = 1;
1568
1569    // Suspension: kind=Suspension, sym=mode (0=descend, 1=return, 2=io-wait), left=curr, right=stack, hash=remaining_steps
1570    const MODE_DESCEND: u8 = 0;
1571    const MODE_RETURN: u8 = 1;
1572    const MODE_IO_WAIT: u8 = 2;
1573
1574    #[inline(always)]
1575    unsafe fn alloc_continuation(parent: u32, target: u32, stage: u8) -> u32 {
1576        alloc_generic(ArenaKind::Continuation as u8, stage, parent, target, 0)
1577    }
1578
1579    #[inline(always)]
1580    unsafe fn alloc_suspension(curr: u32, stack: u32, mode: u8, remaining_steps: u32) -> u32 {
1581        alloc_generic(ArenaKind::Suspension as u8, mode, curr, stack, remaining_steps)
1582    }
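
    // Illustrative usage: pausing a worker that is descending node `curr` with continuation
    // stack `stk` and 42 reduction steps left amounts to
    //
    //     let susp = alloc_suspension(curr, stk, MODE_DESCEND, 42);
    //
    // and a later resume reads the state back via leftOf(susp) / rightOf(susp) / symOf(susp)
    // and hash_of_internal(susp), exactly as workerLoop does below.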
1583
1584    #[inline(always)]
1585    unsafe fn alloc_num(value: u32) -> u32 {
1586        alloc_generic(ArenaKind::Terminal as u8, ArenaSym::Num as u8, 0, 0, value)
1587    }
1588
1589    #[inline(always)]
1590    unsafe fn alloc_inc() -> u32 {
1591        allocTerminal(ArenaSym::Inc as u32)
1592    }
1593
1594    #[inline(always)]
1595    unsafe fn alloc_church_zero() -> u32 {
1596        let k = allocTerminal(ArenaSym::K as u32);
1597        let i = allocTerminal(ArenaSym::I as u32);
1598        allocCons(k, i)
1599    }
1600
1601    #[inline(always)]
1602    unsafe fn alloc_church_succ() -> u32 {
1603        let s = allocTerminal(ArenaSym::S as u32);
1604        let k = allocTerminal(ArenaSym::K as u32);
1605        let ks = allocCons(k, s);
1606        let b = allocCons(allocCons(s, ks), k); // S(KS)K
1607        allocCons(s, b) // S B
1608    }
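
    // Why S(S(KS)K) is the successor (sketch): with B = S(KS)K acting as composition
    // (B g h x -> g (h x)), the reduction
    //
    //     S B n f x -> B f (n f) x -> f (n f x)
    //
    // shows that applying it to a Church numeral `n` yields a numeral that applies `f`
    // one extra time.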
1609
1610    #[inline(always)]
1611    unsafe fn alloc_church_byte(value: u8) -> u32 {
1612        let mut cur = alloc_church_zero();
1613        if value == 0 {
1614            return cur;
1615        }
1616        let succ = alloc_church_succ();
1617        for _ in 0..value {
1618            cur = allocCons(succ, cur);
1619        }
1620        cur
1621    }
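
    // e.g. alloc_church_byte(3) builds succ (succ (succ zero)), i.e. the Church numeral 3
    // as an unreduced application chain that later reduction normalizes on demand.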
1622
1623    #[inline(always)]
1624    unsafe fn decode_church_u32(expr: u32) -> u32 {
1625        let inc = alloc_inc();
1626        let zero = alloc_num(0);
1627        let app = allocCons(allocCons(expr, inc), zero);
1628        let reduced = reduce(app, 1_000_000);
1629        if kindOf(reduced) == ArenaKind::Terminal as u32
1630            && symOf(reduced) == ArenaSym::Num as u32
1631        {
1632            hash_of_internal(reduced)
1633        } else {
1634            0
1635        }
1636    }
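
    // Illustrative trace: decoding Church 2 builds ((2 Inc) (Num 0)), which reduces as
    //
    //     2 Inc (Num 0) -> Inc (Inc (Num 0)) -> Inc (Num 1) -> Num 2
    //
    // via the `Inc (Num n) -> Num (n + 1)` rule in step_iterative, so hash_of_internal
    // returns 2.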
1637
1638    // --- OPTIMIZATION: Update existing node (Slot Reuse) ---
1639    #[inline(always)]
1640    unsafe fn update_continuation(id: u32, parent: u32, target: u32, stage: u8) {
1641        (*left_ptr().add(id as usize)).store(parent, Ordering::Relaxed);
1642        (*right_ptr().add(id as usize)).store(target, Ordering::Relaxed);
1643        (*sym_ptr().add(id as usize)).store(stage, Ordering::Relaxed);
1644        // Ensure it is marked as Continuation (in case we recycled a Suspension)
1645        (*kind_ptr().add(id as usize)).store(ArenaKind::Continuation as u8, Ordering::Release);
1646    }
1647
1648    #[inline(always)]
1649    fn hash_of_internal(n: u32) -> u32 {
1650        unsafe {
1651            ensure_arena();
1652            loop {
1653                let (seq, h) = enter_stable();
1654                if n >= h.capacity {
1655                    return 0;
1656                }
1657                let val = (*hash_ptr().add(n as usize)).load(Ordering::Acquire);
1658                core::sync::atomic::fence(Ordering::Acquire);
1659                if check_stable(seq) {
1660                    return val;
1661                }
1662            }
1663        }
1664    }
1665
1666    /// Unwind the continuation stack to reconstruct the full expression tree.
1667    ///
1668    /// When the step limit is exhausted, the worker may be deep in the expression tree
1669    /// with a non-empty stack. This function walks up the stack, rebuilding parent nodes
1670    /// as necessary, to return the root of the expression rather than a sub-expression.
1671    #[inline(always)]
1672    unsafe fn unwind_to_root(mut curr: u32, mut stack: u32) -> u32 {
1673        while stack != EMPTY {
1674            let recycled = stack;
1675            stack = leftOf(recycled);
1676            let parent_node = rightOf(recycled);
1677            let stage = symOf(recycled) as u8;
1678
1679            if stage == STAGE_LEFT {
1680                let orig_left = leftOf(parent_node);
1681                if curr != orig_left {
1682                    // Left child changed, must allocate new parent
1683                    curr = allocCons(curr, rightOf(parent_node));
1684                } else {
1685                    // Left child didn't change, reuse existing parent
1686                    curr = parent_node;
1687                }
1688            } else {
1689                // STAGE_RIGHT
1690                let orig_right = rightOf(parent_node);
1691                if curr != orig_right {
1692                    // Right child changed, must allocate new parent
1693                    curr = allocCons(leftOf(parent_node), curr);
1694                } else {
1695                    curr = parent_node;
1696                }
1697            }
1698        }
1699        curr
1700    }
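
    // Illustrative example: with a single STAGE_LEFT frame for parent (L R) on the stack
    // and the left child rewritten from L to L', unwinding returns the rebuilt application
    // (L' R); if nothing changed, the original parent node is reused and sharing is preserved.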
1701
1702    enum StepResult {
1703        Done(u32),
1704        Yield(u32), // Suspension node id
1705    }
1706
1707    /// Perform one iterative reduction step with preemptive yielding.
1708    ///
1709    /// This implements the core SKI reduction algorithm using an iterative approach
1710    /// with explicit stack management instead of recursion. It can yield mid-step
1711    /// when traversal gas is exhausted, enabling cooperative multitasking.
1712    ///
1713    /// ## Iterative Reduction Algorithm
1714    ///
1715    /// Instead of recursive function calls, uses an explicit stack of Continuation nodes:
1716    ///
1717    /// - **MODE_DESCEND**: Traverse down the expression tree looking for redexes
1718    /// - **MODE_RETURN**: Return up the tree after reducing a subexpression
1719    /// - **Continuation frames**: Represent suspended stack frames as arena nodes
1720    ///
1721    /// ## Gas-Based Preemption
1722    ///
1723    /// - **Traversal gas**: Caps the number of AST traversal iterations per call
1724    /// - **Yield on exhaustion**: Returns Suspension node for later resumption
1725    /// - **Cooperative multitasking**: Prevents worker starvation in parallel evaluation
1726    ///
1727    /// ## Step Counting
1728    ///
1729    /// - **Accurate counting**: Decrements `remaining_steps` immediately when each reduction occurs
1730    /// - **Multiple reductions per call**: Can perform multiple reductions in a single call
1731    /// - **Deterministic**: Every reduction is counted exactly once, regardless of batching
1732    ///
1733    /// ## Node Recycling Optimization
1734    ///
1735    /// - **Free node reuse**: Dead continuation frames are recycled immediately
1736    /// - **Memory efficiency**: Reduces allocation pressure during reduction
1737    /// - **Cache locality**: Reuses recently freed nodes
1738    ///
1739    /// ## Parameters
1740    ///
1741    /// - `curr`: Current expression node being evaluated
1742    /// - `stack`: Stack of continuation frames (linked list of nodes)
1743    /// - `mode`: Current evaluation mode (descend/return)
1744    /// - `gas`: Remaining traversal gas (mutable, decremented during execution)
1745    /// - `remaining_steps`: Mutable reference to reduction steps remaining (decremented on each reduction)
1746    /// - `free_node`: Recyclable node ID from previous operations
1747    ///
1748    /// ## Returns
1749    ///
1750    /// - `StepResult::Done(node)`: Reduction completed, `node` is the result
1751    /// - `StepResult::Yield(susp_id)`: Yielded mid-step, `susp_id` is Suspension node
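    ///
    /// ## Example (Illustrative Trace)
    ///
    /// ```text
    /// Reducing ((K a) b) with remaining_steps = 1:
    ///   DESCEND ((K a) b)         -- K-redex detected at the root
    ///   remaining_steps: 1 -> 0   -- counted immediately
    ///   curr = a, mode = RETURN
    ///   remaining_steps == 0      -- yield right away instead of traversing further
    ///   => StepResult::Yield(Suspension { curr: a, stack, mode: RETURN, steps: 0 })
    /// ```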
1752    unsafe fn step_iterative(mut curr: u32, mut stack: u32, mut mode: u8, gas: &mut u32, remaining_steps: &mut u32, mut free_node: u32) -> StepResult {
1753        loop {
1754            // Gas exhaustion yield
1755            if *gas == 0 {
1756                // If we have a free_node we didn't use, it's just a hole now.
1757                return StepResult::Yield(alloc_suspension(curr, stack, mode, *remaining_steps));
1758            }
1759            *gas -= 1;
1760
1761            if mode == MODE_RETURN {
1762                if stack == EMPTY {
1763                    return StepResult::Done(curr);
1764                }
1765
1766                // POP FRAME
1767                let recycled = stack;         // <--- This frame is now dead/recyclable
1768                stack = leftOf(recycled);     // Parent
1769                let parent_node = rightOf(recycled);
1770                let stage = symOf(recycled) as u8;
1771
1772                if stage == STAGE_LEFT {
1773                    let orig_left = leftOf(parent_node);
1774                    if curr != orig_left {
1775                        // Rebuild parent
1776                        curr = allocCons(curr, rightOf(parent_node));
1777                        // 'recycled' is still free, we are returning up.
1778                        free_node = recycled; // Keep for next push or pop
1779                        mode = MODE_RETURN;
1780                        continue;
1781                    }
1782                    // Left stable, DESCEND RIGHT
1783                    // Reuse 'recycled' as the new Continuation frame!
1784                    update_continuation(recycled, stack, parent_node, STAGE_RIGHT);
1785                    stack = recycled;
1786                    mode = MODE_DESCEND;
1787                    curr = rightOf(parent_node);
1788                    continue;
1789                } else {
1790                    let orig_right = rightOf(parent_node);
1791                    if curr != orig_right {
1792                        curr = allocCons(leftOf(parent_node), curr);
1793                        free_node = recycled;
1794                        mode = MODE_RETURN;
1795                        continue;
1796                    }
1797                    // Both stable
1798                    curr = parent_node;
1799                    free_node = recycled;
1800                    mode = MODE_RETURN;
1801                    continue;
1802                }
1803            }
1804
1805            // MODE_DESCEND
1806            let k = kindOf(curr);
1807            if k != ArenaKind::NonTerm as u32 {
1808                mode = MODE_RETURN;
1809                continue;
1810            }
1811
1812            // NonTerm: check for I/K/S redex at this node.
1813            let left = leftOf(curr);
1814            let right = rightOf(curr);
1815
1816            // [REDUCTION LOGIC START] -----------------------------------------
1817
1818            // I x -> x
1819            if kindOf(left) == ArenaKind::Terminal as u32 && symOf(left) == ArenaSym::I as u32 {
1820                if *remaining_steps == 0 {
1821                    return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1822                }
1823                *remaining_steps = remaining_steps.saturating_sub(1);
1824
1825                curr = right;
1826                mode = MODE_RETURN;
1827
1828                // Yield IMMEDIATELY if limit hit zero.
1829                // Don't waste gas traversing to the next redex.
1830                if *remaining_steps == 0 {
1831                    return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1832                }
1833                continue;
1834            }
1835
1836            // readOne k -> k (Church byte)
1837            if kindOf(left) == ArenaKind::Terminal as u32
1838                && symOf(left) == ArenaSym::ReadOne as u32
1839            {
1840                if let Some(byte) = stdin_ring().try_dequeue() {
1841                    if *remaining_steps == 0 {
1842                        return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1843                    }
1844                    *remaining_steps = remaining_steps.saturating_sub(1);
1845
1846                    let numeral = alloc_church_byte(byte);
1847                    curr = allocCons(right, numeral);
1848                    mode = MODE_RETURN;
1849
1850                    if *remaining_steps == 0 {
1851                        return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1852                    }
1853                    continue;
1854                }
1855
1856                let susp_id = alloc_suspension(curr, stack, MODE_IO_WAIT, *remaining_steps);
1857                stdin_wait_ring().enqueue_blocking(susp_id);
1858                return StepResult::Yield(susp_id);
1859            }
1860
1861            // writeOne n -> n (enqueue byte)
1862            if kindOf(left) == ArenaKind::Terminal as u32
1863                && symOf(left) == ArenaSym::WriteOne as u32
1864            {
1865                if *remaining_steps == 0 {
1866                    return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1867                }
1868                *remaining_steps = remaining_steps.saturating_sub(1);
1869
1870                let value = decode_church_u32(right);
1871                let byte = (value & 0xff) as u8;
1872                stdout_ring().enqueue_blocking(byte);
1873
1874                curr = right;
1875                mode = MODE_RETURN;
1876
1877                if *remaining_steps == 0 {
1878                    return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1879                }
1880                continue;
1881            }
1882
1883            // Inc (Num n) -> Num (n + 1) [internal Church decoding]
1884            if kindOf(left) == ArenaKind::Terminal as u32
1885                && symOf(left) == ArenaSym::Inc as u32
1886                && kindOf(right) == ArenaKind::Terminal as u32
1887                && symOf(right) == ArenaSym::Num as u32
1888            {
1889                if *remaining_steps == 0 {
1890                    return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1891                }
1892                *remaining_steps = remaining_steps.saturating_sub(1);
1893
1894                let next_val = hash_of_internal(right).wrapping_add(1);
1895                curr = alloc_num(next_val);
1896                mode = MODE_RETURN;
1897
1898                if *remaining_steps == 0 {
1899                    return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1900                }
1901                continue;
1902            }
1903
1904            if kindOf(left) == ArenaKind::NonTerm as u32 {
1905                let ll = leftOf(left);
1906                // K x y -> x
1907                if kindOf(ll) == ArenaKind::Terminal as u32 && symOf(ll) == ArenaSym::K as u32 {
1908                    if *remaining_steps == 0 {
1909                        return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1910                    }
1911                    *remaining_steps = remaining_steps.saturating_sub(1);
1912
1913                    curr = rightOf(left);
1914                    mode = MODE_RETURN;
1915
1916                    // Yield IMMEDIATELY
1917                    if *remaining_steps == 0 {
1918                        return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1919                    }
1920                    continue;
1921                }
1922                // S x y z -> x z (y z)
1923                if kindOf(ll) == ArenaKind::NonTerm as u32 {
1924                    let lll = leftOf(ll);
1925                    if kindOf(lll) == ArenaKind::Terminal as u32 && symOf(lll) == ArenaSym::S as u32 {
1926                        if *remaining_steps == 0 {
1927                            return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1928                        }
1929                        // Use saturating_sub for consistency
1930                        *remaining_steps = remaining_steps.saturating_sub(1);
1931
1932                        let x = rightOf(ll);
1933                        let y = rightOf(left);
1934                        let z = right;
1935                        let xz = allocCons(x, z);
1936                        let yz = allocCons(y, z);
1937                        curr = allocCons(xz, yz);
1938                        mode = MODE_RETURN;
1939
1940                        // Yield IMMEDIATELY
1941                        if *remaining_steps == 0 {
1942                            return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1943                        }
1944                        continue;
1945                    }
1946                }
1947            }
1948
1949            // [REDUCTION LOGIC END] -------------------------------------------
1950
1951            // No redex: PUSH frame to descend left
1952            if free_node != EMPTY {
1953                update_continuation(free_node, stack, curr, STAGE_LEFT);
1954                stack = free_node;
1955                free_node = EMPTY;
1956            } else {
1957                stack = alloc_continuation(stack, curr, STAGE_LEFT);
1958            }
1959            curr = left;
1960            mode = MODE_DESCEND;
1961        }
1962    }
1963
1964    fn step_internal(expr: u32) -> u32 {
1965        unsafe {
1966            let mut gas = u32::MAX;
1967            let mut steps = u32::MAX; // Effectively unlimited: single-step calls impose no step budget
1968            match step_iterative(expr, EMPTY, MODE_DESCEND, &mut gas, &mut steps, EMPTY) {
1969                StepResult::Done(x) => x,
1970                StepResult::Yield(_) => expr, // with u32::MAX gas this only fires on an I/O wait (readOne, empty stdin); treat as no progress
1971            }
1972        }
1973    }
1974
1975    #[no_mangle]
1976    pub extern "C" fn arenaKernelStep(expr: u32) -> u32 {
1977        unsafe { ensure_arena(); }
1978        step_internal(expr)
1979    }
1980
1981    #[no_mangle]
1982    pub extern "C" fn reduce(expr: u32, max: u32) -> u32 {
1983        unsafe { ensure_arena(); }
1984        let limit = if max == 0xffff_ffff { u32::MAX } else { max };
1985        let mut cur = expr;
1986        for _ in 0..limit {
1987            let next = step_internal(cur);
1988            if next == cur {
1989                break;
1990            }
1991            cur = next;
1992        }
1993        cur
1994    }
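
    // Usage sketch (illustrative; allocTerminal/allocCons are the exported constructors
    // defined earlier in this module):
    //
    //     let i = allocTerminal(ArenaSym::I as u32);
    //     let s = allocTerminal(ArenaSym::S as u32);
    //     let app = allocCons(i, s);             // (I S)
    //     let nf = reduce(app, 0xffff_ffff);     // unlimited budget -> the S terminal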
1995
1996    // -------------------------------------------------------------------------
1997    // Host/worker ring APIs
1998    // -------------------------------------------------------------------------
1999    #[no_mangle]
2000    pub extern "C" fn hostPull() -> i64 {
2001        unsafe {
2002            if ARENA_BASE_ADDR == 0 {
2003                return -1;
2004            }
2005            if let Some(cqe) = cq_ring().try_dequeue() {
2006                // Pack: high 32 bits = req_id, low 32 bits = node_id
2007                let packed: u64 = ((cqe.req_id as u64) << 32) | (cqe.node_id as u64);
2008                packed as i64
2009            } else {
2010                -1
2011            }
2012        }
2013    }
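
    // Host-side unpack sketch (illustrative, written in Rust for clarity; the real host is JS):
    //
    //     let packed = hostPull();
    //     if packed != -1 {
    //         let req_id  = ((packed as u64) >> 32) as u32;
    //         let node_id = (packed as u64 & 0xffff_ffff) as u32;
    //     } // -1 means the completion queue was empty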
2014
2015    /// Debug/diagnostic helper: expose ring capacity to the host.
2016    ///
2017    /// Useful for tests and for sizing stress workloads without duplicating a JS-side constant.
2018    #[no_mangle]
2019    pub extern "C" fn debugGetRingEntries() -> u32 {
2020        RING_ENTRIES
2021    }
2022
2023    #[no_mangle]
2024    /// Submit work with explicit correlation id and max reduction steps.
2025    /// Returns: 0=ok, 1=full, 2=not connected.
2026    pub extern "C" fn hostSubmit(node_id: u32, req_id: u32, max_steps: u32) -> u32 {
2027        unsafe {
2028            if ARENA_BASE_ADDR == 0 {
2029                return 2;
2030            }
2031            if sq_ring().try_enqueue(Sqe { node_id, req_id, max_steps }) {
2032                0
2033            } else {
2034                1
2035            }
2036        }
2037    }
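
    // Illustrative caller pattern (return codes as documented above):
    //
    //     match hostSubmit(node_id, req_id, 10_000) {
    //         0 => { /* accepted; poll hostPull() for the matching req_id */ }
    //         1 => { /* submission ring full: drain completions or back off, then retry */ }
    //         _ => { /* not connected: attach/initialize the shared arena first */ }
    //     }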
2038
2039    /// Main worker loop for parallel SKI evaluation.
2040    ///
2041    /// Processes evaluation requests from the submission queue and posts results
2042    /// to the completion queue. Implements preemptive multitasking using gas-based
2043    /// yielding to prevent worker starvation in concurrent workloads.
2044    ///
2045    /// ## Processing Model
2046    ///
2047    /// 1. **Dequeue request** from submission queue (blocking)
2048    /// 2. **Initialize evaluation state** from request or suspension
2049    /// 3. **Iterative reduction loop** with gas-based preemption
2050    /// 4. **Yield or complete** based on gas and step limits
2051    /// 5. **Enqueue result** to completion queue
2052    /// 6. **Repeat** for next request
2053    ///
2054    /// ## Preemption Strategy
2055    ///
2056    /// - **Traversal gas limit**: `20,000` traversal iterations per `step_iterative` call
2057    /// - **Cooperative yielding**: Returns control when gas exhausted
2058    /// - **Suspension resumption**: Can restart from any yield point
2059    /// - **Fair scheduling**: Prevents any single expression from monopolizing CPU
2060    ///
2061    /// ## Step Counting Semantics
2062    ///
2063    /// - **Step decrement**: `step_iterative` decrements `remaining_steps` in place the moment
2064    ///   each reduction fires, so the counter always reflects the work already done
2065    /// - **Yield behavior**: a yield (gas exhausted or limit reached) stores the already-decremented
2066    ///   `remaining_steps` in the suspension, so resumption continues with the correct leftover budget (see the illustration below).
2067    /// - **Deterministic counting**: Each reduction step is counted exactly once, regardless of
2068    ///   how many times the work is suspended and resumed.
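    ///
    /// Illustrative budget across a yield/resume cycle: with `max_steps = 100`, if the job
    /// yields on gas after 37 reductions, the suspension stores `remaining_steps = 63`
    /// (already decremented); on resume the budget is 63, and after 63 further reductions
    /// the limit is reached, so exactly 100 reductions are counted in total.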
2069    ///
2070    /// ## Memory Management
2071    ///
2072    /// - **Node recycling**: Reuses dead continuation frames immediately
2073    /// - **Shared arena**: All workers access the same memory space
2074    /// - **Atomic operations**: Safe concurrent access to shared structures
2075    ///
2076    /// ## Performance Optimizations
2077    ///
2078    /// - **Lock-free queues**: Minimal synchronization overhead
2079    /// - **Batched processing**: Amortizes dequeue overhead
2080    /// - **In-place updates**: Modifies nodes directly in shared memory
2081    /// - **Cache-friendly**: Sequential memory access patterns
2082    #[no_mangle]
2083    pub extern "C" fn workerLoop() {
2084        unsafe {
2085            let sq = sq_ring();
2086            let cq = cq_ring();
2087            // Traversal gas granted to each step_iterative call (preemption) to prevent starvation.
2088            // This is NOT the same as the max reduction steps budget.
2089            let batch_gas: u32 = 20_000;
2090            loop {
2091                let job = sq.dequeue_blocking();
2092                let mut curr = job.node_id;
2093                let mut stack = EMPTY;
2094                let mut mode = MODE_DESCEND;
2095                let mut remaining_steps: u32;
2096
2097                // If resuming a suspension, the suspension node ITSELF is now free to be reused!
2098                let mut free_node = EMPTY;
2099
2100                // Resume from suspension?
2101                if kindOf(curr) == ArenaKind::Suspension as u32 {
2102                    let susp = curr;
2103                    curr = leftOf(susp);
2104                    stack = rightOf(susp);
2105                    mode = symOf(susp) as u8;
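                    // This suspension was parked on the stdin wait ring; by the time it is
                    // re-submitted, input should be available, so switch back to descend mode
                    // and re-attempt the readOne redex.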
2106                    if mode == MODE_IO_WAIT {
2107                        mode = MODE_DESCEND;
2108                    }
2109                    // Strict resume: trust the suspension's counter as-is.
2110                    // The suspension node's hash field holds the remaining_steps saved at yield time;
2111                    // step_iterative had already decremented it for every reduction that completed.
2112                    remaining_steps = hash_of_internal(susp);
2113                    free_node = susp; // <--- RECYCLE START
2114                } else {
2115                    // Set strict limit from the job packet
2116                    let limit = job.max_steps;
2117                    remaining_steps = if limit == 0xffff_ffff { u32::MAX } else { limit };
2118                }
2119
2120                loop {
2121                    // Check budget BEFORE trying a step
2122                    if remaining_steps == 0 {
2123                        // If we have a stack, we are deep in the tree.
2124                        // We must unwind to the root to return a valid full expression.
2125                        if stack != EMPTY {
2126                            curr = unwind_to_root(curr, stack);
2127                            stack = EMPTY;
2128                        }
2129
2130                        // Step budget exhausted; return (partial) result.
2131                        cq.enqueue_blocking(Cqe {
2132                            node_id: curr,
2133                            req_id: job.req_id,
2134                            _pad: 0,
2135                        });
2136                        break;
2137                    }
2138
2139                    let mut gas = batch_gas;
2140
2141                    match step_iterative(curr, stack, mode, &mut gas, &mut remaining_steps, free_node) {
2142                        StepResult::Yield(susp_id) => {
2143                            // Yielded (gas or limit). 'susp_id' has the correct remaining count
2144                            // because step_iterative updated 'remaining_steps' in place for each
2145                            // reduction that occurred before yielding.
2146                            cq.enqueue_blocking(Cqe {
2147                                node_id: susp_id,
2148                                req_id: job.req_id,
2149                                _pad: 0,
2150                            });
2151                            break;
2152                        }
2153                        StepResult::Done(next_node) => {
2154                            if next_node == curr {
2155                                // Fixpoint reached.
2156                                cq.enqueue_blocking(Cqe {
2157                                    node_id: curr,
2158                                    req_id: job.req_id,
2159                                    _pad: 0,
2160                                });
2161                                break;
2162                            }
2163
2164                            // Setup for next iteration
2165                            curr = next_node;
2166                            stack = EMPTY;
2167                            mode = MODE_DESCEND;
2168                            free_node = EMPTY;
2169
2170                            // Loop continues; 'remaining_steps' was updated by reference in step_iterative
2171                        }
2172                    }
2173                }
2174            }
2175        }
2176    }
2177
2178    // -------------------------------------------------------------------------
2179    // Debug helpers
2180    // -------------------------------------------------------------------------
2181    #[no_mangle]
2182    pub extern "C" fn debugGetArenaBaseAddr() -> u32 {
2183        unsafe { ARENA_BASE_ADDR }
2184    }
2185
2186    #[no_mangle]
2187    pub extern "C" fn getArenaMode() -> u32 {
2188        unsafe { ARENA_MODE }
2189    }
2190
2191    #[no_mangle]
2192    pub extern "C" fn debugCalculateArenaSize(capacity: u32) -> u32 {
2193        SabHeader::layout(capacity).total_size
2194    }
2195
2196    #[no_mangle]
2197    pub extern "C" fn debugLockState() -> u32 {
2198        // Full resize sequence word:
2199        // - even: stable
2200        // - odd:  resize in progress
2201        // - 0xFFFF_FFFF: poisoned (unrecoverable resize failure)
2202        unsafe { header().resize_seq.load(Ordering::Relaxed) }
2203    }
2204}
2205
2206// Re-export WASM symbols at crate root
2207#[cfg(target_arch = "wasm32")]
2208pub use wasm::*;