typed_ski/arena.rs
1//! # Arena-Based Memory Management for SKI Expressions (WASM, no_std)
2//!
3//! This module implements a high-performance, thread-safe arena allocator optimized
4//! for SKI combinator calculus evaluation. It supports both single-threaded heap-based
5//! allocation and multi-threaded SharedArrayBuffer (SAB) based allocation for Web Workers.
6//!
7//! ## Core Architecture
8//!
9//! ### Arena Node Types
10//!
11//! The arena uses four distinct node types to represent SKI expressions and evaluation state:
12//!
//! - **`Terminal`**: Leaf nodes containing combinator and primitive symbols (S, K, I, plus I/O and numeric primitives)
14//! - `kind = 1`, `sym` contains the combinator symbol
15//!
16//! - **`NonTerm`**: Application nodes (function application)
17//! - `kind = 2`, `left` and `right` point to subexpressions
18//! - Represents expressions of the form `(left right)`
19//!
20//! - **`Continuation`**: Stack frames for iterative reduction (optimization)
21//! - `kind = 3`, `sym` indicates reduction stage, `left`/`right` point to parent stack and node
22//! - Used by the iterative reduction algorithm to avoid recursion stack overflow
23//!
24//! - **`Suspension`**: Paused evaluation state for preemptive multitasking
25//! - `kind = 4`, `sym` contains evaluation mode, `left`/`right` contain current expression and stack
26//! - `hash` field stores remaining reduction steps for resumption
27//! - Enables cooperative multitasking across Web Workers
28//!
29//! ### Memory Layout (SharedArrayBuffer Mode)
30//!
31//! ```text
32//! +-------------------+ <-- ARENA_BASE_ADDR
33//! | SabHeader |
34//! | - Magic, offsets |
35//! | - Rings, capacity |
36//! | - Atomic counters |
37//! +-------------------+ <-- offset_sq (64-byte aligned)
38//! | Submission Ring |
//! |  (RING_ENTRIES)   |
40//! +-------------------+ <-- offset_cq (64-byte aligned)
41//! | Completion Ring |
//! |  (RING_ENTRIES)   |
43//! +-------------------+ <-- offset_stdin (64-byte aligned)
44//! | Stdin Ring (u8) |
45//! +-------------------+ <-- offset_stdout (64-byte aligned)
46//! | Stdout Ring (u8) |
47//! +-------------------+ <-- offset_stdin_wait (64-byte aligned)
48//! | Stdin Wait Ring |
49//! | (u32 node ids) |
50//! +-------------------+ <-- offset_kind (64-byte aligned)
51//! | Kind Array | u8[capacity] - Node types
52//! +-------------------+ <-- offset_sym
53//! | Sym Array | u8[capacity] - Symbols/modes
54//! +-------------------+ <-- offset_left_id (64-byte aligned)
55//! | Left Array | u32[capacity] - Left child pointers
56//! +-------------------+ <-- offset_right_id
57//! | Right Array | u32[capacity] - Right child pointers
58//! +-------------------+ <-- offset_hash32
59//! | Hash Array | u32[capacity] - Hash values for deduplication
60//! +-------------------+ <-- offset_next_idx
61//! | Next Array | u32[capacity] - Hash table collision chains
62//! +-------------------+ <-- offset_buckets (64-byte aligned)
63//! | Bucket Array | u32[capacity] - Hash table buckets
64//! +-------------------+ <-- offset_term_cache
//! | Terminal Cache    | u32[TERM_CACHE_LEN] - Cached terminal node IDs (S, K, I, ReadOne, WriteOne)
66//! +-------------------+ <-- End of arena
67//! ```
68//!
69//! ### Key Optimizations
70//!
71//! #### 1. Hash-Consing (Structural Sharing)
72//!
73//! - **[Hash consing](https://en.wikipedia.org/wiki/Hash_consing)** dedupes identical subexpressions to prevent redundant allocations
74//! - **Uses avalanche hash** of `(left, right)` pairs for fast lookups
75//! - **Collision resolution** via separate chaining in the bucket array
76//! - **Memory efficiency**: [DAG](https://en.wikipedia.org/wiki/Directed_acyclic_graph) representation instead of tree
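//!
//! For example (an illustrative sketch; `allocTerminal` and `allocCons` are the
//! exports defined below), two structurally identical applications resolve to
//! the same node id:
//!
//! ```text
//! let k = allocTerminal(ArenaSym::K as u32);
//! let i = allocTerminal(ArenaSym::I as u32);
//! let a = allocCons(k, i);
//! let b = allocCons(k, i);
//! // a == b: the pair (k, i) is stored once and shared (DAG, not a tree)
//! ```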
77//!
78//! #### 2. Iterative Reduction with Continuations
79//!
80//! - **Avoids recursion stack overflow** on deep expressions
81//! - **Continuation nodes** represent suspended stack frames
82//! - **Two-stage reduction**: left child first, then right child
83//! - **Memory reuse**: Dead continuation frames are recycled
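//!
//! A rough sketch of the stack discipline (field usage follows the
//! `Continuation` layout above; the actual reducer below differs in details
//! such as slot reuse and gas accounting):
//!
//! ```text
//! // descend into the left child of an application, remembering the parent:
//! stack = alloc_continuation(stack, node, STAGE_LEFT);
//! curr  = leftOf(node);
//! // ... once `curr` is reduced, pop the frame and rebuild/reuse the parent:
//! let frame  = stack;
//! let parent = rightOf(frame);
//! stack      = leftOf(frame);
//! ```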
84//!
85//! #### 3. Preemptive Multitasking (Suspensions)
86//!
87//! - **Cooperative yielding** when traversal gas is exhausted
88//! - **Suspension nodes** capture complete evaluation state
89//! - **Worker preemption** prevents starvation in parallel evaluation
90//! - **State resumption** via suspension node deserialization
91//!
92//! #### 4. Lock-Free Ring Buffers (io_uring Style)
93//!
94//! - **Submission Queue (SQ)**: Main thread → Worker communication
95//! - **Completion Queue (CQ)**: Worker → Main thread results
96//! - **Atomic operations** for thread-safe producer/consumer patterns
97//! - **Blocking waits** using WASM atomic wait/notify
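//!
//! Host-side flow, as a sketch only (the exact return-value conventions of the
//! `hostSubmit` / `hostPull` exports are defined further down; `0xffff_ffff`
//! is the "nothing yet" sentinel used throughout this module):
//!
//! ```text
//! let root = allocCons(allocCons(s, k), k);   // some expression, e.g. ((S K) K)
//! hostSubmit(root);                           // push an Sqe onto the SQ ring
//! loop {
//!     let done = hostPull();                  // poll the CQ ring
//!     if done != 0xffff_ffff { break; }       // result (or suspension) node id
//! }
//! ```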
98//!
99//! #### 5. Concurrent Resizing
100//!
101//! - **[Seqlock](https://en.wikipedia.org/wiki/Seqlock)-style synchronization** for arena growth
102//! - **Stop-the-world pauses** during resize operations
103//! - **Reverse-order copying** to handle overlapping memory regions
104//! - **Poisoning** on OOM to prevent infinite waits
105//!
106//! ### Performance Characteristics
107//!
108//! - **O(1) allocation** for new nodes (amortized)
109//! - **O(1) lookup** for existing subexpressions (hash-consing)
110//! - **O(depth) reduction** with iterative algorithm (no stack overflow)
111//! - **Lock-free communication** between main thread and workers
//! - **Memory efficient**: ~18 bytes per node across the parallel arrays (plus 4 bytes per slot of hash-bucket overhead), with structural sharing
113//!
114//! ### Thread Safety
115//!
116//! - **Atomic operations** for all shared state access
117//! - **Seqlock** for resize synchronization
//! - **Shared arena** in SAB mode: all workers operate on the same node arrays via atomics
119//! - **Ring buffer fences** prevent data races in communication
120//!
121//! ### Integration with JavaScript
122//!
123//! - **[SharedArrayBuffer](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer)** enables cross-thread memory access
124//! - **Cross-origin isolation** required for SAB support
125//! - **Typed array views** provide efficient memory access from JS
126//! - **Ring polling** in main thread for completion handling
127
128#![allow(dead_code)]
129
130// =============================================================================
131// Public enums (shared with JS)
132// =============================================================================
133
134/// Arena node types supporting SKI evaluation and parallel execution.
135///
136/// See module-level documentation for detailed descriptions of each variant.
137#[repr(u8)]
138#[derive(Clone, Copy, PartialEq, Eq, Debug)]
139pub enum ArenaKind {
140 /// Terminal node containing an SKI combinator (S, K, or I).
141 /// - `sym`: The combinator symbol
142 /// - `left`/`right`: Unused (reserved)
143 Terminal = 1,
144
145 /// Application node representing function application `(left right)`.
146 /// - `left`: Function expression
147 /// - `right`: Argument expression
148 /// - `sym`: Unused (reserved)
149 NonTerm = 2,
150
151 /// Stack frame for iterative reduction algorithm.
152 /// Used to avoid recursion stack overflow on deep expressions.
153 /// - `sym`: Reduction stage (0=left child, 1=right child)
154 /// - `left`: Parent stack frame
155 /// - `right`: Parent expression node
156 Continuation = 3,
157
158 /// Paused evaluation state for preemptive multitasking.
159 /// Captures complete evaluation context for resumption.
160 /// - `sym`: Evaluation mode (0=descend, 1=return)
161 /// - `left`: Current expression being evaluated
162 /// - `right`: Evaluation stack
163 /// - `hash`: Remaining reduction steps
164 Suspension = 4,
165}
166
167/// SKI combinator symbols and evaluation state markers.
168///
169/// Used in both Terminal nodes (for combinators) and Continuation/Suspension
170/// nodes (for evaluation state).
171#[repr(u8)]
172#[derive(Clone, Copy, PartialEq, Eq, Debug)]
173pub enum ArenaSym {
174 /// S combinator: `S x y z → x z (y z)`
    /// Together with K, it forms a complete basis for arbitrary computation.
176 S = 1,
177
178 /// K combinator: `K x y → x`
179 /// The constant function, discards its second argument.
180 K = 2,
181
182 /// I combinator: `I x → x`
183 /// The identity function, returns its argument unchanged.
184 I = 3,
185
186 /// readOne terminal: consumes one byte from stdin, passes Church numeral to continuation.
187 ReadOne = 4,
188
189 /// writeOne terminal: writes one byte to stdout, returns its argument.
190 WriteOne = 5,
191
192 // Internal-only primitives for Church decoding (not surfaced in the language).
193 Inc = 6,
194 Num = 7,
195}
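
// Example (sketch; the allocation exports are defined below): the expression
// `S K K` is two applications, `((S K) K)`:
//
//     let s = allocTerminal(ArenaSym::S as u32);
//     let k = allocTerminal(ArenaSym::K as u32);
//     let sk  = allocCons(s, k);   // (S K)
//     let skk = allocCons(sk, k);  // ((S K) K), which behaves like I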
196
197// =============================================================================
198// Non-WASM stubs (keep TS type-checking happy)
199// =============================================================================
200#[cfg(not(target_arch = "wasm32"))]
201#[no_mangle]
202pub extern "C" fn initArena(_cap: u32) -> u32 { 0 }
203#[cfg(not(target_arch = "wasm32"))]
204#[no_mangle]
205pub extern "C" fn connectArena(_p: u32) -> u32 { 0 }
206#[cfg(not(target_arch = "wasm32"))]
207#[no_mangle]
208pub extern "C" fn allocTerminal(_s: u32) -> u32 { 0 }
209#[cfg(not(target_arch = "wasm32"))]
210#[no_mangle]
211pub extern "C" fn allocCons(_l: u32, _r: u32) -> u32 { 0 }
212#[cfg(not(target_arch = "wasm32"))]
213#[no_mangle]
214pub extern "C" fn kindOf(_n: u32) -> u32 { 0 }
215#[cfg(not(target_arch = "wasm32"))]
216#[no_mangle]
217pub extern "C" fn symOf(_n: u32) -> u32 { 0 }
218#[cfg(not(target_arch = "wasm32"))]
219#[no_mangle]
220pub extern "C" fn leftOf(_n: u32) -> u32 { 0 }
221#[cfg(not(target_arch = "wasm32"))]
222#[no_mangle]
223pub extern "C" fn rightOf(_n: u32) -> u32 { 0 }
224#[cfg(not(target_arch = "wasm32"))]
225#[no_mangle]
226pub extern "C" fn reset() {}
227#[cfg(not(target_arch = "wasm32"))]
228#[no_mangle]
229pub extern "C" fn arenaKernelStep(x: u32) -> u32 { x }
230#[cfg(not(target_arch = "wasm32"))]
231#[no_mangle]
232pub extern "C" fn hostSubmit(_id: u32) -> u32 { 1 }
233#[cfg(not(target_arch = "wasm32"))]
234#[no_mangle]
235pub extern "C" fn hostPull() -> u32 { u32::MAX }
236#[cfg(not(target_arch = "wasm32"))]
237#[no_mangle]
238pub extern "C" fn workerLoop() {}
239#[cfg(not(target_arch = "wasm32"))]
240#[no_mangle]
241pub extern "C" fn debugGetArenaBaseAddr() -> u32 { 0 }
242#[cfg(not(target_arch = "wasm32"))]
243#[no_mangle]
244pub extern "C" fn getArenaMode() -> u32 { 0 }
245#[cfg(not(target_arch = "wasm32"))]
246#[no_mangle]
247pub extern "C" fn debugCalculateArenaSize(_c: u32) -> u32 { 0 }
248#[cfg(not(target_arch = "wasm32"))]
249#[no_mangle]
250pub extern "C" fn debugLockState() -> u32 { 0xffff_ffff }
251
252// =============================================================================
253// WASM implementation
254// =============================================================================
255#[cfg(target_arch = "wasm32")]
256mod wasm {
257 use core::arch::wasm32;
258 use core::cell::UnsafeCell;
259 use core::sync::atomic::{AtomicU32, AtomicU8, Ordering};
260 use crate::{ArenaKind, ArenaSym};
261
262 // -------------------------------------------------------------------------
263 // Constants / helpers
264 // -------------------------------------------------------------------------
265 pub const EMPTY: u32 = 0xffff_ffff;
266 const ARENA_MAGIC: u32 = 0x534B_4941; // "SKIA"
267 const INITIAL_CAP: u32 = 1 << 20;
268 const MAX_CAP: u32 = 1 << 27;
269 const WASM_PAGE_SIZE: usize = 65536;
270 // Size of SQ/CQ rings (power of two).
271 //
272 // Larger rings reduce backpressure + atomic wait/notify churn when many workers
273 // produce results faster than the host can drain them (a common cause of "stuttering"
274 // worker timelines and main-thread saturation in profiles).
275 const RING_ENTRIES: u32 = 1 << 16; // 65536
276 const TERM_CACHE_LEN: usize = 6; // 0..=WriteOne
277
278 #[inline(always)]
279 const fn align64(x: u32) -> u32 {
280 (x + 63) & !63
281 }
282
283 // -------------------------------------------------------------------------
284 // Atomics + wait/notify
285 // -------------------------------------------------------------------------
286 // WASM atomics / CAS safety notes (applies to all compare_exchange* uses below)
287 // -------------------------------------------------------------------------
288 //
289 // This module relies heavily on CAS (compare_exchange / compare_exchange_weak),
290 // which compiles to a single atomic read-modify-write instruction in WebAssembly
291 // (e.g. `i32.atomic.rmw.cmpxchg`). That property prevents the classic "check-then-
292 // store" race where two threads both observe a value and both write the same
293 // update thinking they won; CAS has a single winner and forces losers to retry.
294 //
295 // ABA notes:
296 // - Many CAS patterns are vulnerable to ABA when the *same* value can reappear
297 // (e.g. counters wrapping or pointer reuse). In this codebase, ABA is either
298 // explicitly prevented (ring slot sequence numbers advance by whole cycles)
299 // or is practically irrelevant in context (e.g. seqlock-style counters would
300 // require billions of expensive operations to wrap during one read section).
301 //
302 // WASM platform particularities:
303 // - When wasm threads are enabled, the linear memory is backed by a
304 // `SharedArrayBuffer`, and atomic ops synchronize across Web Workers.
305 // Rust `Ordering::{Acquire,Release,AcqRel,SeqCst}` map onto WASM atomics/fences
306 // to provide the needed visibility guarantees.
307 // - `memory.grow` increases linear memory size but does not "move" addresses
308 // from the module's perspective; what changes is our chosen layout within
309 // that address space (offsets/capacity), guarded by atomics/seqlock.
310 //
311 // External references:
312 // - Seqlock concept: https://en.wikipedia.org/wiki/Seqlock
313 // - SharedArrayBuffer (required for wasm threads): https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer
314 // - WebAssembly features (threads/atomics): https://webassembly.org/features/
315 // - Rust atomic memory orderings: https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html
316 // - WebAssembly `memory.grow`: https://developer.mozilla.org/en-US/docs/WebAssembly/Reference/Memory/Grow
317 //
318 mod sys {
319 use super::*;
320 #[inline(always)]
321 pub fn wait32(ptr: &AtomicU32, expected: u32) {
322 unsafe {
323 let _ = wasm32::memory_atomic_wait32(
324 ptr as *const _ as *mut i32,
325 expected as i32,
326 -1,
327 );
328 }
329 }
330 #[inline(always)]
331 pub fn notify(ptr: &AtomicU32, count: u32) {
332 unsafe {
333 let _ = wasm32::memory_atomic_notify(ptr as *const _ as *mut i32, count);
334 }
335 }
336 }
337
338 // -------------------------------------------------------------------------
339 // Ring Buffer Types (io_uring Style)
340 // -------------------------------------------------------------------------
341
342 /// Submission Queue Entry: Main thread → Worker communication.
343 ///
344 /// Sent from main thread to worker to request evaluation of an expression.
345 /// Workers dequeue these and perform the actual reduction work.
346 #[repr(C)]
347 #[derive(Clone, Copy)]
348 pub struct Sqe {
349 /// Arena node ID of the expression to evaluate.
350 /// This is the root of the expression tree to reduce.
351 pub node_id: u32,
352
353 /// Unique request identifier for correlation.
354 /// Used to match completion queue entries back to the original request.
355 /// Must be unique across all outstanding requests.
356 pub req_id: u32,
357
358 /// Max reduction steps for this specific request.
359 /// Replaces previous _pad field. Each request carries its own immutable limit.
360 pub max_steps: u32,
361 }
362
363 /// Completion Queue Entry: Worker → Main thread results.
364 ///
365 /// Workers enqueue these when they complete (or yield) evaluation work.
366 /// The main thread polls the completion queue to retrieve results.
367 #[repr(C)]
368 #[derive(Clone, Copy)]
369 pub struct Cqe {
370 /// Result node ID or Suspension node ID (for yields).
371 /// - If reduction completed: The fully reduced expression node
372 /// - If evaluation yielded: A Suspension node for resumption
373 pub node_id: u32,
374
375 /// Request identifier matching the original Sqe.req_id.
376 /// Used by main thread to correlate completions with pending requests.
377 pub req_id: u32,
378
379 /// Padding to align Slot<Cqe> to 16 bytes (power of 2) for efficient indexing.
380 pub _pad: u32,
381 }
382
383 /// Ring buffer slot with sequence number for ABA prevention.
384 ///
385 /// Each slot contains a sequence number and payload. The sequence number
386 /// prevents ABA problems in concurrent CAS operations by ensuring that
387 /// a slot can only be reused after a full cycle of the ring.
388 ///
389 /// ## Sequence Number Protocol
390 ///
391 /// - **Initial state**: `seq = slot_index`
    /// - **Producer**: waits until `seq == tail`, claims the slot, writes the payload, then publishes with `seq = tail + 1`
    /// - **Consumer**: waits until `seq == head + 1`, reads the payload, then releases the slot with `seq = head + mask + 1`
394 ///
395 /// This ensures proper ordering and prevents race conditions.
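    ///
    /// ## Worked Example
    ///
    /// A trace of the protocol for a small ring (`entries = 4`, `mask = 3`),
    /// derived from the `try_enqueue` / `try_dequeue` code below:
    ///
    /// ```text
    /// init:      seq = [0, 1, 2, 3]
    /// enqueue:   slot 0 claimed at tail = 0, published with seq = 1
    /// enqueue:   slot 1 claimed at tail = 1, published with seq = 2
    /// dequeue:   slot 0 read at head = 0, released with seq = 0 + 3 + 1 = 4
    /// enqueue:   tail = 4 wraps to slot 0 (4 & 3), whose seq = 4 == tail again
    /// ```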
396 #[repr(C)]
397 struct Slot<T> {
398 /// Sequence number for synchronization and ABA prevention.
399 /// Must be atomically updated to maintain memory ordering.
400 seq: AtomicU32,
401
402 /// The actual data payload stored in this slot.
403 /// Access must be synchronized with sequence number updates.
404 payload: UnsafeCell<T>,
405 }
406
407 // SAFETY: Ring<T> ensures proper synchronization of Slot<T> access.
408 // The UnsafeCell is only accessed after proper sequence number validation.
409 unsafe impl<T> Sync for Slot<T> {}
410
    /// Lock-free ring buffer for inter-thread communication.
412 ///
413 /// Uses [io_uring](https://en.wikipedia.org/wiki/Io_uring)-style producer/consumer pattern with atomic operations.
414 /// Supports both non-blocking (try_*) and blocking (*_blocking) operations.
415 ///
416 /// ## Design Principles
417 ///
    /// - **CAS-claimed slots**: safe with multiple producers and consumers, though each ring is typically driven by one producer side and one consumer side
419 /// - **Power-of-two sizing** for efficient masking operations
420 /// - **Cache-line alignment** (64-byte) to prevent false sharing
421 /// - **Atomic wait/notify** for efficient blocking operations
422 /// - **Sequence numbers** prevent ABA problems in concurrent access
423 ///
424 /// ## Memory Layout
425 ///
426 /// ```text
427 /// +----------------------+ <-- 64-byte aligned
428 /// | head: AtomicU32 | Consumer position
429 /// | not_full: AtomicU32 | Wait/notify for producers
430 /// | _pad1: [u8; 56] | Cache line padding
431 /// +----------------------+ <-- 64-byte aligned
432 /// | tail: AtomicU32 | Producer position
433 /// | not_empty: AtomicU32 | Wait/notify for consumers
434 /// | _pad2: [u8; 56] | Cache line padding
435 /// +----------------------+
436 /// | mask: u32 | entries - 1 (for fast modulo)
437 /// | entries: u32 | Ring capacity (power of 2)
438 /// +----------------------+
439 /// | slots[entries] | Array of Slot<T> entries
440 /// +----------------------+
441 /// ```
442 ///
443 /// ## Thread Safety
444 ///
445 /// - **Producer calls**: `try_enqueue()`, `enqueue_blocking()`
446 /// - **Consumer calls**: `try_dequeue()`, `dequeue_blocking()`
447 /// - **No internal locking**: Uses atomic CAS operations only
    /// - **Lock-free progress**: `try_*` calls never block (they may retry a CAS); the `*_blocking` variants park on atomic wait/notify until they can proceed
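    ///
    /// ## Usage Sketch
    ///
    /// Illustrative only; the real producers and consumers are the worker loop
    /// and host exports further down in this module:
    ///
    /// ```text
    /// // worker side: publish a completion, blocking while the CQ is full
    /// cq_ring().enqueue_blocking(Cqe { node_id, req_id, _pad: 0 });
    ///
    /// // host side: poll without blocking
    /// if let Some(cqe) = cq_ring().try_dequeue() {
    ///     // correlate cqe.req_id with the pending request
    /// }
    /// ```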
449 #[repr(C, align(64))]
450 pub struct Ring<T> {
451 /// Consumer position (head of queue).
452 /// Only modified by consumer thread via CAS.
453 head: AtomicU32,
454
455 /// Producer wait/notify synchronization.
456 /// Used by producers to wait when ring is full.
457 not_full: AtomicU32,
458
459 /// Cache line padding to prevent false sharing with tail.
460 _pad1: [u8; 56],
461
462 /// Producer position (tail of queue).
463 /// Only modified by producer thread via CAS.
464 tail: AtomicU32,
465
466 /// Consumer wait/notify synchronization.
467 /// Used by consumers to wait when ring is empty.
468 not_empty: AtomicU32,
469
470 /// Cache line padding to prevent false sharing with head.
471 _pad2: [u8; 56],
472
473 /// Bitmask for fast modulo: `index & mask` ≡ `index % entries`
474 mask: u32,
475
476 /// Ring capacity (must be power of 2).
477 entries: u32,
478
479 /// Zero-sized type marker for generic parameter.
480 _marker: core::marker::PhantomData<T>,
481 }
482
483 impl<T: Copy> Ring<T> {
484 /// Get pointer to the slots array following the Ring header.
485 #[inline(always)]
486 fn slots_ptr(&self) -> *const Slot<T> {
487 unsafe { (self as *const Ring<T>).add(1) as *const Slot<T> }
488 }
489
490 /// Get reference to slot at the given index (masked for wraparound).
491 #[inline(always)]
492 unsafe fn slot_at(&self, i: u32) -> &Slot<T> {
493 &*self.slots_ptr().add((i & self.mask) as usize)
494 }
495
496 /// Initialize a ring buffer at the given memory location.
497 ///
498 /// # Safety
499 ///
500 /// - `ptr` must point to sufficient memory for the ring and all slots
501 /// - `entries_pow2` must be a power of 2
502 /// - The ring must not be accessed concurrently during initialization
503 #[inline(always)]
504 pub unsafe fn init_at(ptr: *mut u8, entries_pow2: u32) -> &'static Self {
505 let ring = &mut *(ptr as *mut Ring<T>);
506 // Initialize producer/consumer positions
507 ring.head.store(0, Ordering::Relaxed);
508 ring.tail.store(0, Ordering::Relaxed);
509 // Initialize wait/notify counters
510 ring.not_empty.store(0, Ordering::Relaxed);
511 ring.not_full.store(0, Ordering::Relaxed);
512 // Set ring parameters
513 ring.entries = entries_pow2;
514 ring.mask = entries_pow2 - 1;
515 // Initialize slot sequence numbers to their indices
516 for i in 0..entries_pow2 {
517 ring.slot_at(i).seq.store(i, Ordering::Relaxed);
518 }
519 ring
520 }
521
522 /// Attempt to enqueue an item without blocking.
523 ///
524 /// Returns `true` if the item was successfully enqueued, `false` if the ring is full.
525 ///
526 /// ## Algorithm
527 ///
528 /// 1. Load current tail position
529 /// 2. Check if the corresponding slot is available (`seq == tail`)
530 /// 3. Attempt to claim the slot by CAS'ing tail forward
531 /// 4. If successful: write payload, update sequence, notify consumers
532 /// 5. If slot unavailable: check if ring is full or retry
533 ///
534 /// ## Memory Ordering
535 ///
536 /// - `Acquire` load of sequence number ensures payload visibility
537 /// - `Release` store of sequence number publishes payload to consumers
538 /// - `Release` notify ensures consumer sees all prior writes
539 ///
540 /// See [memory barriers](https://en.wikipedia.org/wiki/Memory_barrier) for details.
541 #[inline(always)]
542 pub fn try_enqueue(&self, item: T) -> bool {
543 unsafe {
544 loop {
545 // Load producer position (relaxed: no ordering requirements)
546 let t = self.tail.load(Ordering::Relaxed);
547 let slot = self.slot_at(t);
548
549 // Check if slot is available for us (Acquire: see previous publications)
550 let s = slot.seq.load(Ordering::Acquire);
551 let diff = s.wrapping_sub(t);
552
553 if diff == 0 {
554 // Slot is free. Try to claim it by advancing tail.
555 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
556 if self
557 .tail
558 .compare_exchange_weak(
559 t,
560 t.wrapping_add(1),
561 Ordering::Relaxed, // Success: no special ordering
562 Ordering::Relaxed, // Failure: no special ordering
563 )
564 .is_ok()
565 {
566 // Successfully claimed slot. Write payload and publish.
567 *slot.payload.get() = item;
568 slot.seq.store(t.wrapping_add(1), Ordering::Release);
569 // Notify waiting consumers (Release: publish all prior writes)
570 self.not_empty.fetch_add(1, Ordering::Release);
571 sys::notify(&self.not_empty, 1);
572 return true;
573 }
574 // CAS failed: another producer claimed it, retry
575 } else if (diff as i32) < 0 {
                        // Ring is full: this slot has not been released by the
                        // consumer yet (its sequence still lags the current tail).
577 return false;
578 }
579 // Slot not ready yet, retry (another iteration in progress)
580 }
581 }
582 }
583
584 /// Attempt to dequeue an item without blocking.
585 ///
586 /// Returns `Some(item)` if an item was successfully dequeued, `None` if the ring is empty.
587 ///
588 /// ## Algorithm
589 ///
590 /// 1. Load current head position
591 /// 2. Check if the corresponding slot has data (`seq == head + 1`)
592 /// 3. Attempt to claim the slot by CAS'ing head forward
593 /// 4. If successful: read payload, update sequence for reuse, notify producers
594 /// 5. If slot unavailable: check if ring is empty or retry
595 ///
596 /// ## Sequence Number Reuse
597 ///
598 /// After consuming, sequence is set to `head + mask + 1`. Since `mask = entries - 1`,
599 /// this ensures the slot won't be reused until after a full ring cycle, preventing ABA.
600 #[inline(always)]
601 pub fn try_dequeue(&self) -> Option<T> {
602 unsafe {
603 loop {
604 // Load consumer position (relaxed: no ordering requirements)
605 let h = self.head.load(Ordering::Relaxed);
606 let slot = self.slot_at(h);
607
608 // Check if slot has data for us (Acquire: see producer's publication)
609 let s = slot.seq.load(Ordering::Acquire);
610 let diff = s.wrapping_sub(h.wrapping_add(1));
611
612 if diff == 0 {
613 // Slot has data. Try to claim it by advancing head.
614 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
615 if self
616 .head
617 .compare_exchange_weak(
618 h,
619 h.wrapping_add(1),
620 Ordering::Relaxed, // Success: no special ordering
621 Ordering::Relaxed, // Failure: no special ordering
622 )
623 .is_ok()
624 {
625 // Successfully claimed slot. Read payload and release for reuse.
626 let item = *slot.payload.get();
627 // Set sequence for next cycle: h + mask + 1 prevents ABA issues
628 slot.seq
629 .store(h.wrapping_add(self.mask).wrapping_add(1), Ordering::Release);
630 // Notify waiting producers (Release: publish slot availability)
631 self.not_full.fetch_add(1, Ordering::Release);
632 sys::notify(&self.not_full, 1);
633 return Some(item);
634 }
635 // CAS failed: another consumer claimed it, retry
636 } else if (diff as i32) < 0 {
637 // Ring is empty: no data available
638 return None;
639 }
640 // Slot not ready yet, retry (another operation in progress)
641 }
642 }
643 }
644
645 /// Enqueue an item, blocking until space is available.
646 ///
647 /// Uses WASM atomic wait/notify for efficient blocking when the ring is full.
648 /// The wait is interruptible and will retry the enqueue operation.
649 #[inline(always)]
650 pub fn enqueue_blocking(&self, item: T) {
651 while !self.try_enqueue(item) {
652 // Load current state and wait for notification
653 let v = self.not_full.load(Ordering::Acquire);
                // Re-check before waiting so a notification that raced with the failed try is not lost
655 if self.try_enqueue(item) {
656 return;
657 }
                // Wait for a consumer to notify us of freed space
659 sys::wait32(&self.not_full, v);
660 }
661 }
662
663 /// Dequeue an item, blocking until data is available.
664 ///
665 /// Uses WASM atomic wait/notify for efficient blocking when the ring is empty.
666 /// The wait is interruptible and will retry the dequeue operation.
667 #[inline(always)]
668 pub fn dequeue_blocking(&self) -> T {
669 loop {
670 if let Some(x) = self.try_dequeue() {
671 return x;
672 }
673 // Load current state and wait for notification
674 let v = self.not_empty.load(Ordering::Acquire);
                // Re-check before waiting so a notification that raced with the failed try is not lost
676 if let Some(x) = self.try_dequeue() {
677 return x;
678 }
                // Wait for a producer to notify us of available data
680 sys::wait32(&self.not_empty, v);
681 }
682 }
683 }
684
685 #[inline(always)]
686 const fn ring_bytes<T>(entries: u32) -> u32 {
687 let header = core::mem::size_of::<Ring<T>>() as u32;
688 let slot = core::mem::size_of::<Slot<T>>() as u32;
689 align64(header + entries * slot)
690 }
691
692 // -------------------------------------------------------------------------
693 // Header layout (fixed offsets)
694 // -------------------------------------------------------------------------
695 struct SabLayout {
696 offset_sq: u32,
697 offset_cq: u32,
698 offset_stdin: u32,
699 offset_stdout: u32,
700 offset_stdin_wait: u32,
701 offset_kind: u32,
702 offset_sym: u32,
703 offset_left_id: u32,
704 offset_right_id: u32,
705 offset_hash32: u32,
706 offset_next_idx: u32,
707 offset_buckets: u32,
708 offset_term_cache: u32,
709 total_size: u32,
710 }
711
712 #[repr(C, align(64))]
713 struct SabHeader {
714 magic: u32,
715 ring_entries: u32,
716 ring_mask: u32,
717 offset_sq: u32,
718 offset_cq: u32,
719 offset_stdin: u32,
720 offset_stdout: u32,
721 offset_stdin_wait: u32,
722 offset_kind: u32,
723 offset_sym: u32,
724 offset_left_id: u32,
725 offset_right_id: u32,
726 offset_hash32: u32,
727 offset_next_idx: u32,
728 offset_buckets: u32,
729 offset_term_cache: u32,
730 capacity: u32,
731 bucket_mask: u32,
732 resize_seq: AtomicU32,
733 top: AtomicU32,
734 }
735
736 impl SabHeader {
737 fn layout(capacity: u32) -> SabLayout {
738 let header_size = core::mem::size_of::<SabHeader>() as u32;
739 let offset_sq = align64(header_size);
740 let offset_cq = align64(offset_sq + ring_bytes::<Sqe>(RING_ENTRIES));
741 let offset_stdin = align64(offset_cq + ring_bytes::<Cqe>(RING_ENTRIES));
742 let offset_stdout = align64(offset_stdin + ring_bytes::<u8>(RING_ENTRIES));
743 let offset_stdin_wait = align64(offset_stdout + ring_bytes::<u8>(RING_ENTRIES));
744
745 let offset_kind = align64(offset_stdin_wait + ring_bytes::<u32>(RING_ENTRIES));
746 let offset_sym = offset_kind + capacity;
747 let offset_left_id = align64(offset_sym + capacity);
748 let offset_right_id = offset_left_id + 4 * capacity;
749 let offset_hash32 = offset_right_id + 4 * capacity;
750 let offset_next_idx = offset_hash32 + 4 * capacity;
751 let offset_buckets = align64(offset_next_idx + 4 * capacity);
752 let offset_term_cache = offset_buckets + 4 * capacity;
753 let total_size = offset_term_cache + (TERM_CACHE_LEN as u32) * 4;
754 SabLayout {
755 offset_sq,
756 offset_cq,
757 offset_stdin,
758 offset_stdout,
759 offset_stdin_wait,
760 offset_kind,
761 offset_sym,
762 offset_left_id,
763 offset_right_id,
764 offset_hash32,
765 offset_next_idx,
766 offset_buckets,
767 offset_term_cache,
768 total_size,
769 }
770 }
771 }
772
773 // -------------------------------------------------------------------------
774 // Globals
775 // -------------------------------------------------------------------------
776 static mut ARENA_BASE_ADDR: u32 = 0;
777 static mut ARENA_MODE: u32 = 0;
778
779 #[inline(always)]
780 unsafe fn ensure_arena() {
781 if ARENA_BASE_ADDR != 0 {
782 return;
783 }
784 // If we were supposed to be in SAB mode but have no base, that's fatal.
785 if ARENA_MODE == 1 {
786 wasm32::unreachable();
787 }
788 let ptr = allocate_raw_arena(INITIAL_CAP);
789 if ptr.is_null() {
790 wasm32::unreachable();
791 }
792 // Heap / single-instance mode
793 ARENA_MODE = 0;
794 }
795
796 // -------------------------------------------------------------------------
797 // Helpers to locate structures
798 // -------------------------------------------------------------------------
799 #[inline(always)]
800 unsafe fn header() -> &'static SabHeader {
801 &*(ARENA_BASE_ADDR as *const SabHeader)
802 }
803
804 #[inline(always)]
805 unsafe fn header_mut() -> &'static mut SabHeader {
806 &mut *(ARENA_BASE_ADDR as *mut SabHeader)
807 }
808
809 #[inline(always)]
810 unsafe fn sq_ring() -> &'static Ring<Sqe> {
811 let h = header();
812 &*((ARENA_BASE_ADDR + h.offset_sq) as *const Ring<Sqe>)
813 }
814
815 #[inline(always)]
816 unsafe fn cq_ring() -> &'static Ring<Cqe> {
817 let h = header();
818 &*((ARENA_BASE_ADDR + h.offset_cq) as *const Ring<Cqe>)
819 }
820
821 #[inline(always)]
822 unsafe fn stdin_ring() -> &'static Ring<u8> {
823 let h = header();
824 &*((ARENA_BASE_ADDR + h.offset_stdin) as *const Ring<u8>)
825 }
826
827 #[inline(always)]
828 unsafe fn stdout_ring() -> &'static Ring<u8> {
829 let h = header();
830 &*((ARENA_BASE_ADDR + h.offset_stdout) as *const Ring<u8>)
831 }
832
833 #[inline(always)]
834 unsafe fn stdin_wait_ring() -> &'static Ring<u32> {
835 let h = header();
836 &*((ARENA_BASE_ADDR + h.offset_stdin_wait) as *const Ring<u32>)
837 }
838
839 // Array helpers
840 #[inline(always)]
841 unsafe fn kind_ptr() -> *mut AtomicU8 {
842 (ARENA_BASE_ADDR + header().offset_kind) as *mut AtomicU8
843 }
844 #[inline(always)]
845 unsafe fn sym_ptr() -> *mut AtomicU8 {
846 (ARENA_BASE_ADDR + header().offset_sym) as *mut AtomicU8
847 }
848 #[inline(always)]
849 unsafe fn left_ptr() -> *mut AtomicU32 {
850 (ARENA_BASE_ADDR + header().offset_left_id) as *mut AtomicU32
851 }
852 #[inline(always)]
853 unsafe fn right_ptr() -> *mut AtomicU32 {
854 (ARENA_BASE_ADDR + header().offset_right_id) as *mut AtomicU32
855 }
856 #[inline(always)]
857 unsafe fn hash_ptr() -> *mut AtomicU32 {
858 (ARENA_BASE_ADDR + header().offset_hash32) as *mut AtomicU32
859 }
860 #[inline(always)]
861 unsafe fn next_ptr() -> *mut AtomicU32 {
862 (ARENA_BASE_ADDR + header().offset_next_idx) as *mut AtomicU32
863 }
864 #[inline(always)]
865 unsafe fn buckets_ptr() -> *mut AtomicU32 {
866 (ARENA_BASE_ADDR + header().offset_buckets) as *mut AtomicU32
867 }
868 #[inline(always)]
869 unsafe fn term_cache_ptr() -> *mut AtomicU32 {
870 (ARENA_BASE_ADDR + header().offset_term_cache) as *mut AtomicU32
871 }
872
873 // -------------------------------------------------------------------------
874 // Hashing helpers
875 // -------------------------------------------------------------------------
876 fn avalanche32(mut x: u32) -> u32 {
877 x ^= x >> 16;
878 x = x.wrapping_mul(0x7feb_352d);
879 x ^= x >> 15;
880 x = x.wrapping_mul(0x846c_a68b);
881 x ^= x >> 16;
882 x
883 }
884 const GOLD: u32 = 0x9e37_79b9;
885 fn mix(a: u32, b: u32) -> u32 {
886 avalanche32(a ^ b.wrapping_mul(GOLD))
887 }
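
    // How `allocCons` uses these (sketch): the node hash is the mix of the two
    // child hashes, and the hash-cons bucket is picked by masking with
    // `bucket_mask` (= capacity - 1):
    //
    //     let hval = mix(hash_of_internal(l), hash_of_internal(r));
    //     let bucket_idx = (hval & header().bucket_mask) as usize;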
888
889 // If a resize fails (OOM / max cap), poison the seqlock so other threads trap
890 // instead of spinning forever on an odd seq.
891 const POISON_SEQ: u32 = 0xffff_ffff;
892
893 // -------------------------------------------------------------------------
894 // Resize guard (seqlock-style)
895 // -------------------------------------------------------------------------
896 // See "WASM atomics / CAS safety notes" above for CAS/ABA discussion.
897 // This specific guard is seqlock-style: even=stable, odd=resize in progress.
898 // For READING existing data. Returns (seq, h) to enable read-verify-retry pattern:
899 // 1. Call enter_stable() to get sequence number
900 // 2. Read the data you need
901 // 3. Call check_stable(seq) to verify sequence didn't change
902 // 4. If changed, retry (resize occurred during read)
903 // Used in: kindOf(), symOf(), leftOf(), rightOf(), hash lookups
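    //
    // The same pattern in code (a condensed sketch of what those accessors do):
    //
    //     loop {
    //         let (seq, h) = enter_stable();          // wait for an even seq
    //         let val = /* read the node fields */;
    //         core::sync::atomic::fence(Ordering::Acquire);
    //         if check_stable(seq) { break val; }     // no resize during the read
    //     }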
904 #[inline(always)]
905 fn enter_stable() -> (u32, &'static SabHeader) {
906 unsafe {
907 let h = header();
908 loop {
909 let seq = h.resize_seq.load(Ordering::Acquire);
910 if seq == POISON_SEQ {
911 core::arch::wasm32::unreachable();
912 }
913 if seq & 1 == 1 {
914 core::hint::spin_loop();
915 continue;
916 }
917 return (seq, h);
918 }
919 }
920 }
921
922 // For WRITING new data. See comment above enter_stable() for distinction.
923 // Simply waits until resize completes (sequence is even).
924 // No verification needed since we're not reading existing data.
925 // Used in: allocTerminal(), allocCons() allocation path, alloc_generic()
926 #[inline(always)]
927 fn wait_resize_stable() {
928 unsafe {
929 let h = header();
930 loop {
931 let seq = h.resize_seq.load(Ordering::Acquire);
932 if seq == POISON_SEQ {
933 core::arch::wasm32::unreachable();
934 }
935 if (seq & 1) == 0 {
936 return;
937 }
938 core::hint::spin_loop();
939 }
940 }
941 }
942
943 #[inline(always)]
944 fn check_stable(seq: u32) -> bool {
945 unsafe { header().resize_seq.load(Ordering::Acquire) == seq }
946 }
947
948 // -------------------------------------------------------------------------
949 // Arena init / connect
950 // -------------------------------------------------------------------------
951 unsafe fn zero_region(start: u32, len: u32) {
952 core::ptr::write_bytes((ARENA_BASE_ADDR + start) as *mut u8, 0, len as usize);
953 }
954
955 unsafe fn init_header(capacity: u32) {
956 let layout = SabHeader::layout(capacity);
957 let h = &mut *(ARENA_BASE_ADDR as *mut SabHeader);
958 h.magic = ARENA_MAGIC;
959 h.ring_entries = RING_ENTRIES;
960 h.ring_mask = RING_ENTRIES - 1;
961 h.offset_sq = layout.offset_sq;
962 h.offset_cq = layout.offset_cq;
963 h.offset_stdin = layout.offset_stdin;
964 h.offset_stdout = layout.offset_stdout;
965 h.offset_stdin_wait = layout.offset_stdin_wait;
966 h.offset_kind = layout.offset_kind;
967 h.offset_sym = layout.offset_sym;
968 h.offset_left_id = layout.offset_left_id;
969 h.offset_right_id = layout.offset_right_id;
970 h.offset_hash32 = layout.offset_hash32;
971 h.offset_next_idx = layout.offset_next_idx;
972 h.offset_buckets = layout.offset_buckets;
973 h.offset_term_cache = layout.offset_term_cache;
974 h.capacity = capacity;
975 h.bucket_mask = capacity - 1;
976 h.resize_seq.store(0, Ordering::Relaxed);
977 h.top.store(0, Ordering::Relaxed);
978
979 zero_region(
980 (core::mem::size_of::<SabHeader>()) as u32,
981 layout.total_size - core::mem::size_of::<SabHeader>() as u32,
982 );
983
984 Ring::<Sqe>::init_at((ARENA_BASE_ADDR + h.offset_sq) as *mut u8, RING_ENTRIES);
985 Ring::<Cqe>::init_at((ARENA_BASE_ADDR + h.offset_cq) as *mut u8, RING_ENTRIES);
986 Ring::<u8>::init_at((ARENA_BASE_ADDR + h.offset_stdin) as *mut u8, RING_ENTRIES);
987 Ring::<u8>::init_at((ARENA_BASE_ADDR + h.offset_stdout) as *mut u8, RING_ENTRIES);
988 Ring::<u32>::init_at((ARENA_BASE_ADDR + h.offset_stdin_wait) as *mut u8, RING_ENTRIES);
989
990 // Buckets + cache init
991 let buckets = buckets_ptr();
992 for i in 0..capacity as usize {
993 buckets.add(i).write(AtomicU32::new(EMPTY));
994 }
995 let cache = term_cache_ptr();
996 for i in 0..TERM_CACHE_LEN {
997 cache.add(i).write(AtomicU32::new(EMPTY));
998 }
999 }
1000
1001 unsafe fn allocate_raw_arena(capacity: u32) -> *mut SabHeader {
1002 let layout = SabHeader::layout(capacity);
1003 let pages_needed = (layout.total_size as usize + WASM_PAGE_SIZE - 1) / WASM_PAGE_SIZE;
1004 let old_pages = wasm32::memory_grow(0, pages_needed);
1005 if old_pages == usize::MAX {
1006 return core::ptr::null_mut();
1007 }
1008 let base_addr = (old_pages * WASM_PAGE_SIZE) as u32;
1009 ARENA_BASE_ADDR = base_addr;
1010 init_header(capacity);
1011 base_addr as *mut SabHeader
1012 }
1013
1014 // -------------------------------------------------------------------------
1015 // Exports: init/connect/reset
1016 // -------------------------------------------------------------------------
1017 #[no_mangle]
1018 pub extern "C" fn initArena(initial_capacity: u32) -> u32 {
1019 if initial_capacity < 1024 || initial_capacity > MAX_CAP || !initial_capacity.is_power_of_two() {
1020 return 0;
1021 }
1022 unsafe {
1023 if ARENA_BASE_ADDR != 0 {
1024 return ARENA_BASE_ADDR;
1025 }
1026 let ptr = allocate_raw_arena(initial_capacity);
1027 if ptr.is_null() {
1028 return 1;
1029 }
1030 ARENA_MODE = 1;
1031 ARENA_BASE_ADDR
1032 }
1033 }
1034
1035 #[no_mangle]
1036 pub extern "C" fn connectArena(ptr_addr: u32) -> u32 {
1037 if ptr_addr == 0 || ptr_addr % 64 != 0 {
1038 return 0;
1039 }
1040 unsafe {
1041 ARENA_BASE_ADDR = ptr_addr;
1042 ARENA_MODE = 1;
1043 let h = header();
1044 if h.magic != ARENA_MAGIC {
1045 return 5;
1046 }
1047 1
1048 }
1049 }
1050
1051 #[no_mangle]
1052 pub extern "C" fn reset() {
1053 unsafe {
1054 ensure_arena();
1055 let h = header_mut();
1056 h.top.store(0, Ordering::Release);
1057 let buckets = buckets_ptr();
1058 for i in 0..h.capacity as usize {
1059 (*buckets.add(i)).store(EMPTY, Ordering::Release);
1060 }
1061 let cache = term_cache_ptr();
1062 for i in 0..TERM_CACHE_LEN {
1063 (*cache.add(i)).store(EMPTY, Ordering::Release);
1064 }
1065 h.resize_seq.store(h.resize_seq.load(Ordering::Relaxed) & !1, Ordering::Release);
1066 }
1067 }
1068
1069 // -------------------------------------------------------------------------
1070 // Accessors
1071 // -------------------------------------------------------------------------
1072 #[no_mangle]
1073 pub extern "C" fn kindOf(n: u32) -> u32 {
1074 unsafe {
1075 ensure_arena();
1076 loop {
1077 let (seq, h) = enter_stable();
1078 let cap = h.capacity;
1079 if n >= cap {
1080 return 0;
1081 }
1082 let val = (*kind_ptr().add(n as usize)).load(Ordering::Acquire) as u32;
1083 core::sync::atomic::fence(Ordering::Acquire);
1084 if check_stable(seq) {
1085 return val;
1086 }
1087 }
1088 }
1089 }
1090
1091 #[no_mangle]
1092 pub extern "C" fn symOf(n: u32) -> u32 {
1093 unsafe {
1094 ensure_arena();
1095 loop {
1096 let (seq, h) = enter_stable();
1097 if n >= h.capacity {
1098 return 0;
1099 }
1100 let val = (*sym_ptr().add(n as usize)).load(Ordering::Acquire) as u32;
1101 core::sync::atomic::fence(Ordering::Acquire);
1102 if check_stable(seq) {
1103 return val;
1104 }
1105 }
1106 }
1107 }
1108
1109 #[no_mangle]
1110 pub extern "C" fn leftOf(n: u32) -> u32 {
1111 unsafe {
1112 ensure_arena();
1113 loop {
1114 let (seq, h) = enter_stable();
1115 if n >= h.capacity {
1116 return 0;
1117 }
1118 let val = (*left_ptr().add(n as usize)).load(Ordering::Acquire);
1119 core::sync::atomic::fence(Ordering::Acquire);
1120 if check_stable(seq) {
1121 return val;
1122 }
1123 }
1124 }
1125 }
1126
1127 #[no_mangle]
1128 pub extern "C" fn rightOf(n: u32) -> u32 {
1129 unsafe {
1130 ensure_arena();
1131 loop {
1132 let (seq, h) = enter_stable();
1133 if n >= h.capacity {
1134 return 0;
1135 }
1136 let val = (*right_ptr().add(n as usize)).load(Ordering::Acquire);
1137 core::sync::atomic::fence(Ordering::Acquire);
1138 if check_stable(seq) {
1139 return val;
1140 }
1141 }
1142 }
1143 }
1144
1145 // -------------------------------------------------------------------------
1146 // Allocation (speculative, lock-free)
1147 // -------------------------------------------------------------------------
1148 #[no_mangle]
1149 pub extern "C" fn allocTerminal(sym: u32) -> u32 {
1150 unsafe {
1151 ensure_arena();
1152 let h = header();
1153
1154 if sym < TERM_CACHE_LEN as u32 {
1155 let cached = (*term_cache_ptr().add(sym as usize)).load(Ordering::Acquire);
1156 if cached != EMPTY {
1157 return cached;
1158 }
1159 }
1160
1161 loop {
1162 wait_resize_stable();
1163 let id = h.top.fetch_add(1, Ordering::AcqRel);
1164 if id >= h.capacity {
1165 grow();
1166 continue;
1167 }
1168
1169 (*kind_ptr().add(id as usize)).store(ArenaKind::Terminal as u8, Ordering::Release);
1170 (*sym_ptr().add(id as usize)).store(sym as u8, Ordering::Release);
1171 (*hash_ptr().add(id as usize)).store(sym, Ordering::Release);
1172
1173 if sym < TERM_CACHE_LEN as u32 {
1174 (*term_cache_ptr().add(sym as usize)).store(id, Ordering::Release);
1175 }
1176 return id;
1177 }
1178 }
1179 }
1180
1181 #[no_mangle]
1182 /// Allocate a NonTerm (application) node with hash-consing optimization.
1183 ///
1184 /// This is the core optimization that enables structural sharing in the arena.
1185 /// Instead of always creating new nodes, it checks if an identical `(l r)` pair
1186 /// already exists and returns the existing node ID if found.
1187 ///
1188 /// ## Hash-Consing Algorithm
1189 ///
1190 /// 1. **Compute hash**: `mix(hash(l), hash(r))` using avalanche hashing
1191 /// 2. **Lookup in hash table**: Check bucket for existing `(l,r)` pairs
1192 /// 3. **Return existing**: If found, return the shared node ID
1193 /// 4. **Allocate new**: If not found, create new node and add to hash table
1194 ///
1195 /// ## Memory Efficiency
1196 ///
1197 /// - **DAG representation**: Common subexpressions are shared
1198 /// - **Reduced allocations**: Avoids duplicate node creation
1199 /// - **Cache-friendly**: Hash table enables O(1) lookups
1200 ///
1201 /// ## Thread Safety
1202 ///
1203 /// - Uses seqlock to handle concurrent resizing
1204 /// - Atomic operations for hash table consistency
1205 /// - CAS-based insertion prevents race conditions
1206 pub extern "C" fn allocCons(l: u32, r: u32) -> u32 {
1207 unsafe {
1208 ensure_arena();
1209
1210 // Compute hash value (doesn't depend on header state)
1211 let hl = loop {
1212 let (seq, h) = enter_stable();
1213 if l >= h.capacity {
1214 if !check_stable(seq) { continue; }
1215 return EMPTY; // Invalid left node
1216 }
1217 let val = (*hash_ptr().add(l as usize)).load(Ordering::Acquire);
1218 core::sync::atomic::fence(Ordering::Acquire);
1219 if check_stable(seq) {
1220 break val;
1221 }
1222 };
1223 let hr = loop {
1224 let (seq, h) = enter_stable();
1225 if r >= h.capacity {
1226 if !check_stable(seq) { continue; }
1227 return EMPTY; // Invalid right node
1228 }
1229 let val = (*hash_ptr().add(r as usize)).load(Ordering::Acquire);
1230 core::sync::atomic::fence(Ordering::Acquire);
1231 if check_stable(seq) {
1232 break val;
1233 }
1234 };
1235 let hval = mix(hl, hr);
1236
1237 // Retry loop for stable reads
1238 let _ = loop {
1239 let (seq, h) = enter_stable(); // Wait if resizing
1240 let mask = h.bucket_mask;
1241 let bucket_idx = (hval & mask) as usize;
1242
1243 // Validate bucket index is safe before dereferencing
1244 if bucket_idx >= h.capacity as usize {
1245 // Capacity changed mid-read? Retry.
1246 if !check_stable(seq) { continue; }
1247 // Should be unreachable if logic is correct, but safe fallback
1248 continue;
1249 }
1250
1251 let buckets = buckets_ptr();
1252 let next = next_ptr();
1253
1254 let mut cur = (*buckets.add(bucket_idx)).load(Ordering::Acquire);
1255 let mut found = EMPTY;
1256
1257 while cur != EMPTY {
1258 // Bounds check for safety
1259 if cur >= h.capacity { break; }
1260
1261 let k = (*kind_ptr().add(cur as usize)).load(Ordering::Acquire);
1262 if k == ArenaKind::NonTerm as u8 {
1263 let ch = (*hash_ptr().add(cur as usize)).load(Ordering::Acquire);
1264 if ch == hval {
1265 let cl = (*left_ptr().add(cur as usize)).load(Ordering::Acquire);
1266 let cr = (*right_ptr().add(cur as usize)).load(Ordering::Acquire);
1267 if cl == l && cr == r {
1268 found = cur;
1269 break;
1270 }
1271 }
1272 }
1273 cur = (*next.add(cur as usize)).load(Ordering::Acquire);
1274 }
1275
1276 // If we found it, verify the lock is still valid.
1277 // If lock changed, our read might be garbage -> Retry.
1278 if check_stable(seq) {
1279 if found != EMPTY {
1280 return found;
1281 }
1282 // If not found, we break to the allocation logic with bucket index
1283 break bucket_idx;
1284 }
1285 // If lock changed, retry the whole lookup
1286 };
1287
1288 // allocate new
1289 loop {
1290 wait_resize_stable();
1291
1292 // Reload pointers/mask in case they changed during wait_resize_stable()
1293 let h = header();
1294 let buckets = buckets_ptr();
1295 let current_mask = h.bucket_mask;
1296 let b = (hval & current_mask) as usize;
1297
1298 let id = h.top.fetch_add(1, Ordering::AcqRel);
1299 if id >= h.capacity {
1300 grow();
1301 continue;
1302 }
1303
1304 (*kind_ptr().add(id as usize)).store(ArenaKind::NonTerm as u8, Ordering::Release);
1305 (*left_ptr().add(id as usize)).store(l, Ordering::Release);
1306 (*right_ptr().add(id as usize)).store(r, Ordering::Release);
1307 (*hash_ptr().add(id as usize)).store(hval, Ordering::Release);
1308
1309 // insert into bucket with CAS; if we lose, drop id as hole (kind=0)
1310 let next = next_ptr();
1311 loop {
1312 let head = (*buckets.add(b)).load(Ordering::Acquire);
1313 (*next.add(id as usize)).store(head, Ordering::Relaxed);
1314 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
1315 if (*buckets.add(b))
1316 .compare_exchange(head, id, Ordering::Release, Ordering::Relaxed)
1317 .is_ok()
1318 {
1319 return id;
1320 }
1321 // someone inserted; check if it matches now
1322 let mut cur2 = (*buckets.add(b)).load(Ordering::Acquire);
1323 while cur2 != EMPTY {
1324 let ck2 = (*kind_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1325 if ck2 != ArenaKind::NonTerm as u8 {
1326 cur2 = (*next.add(cur2 as usize)).load(Ordering::Acquire);
1327 continue;
1328 }
1329 let ch2 = (*hash_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1330 if ch2 == hval {
1331 let cl2 = (*left_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1332 let cr2 = (*right_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1333 if cl2 == l && cr2 == r {
1334 // mark hole
1335 (*kind_ptr().add(id as usize)).store(0, Ordering::Release);
1336 return cur2;
1337 }
1338 }
1339 cur2 = (*next.add(cur2 as usize)).load(Ordering::Acquire);
1340 }
1341 }
1342 }
1343 }
1344 }
1345
1346 // Generic allocation (non hash-consed; NOT inserted into buckets).
1347 // This is used for reducer continuations/suspensions.
1348 #[inline(always)]
1349 unsafe fn alloc_generic(kind: u8, sym: u8, left: u32, right: u32, hash: u32) -> u32 {
1350 ensure_arena();
1351 let h = header();
1352 loop {
1353 wait_resize_stable();
1354 let id = h.top.fetch_add(1, Ordering::AcqRel);
1355 if id >= h.capacity {
1356 grow();
1357 continue;
1358 }
1359 // Publish payload, then kind last.
1360 (*sym_ptr().add(id as usize)).store(sym, Ordering::Release);
1361 (*left_ptr().add(id as usize)).store(left, Ordering::Release);
1362 (*right_ptr().add(id as usize)).store(right, Ordering::Release);
1363 (*hash_ptr().add(id as usize)).store(hash, Ordering::Release);
1364 (*kind_ptr().add(id as usize)).store(kind, Ordering::Release);
1365 return id;
1366 }
1367 }
1368
1369 // -------------------------------------------------------------------------
1370 // Resize (stop-the-world via odd/even seq)
1371 // -------------------------------------------------------------------------
1372 fn grow() {
1373 unsafe {
1374 let h = header_mut();
1375 let mut expected = h.resize_seq.load(Ordering::Acquire);
1376 loop {
1377 if expected & 1 == 1 {
1378 core::hint::spin_loop();
1379 expected = h.resize_seq.load(Ordering::Acquire);
1380 continue;
1381 }
1382 // Acquire exclusive "writer" by flipping even->odd with CAS.
1383 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
1384 if h.resize_seq.compare_exchange(expected, expected | 1, Ordering::AcqRel, Ordering::Acquire).is_ok() {
1385 break;
1386 }
1387 expected = h.resize_seq.load(Ordering::Acquire);
1388 }
1389
1390 let old_cap = h.capacity;
1391 // Capture OLD offsets before we overwrite the header with new ones.
1392 let old_offset_kind = h.offset_kind;
1393 let old_offset_sym = h.offset_sym;
1394 let old_offset_left = h.offset_left_id;
1395 let old_offset_right = h.offset_right_id;
1396 let old_offset_hash = h.offset_hash32;
1397 let old_offset_next = h.offset_next_idx;
1398 let old_offset_term_cache = h.offset_term_cache;
1399 let old_top = h.top.load(Ordering::Acquire);
1400
1401 if old_cap >= MAX_CAP {
1402 // Poison so other threads trap instead of spinning on odd resize_seq.
1403 h.resize_seq.store(POISON_SEQ, Ordering::Release);
1404 core::arch::wasm32::unreachable();
1405 }
1406 let new_cap = (old_cap * 2).min(MAX_CAP);
1407
1408 let layout = SabHeader::layout(new_cap);
1409 let needed_bytes = ARENA_BASE_ADDR as usize + layout.total_size as usize;
1410 let current_bytes = wasm32::memory_size(0) * WASM_PAGE_SIZE;
1411 if needed_bytes > current_bytes {
1412 let extra = needed_bytes - current_bytes;
1413 let pages = (extra + WASM_PAGE_SIZE - 1) / WASM_PAGE_SIZE;
1414 let res = wasm32::memory_grow(0, pages);
1415 if res == usize::MAX {
1416 // OOM (or denied grow). Poison so other threads trap instead of spinning.
1417 h.resize_seq.store(POISON_SEQ, Ordering::Release);
1418 core::arch::wasm32::unreachable();
1419 }
1420 }
1421
1422 // Update header (rings untouched)
1423 h.capacity = new_cap;
1424 h.bucket_mask = new_cap - 1;
1425 h.offset_sq = layout.offset_sq;
1426 h.offset_cq = layout.offset_cq;
1427 h.offset_stdin = layout.offset_stdin;
1428 h.offset_stdout = layout.offset_stdout;
1429 h.offset_stdin_wait = layout.offset_stdin_wait;
1430 h.offset_kind = layout.offset_kind;
1431 h.offset_sym = layout.offset_sym;
1432 h.offset_left_id = layout.offset_left_id;
1433 h.offset_right_id = layout.offset_right_id;
1434 h.offset_hash32 = layout.offset_hash32;
1435 h.offset_next_idx = layout.offset_next_idx;
1436 h.offset_buckets = layout.offset_buckets;
1437 h.offset_term_cache = layout.offset_term_cache;
1438
1439 // Preserve top
1440 let count = old_top.min(old_cap);
1441 h.top.store(count, Ordering::Release);
1442
1443 // IMPORTANT: reverse-copy order is mandatory.
1444 // The layout is packed contiguously, and new_cap > old_cap means new regions can overlap
1445 // old regions during migration. Copy from the end back to the front.
1446
1447 // Term cache
1448 core::ptr::copy(
1449 (ARENA_BASE_ADDR + old_offset_term_cache) as *const u8,
1450 (ARENA_BASE_ADDR + h.offset_term_cache) as *mut u8,
1451 (TERM_CACHE_LEN * 4) as usize,
1452 );
1453
1454 // Next (u32)
1455 core::ptr::copy(
1456 (ARENA_BASE_ADDR + old_offset_next) as *const u8,
1457 (ARENA_BASE_ADDR + h.offset_next_idx) as *mut u8,
1458 (count as usize) * 4,
1459 );
1460 if new_cap > old_cap {
1461 zero_region(
1462 h.offset_next_idx + old_cap * 4,
1463 (new_cap - old_cap) * 4,
1464 );
1465 }
1466
1467 // Hash (u32)
1468 core::ptr::copy(
1469 (ARENA_BASE_ADDR + old_offset_hash) as *const u8,
1470 (ARENA_BASE_ADDR + h.offset_hash32) as *mut u8,
1471 (count as usize) * 4,
1472 );
1473 if new_cap > old_cap {
1474 zero_region(
1475 h.offset_hash32 + old_cap * 4,
1476 (new_cap - old_cap) * 4,
1477 );
1478 }
1479
1480 // Right (u32)
1481 core::ptr::copy(
1482 (ARENA_BASE_ADDR + old_offset_right) as *const u8,
1483 (ARENA_BASE_ADDR + h.offset_right_id) as *mut u8,
1484 (count as usize) * 4,
1485 );
1486 if new_cap > old_cap {
1487 zero_region(
1488 h.offset_right_id + old_cap * 4,
1489 (new_cap - old_cap) * 4,
1490 );
1491 }
1492
1493 // Left (u32)
1494 core::ptr::copy(
1495 (ARENA_BASE_ADDR + old_offset_left) as *const u8,
1496 (ARENA_BASE_ADDR + h.offset_left_id) as *mut u8,
1497 (count as usize) * 4,
1498 );
1499 if new_cap > old_cap {
1500 zero_region(
1501 h.offset_left_id + old_cap * 4,
1502 (new_cap - old_cap) * 4,
1503 );
1504 }
1505
1506 // Sym (u8)
1507 core::ptr::copy(
1508 (ARENA_BASE_ADDR + old_offset_sym) as *const u8,
1509 (ARENA_BASE_ADDR + h.offset_sym) as *mut u8,
1510 count as usize,
1511 );
1512 if new_cap > old_cap {
1513 zero_region(
1514 h.offset_sym + old_cap,
1515 new_cap - old_cap,
1516 );
1517 }
1518
1519 // Kind (u8)
1520 core::ptr::copy(
1521 (ARENA_BASE_ADDR + old_offset_kind) as *const u8,
1522 (ARENA_BASE_ADDR + h.offset_kind) as *mut u8,
1523 count as usize,
1524 );
1525 if new_cap > old_cap {
1526 zero_region(
1527 h.offset_kind + old_cap,
1528 new_cap - old_cap,
1529 );
1530 }
1531
1532 // Rebuild buckets (hash-consing table) for NonTerm only.
1533 let buckets = buckets_ptr();
1534 let next = next_ptr();
1535 for i in 0..new_cap as usize {
1536 (*buckets.add(i)).store(EMPTY, Ordering::Release);
1537 }
1538 for i in 0..count {
1539 let k = (*kind_ptr().add(i as usize)).load(Ordering::Acquire);
1540 if k != ArenaKind::NonTerm as u8 {
1541 continue;
1542 }
1543 let hv = (*hash_ptr().add(i as usize)).load(Ordering::Acquire);
1544 let b = (hv & h.bucket_mask) as usize;
1545 loop {
1546 let head = (*buckets.add(b)).load(Ordering::Acquire);
1547 (*next.add(i as usize)).store(head, Ordering::Relaxed);
1548 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
1549 if (*buckets.add(b))
1550 .compare_exchange(head, i, Ordering::Release, Ordering::Relaxed)
1551 .is_ok()
1552 {
1553 break;
1554 }
1555 }
1556 }
1557
1558 h.resize_seq.fetch_add(1, Ordering::Release);
1559 }
1560 }
1561
1562 // -------------------------------------------------------------------------
1563 // Reducer
1564 // -------------------------------------------------------------------------
1565 // Continuation frame: kind=Continuation, sym=stage, left=parent_stack, right=parent_node
1566 const STAGE_LEFT: u8 = 0;
1567 const STAGE_RIGHT: u8 = 1;
1568
1569 // Suspension: kind=Suspension, sym=mode (0=descend, 1=return), left=curr, right=stack, hash=remaining_steps
1570 const MODE_DESCEND: u8 = 0;
1571 const MODE_RETURN: u8 = 1;
1572 const MODE_IO_WAIT: u8 = 2;
1573
1574 #[inline(always)]
1575 unsafe fn alloc_continuation(parent: u32, target: u32, stage: u8) -> u32 {
1576 alloc_generic(ArenaKind::Continuation as u8, stage, parent, target, 0)
1577 }
1578
1579 #[inline(always)]
1580 unsafe fn alloc_suspension(curr: u32, stack: u32, mode: u8, remaining_steps: u32) -> u32 {
1581 alloc_generic(ArenaKind::Suspension as u8, mode, curr, stack, remaining_steps)
1582 }
1583
1584 #[inline(always)]
1585 unsafe fn alloc_num(value: u32) -> u32 {
1586 alloc_generic(ArenaKind::Terminal as u8, ArenaSym::Num as u8, 0, 0, value)
1587 }
1588
1589 #[inline(always)]
1590 unsafe fn alloc_inc() -> u32 {
1591 allocTerminal(ArenaSym::Inc as u32)
1592 }
1593
1594 #[inline(always)]
1595 unsafe fn alloc_church_zero() -> u32 {
1596 let k = allocTerminal(ArenaSym::K as u32);
1597 let i = allocTerminal(ArenaSym::I as u32);
1598 allocCons(k, i)
1599 }
1600
1601 #[inline(always)]
1602 unsafe fn alloc_church_succ() -> u32 {
1603 let s = allocTerminal(ArenaSym::S as u32);
1604 let k = allocTerminal(ArenaSym::K as u32);
1605 let ks = allocCons(k, s);
1606 let b = allocCons(allocCons(s, ks), k); // S(KS)K
1607 allocCons(s, b) // S B
1608 }
1609
1610 #[inline(always)]
1611 unsafe fn alloc_church_byte(value: u8) -> u32 {
1612 let mut cur = alloc_church_zero();
1613 if value == 0 {
1614 return cur;
1615 }
1616 let succ = alloc_church_succ();
1617 for _ in 0..value {
1618 cur = allocCons(succ, cur);
1619 }
1620 cur
1621 }
1622
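    // Decoding sketch: a Church numeral n applied to the internal Inc combinator and Num(0)
    // reduces to Num(n), whose value is stored in the node's hash field. For example, for
    // church 2: ((church2 Inc) Num(0)) -> Inc (Inc Num(0)) -> Inc Num(1) -> Num(2).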
1623 #[inline(always)]
1624 unsafe fn decode_church_u32(expr: u32) -> u32 {
1625 let inc = alloc_inc();
1626 let zero = alloc_num(0);
1627 let app = allocCons(allocCons(expr, inc), zero);
1628 let reduced = reduce(app, 1_000_000);
1629 if kindOf(reduced) == ArenaKind::Terminal as u32
1630 && symOf(reduced) == ArenaSym::Num as u32
1631 {
1632 hash_of_internal(reduced)
1633 } else {
1634 0
1635 }
1636 }
1637
1638 // --- OPTIMIZATION: Update existing node (Slot Reuse) ---
1639 #[inline(always)]
1640 unsafe fn update_continuation(id: u32, parent: u32, target: u32, stage: u8) {
1641 (*left_ptr().add(id as usize)).store(parent, Ordering::Relaxed);
1642 (*right_ptr().add(id as usize)).store(target, Ordering::Relaxed);
1643 (*sym_ptr().add(id as usize)).store(stage, Ordering::Relaxed);
1644 // Ensure it is marked as Continuation (in case we recycled a Suspension)
1645 (*kind_ptr().add(id as usize)).store(ArenaKind::Continuation as u8, Ordering::Release);
1646 }
1647
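    // Seqlock-style read: the hash is only trusted if the resize sequence was stable (even)
    // and unchanged across the load; otherwise the read is retried. Out-of-range ids read as 0.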
1648 #[inline(always)]
1649 fn hash_of_internal(n: u32) -> u32 {
1650 unsafe {
1651 ensure_arena();
1652 loop {
1653 let (seq, h) = enter_stable();
1654 if n >= h.capacity {
1655 return 0;
1656 }
1657 let val = (*hash_ptr().add(n as usize)).load(Ordering::Acquire);
1658 core::sync::atomic::fence(Ordering::Acquire);
1659 if check_stable(seq) {
1660 return val;
1661 }
1662 }
1663 }
1664 }
1665
1666 /// Unwind the continuation stack to reconstruct the full expression tree.
1667 ///
1668 /// When the step limit is exhausted, the worker may be deep in the expression tree
1669 /// with a non-empty stack. This function walks up the stack, rebuilding parent nodes
1670 /// as necessary, to return the root of the expression rather than a sub-expression.
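    ///
    /// ### Example (illustrative)
    ///
    /// A minimal sketch of one unwind step, with `A`, `B`, `B'` as placeholder subexpressions:
    ///
    /// ```text
    /// curr  = B'                                   (B was partially reduced to B')
    /// stack = [frame: stage=RIGHT, node=(A B)]
    ///
    /// pop the frame: curr != rightOf((A B)), so rebuild the parent:
    /// curr  = (A B')                               stack = []
    /// ```
    ///
    /// With the stack empty, `(A B')` is returned as the root. Had `B'` been identical to `B`,
    /// the existing `(A B)` node would have been reused instead of allocating a new one.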
1671 #[inline(always)]
1672 unsafe fn unwind_to_root(mut curr: u32, mut stack: u32) -> u32 {
1673 while stack != EMPTY {
1674 let recycled = stack;
1675 stack = leftOf(recycled);
1676 let parent_node = rightOf(recycled);
1677 let stage = symOf(recycled) as u8;
1678
1679 if stage == STAGE_LEFT {
1680 let orig_left = leftOf(parent_node);
1681 if curr != orig_left {
1682 // Left child changed, must allocate new parent
1683 curr = allocCons(curr, rightOf(parent_node));
1684 } else {
1685 // Left child didn't change, reuse existing parent
1686 curr = parent_node;
1687 }
1688 } else {
1689 // STAGE_RIGHT
1690 let orig_right = rightOf(parent_node);
1691 if curr != orig_right {
1692 // Right child changed, must allocate new parent
1693 curr = allocCons(leftOf(parent_node), curr);
1694 } else {
1695 curr = parent_node;
1696 }
1697 }
1698 }
1699 curr
1700 }
1701
1702 enum StepResult {
1703 Done(u32),
1704 Yield(u32), // Suspension node id
1705 }
1706
1707 /// Perform one iterative reduction step with preemptive yielding.
1708 ///
1709 /// This implements the core SKI reduction algorithm using an iterative approach
1710 /// with explicit stack management instead of recursion. It can yield mid-step
1711 /// when traversal gas is exhausted, enabling cooperative multitasking.
1712 ///
1713 /// ## Iterative Reduction Algorithm
1714 ///
1715 /// Instead of recursive function calls, uses an explicit stack of Continuation nodes:
1716 ///
1717 /// - **MODE_DESCEND**: Traverse down the expression tree looking for redexes
1718 /// - **MODE_RETURN**: Return up the tree after reducing a subexpression
1719 /// - **Continuation frames**: Represent suspended stack frames as arena nodes
1720 ///
1721 /// ## Gas-Based Preemption
1722 ///
1723 /// - **Traversal gas**: Limits AST traversal depth per step
1724 /// - **Yield on exhaustion**: Returns Suspension node for later resumption
1725 /// - **Cooperative multitasking**: Prevents worker starvation in parallel evaluation
1726 ///
1727 /// ## Step Counting
1728 ///
1729 /// - **Accurate counting**: Decrements `remaining_steps` immediately when each reduction occurs
1730 /// - **Multiple reductions per call**: Can perform multiple reductions in a single call
1731 /// - **Deterministic**: Every reduction is counted exactly once, regardless of batching
1732 ///
1733 /// ## Node Recycling Optimization
1734 ///
1735 /// - **Free node reuse**: Dead continuation frames are recycled immediately
1736 /// - **Memory efficiency**: Reduces allocation pressure during reduction
1737 /// - **Cache locality**: Reuses recently freed nodes
1738 ///
1739 /// ## Parameters
1740 ///
1741 /// - `curr`: Current expression node being evaluated
1742 /// - `stack`: Stack of continuation frames (linked list of nodes)
1743 /// - `mode`: Current evaluation mode (descend/return)
1744 /// - `gas`: Remaining traversal gas (mutable, decremented during execution)
1745 /// - `remaining_steps`: Mutable reference to reduction steps remaining (decremented on each reduction)
1746 /// - `free_node`: Recyclable node ID from previous operations
1747 ///
1748 /// ## Returns
1749 ///
1750 /// - `StepResult::Done(node)`: Reduction completed, `node` is the result
1751 /// - `StepResult::Yield(susp_id)`: Yielded mid-step, `susp_id` is Suspension node
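    ///
    /// ## Example (illustrative trace)
    ///
    /// A minimal sketch of one call on `((I I) x)`, where `x` is any terminal and both the
    /// gas and step budgets are ample:
    ///
    /// ```text
    /// DESCEND ((I I) x)   no redex at the root (its left child is an application),
    ///                     push Continuation{stage=LEFT, node=((I I) x)}, descend into (I I)
    /// DESCEND (I I)       I-redex: curr := I, remaining_steps -= 1, switch to RETURN
    /// RETURN              pop the LEFT frame; the left child changed ((I I) -> I), so the
    ///                     parent is rebuilt: curr := (I x); the dead frame becomes free_node
    /// RETURN              stack is empty -> StepResult::Done((I x))
    /// ```
    ///
    /// A second call then reduces `(I x)` to `x`, and `reduce` stops once a call returns its
    /// input unchanged.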
1752 unsafe fn step_iterative(mut curr: u32, mut stack: u32, mut mode: u8, gas: &mut u32, remaining_steps: &mut u32, mut free_node: u32) -> StepResult {
1753 loop {
1754 // Gas exhaustion yield
1755 if *gas == 0 {
1756 // If we have a free_node we didn't use, it's just a hole now.
1757 return StepResult::Yield(alloc_suspension(curr, stack, mode, *remaining_steps));
1758 }
1759 *gas -= 1;
1760
1761 if mode == MODE_RETURN {
1762 if stack == EMPTY {
1763 return StepResult::Done(curr);
1764 }
1765
1766 // POP FRAME
1767 let recycled = stack; // <--- This frame is now dead/recyclable
1768 stack = leftOf(recycled); // Parent
1769 let parent_node = rightOf(recycled);
1770 let stage = symOf(recycled) as u8;
1771
1772 if stage == STAGE_LEFT {
1773 let orig_left = leftOf(parent_node);
1774 if curr != orig_left {
1775 // Rebuild parent
1776 curr = allocCons(curr, rightOf(parent_node));
1777 // 'recycled' is still free, we are returning up.
1778 free_node = recycled; // Keep for next push or pop
1779 mode = MODE_RETURN;
1780 continue;
1781 }
1782 // Left stable, DESCEND RIGHT
1783 // Reuse 'recycled' as the new Continuation frame!
1784 update_continuation(recycled, stack, parent_node, STAGE_RIGHT);
1785 stack = recycled;
1786 mode = MODE_DESCEND;
1787 curr = rightOf(parent_node);
1788 continue;
1789 } else {
1790 let orig_right = rightOf(parent_node);
1791 if curr != orig_right {
1792 curr = allocCons(leftOf(parent_node), curr);
1793 free_node = recycled;
1794 mode = MODE_RETURN;
1795 continue;
1796 }
1797 // Both stable
1798 curr = parent_node;
1799 free_node = recycled;
1800 mode = MODE_RETURN;
1801 continue;
1802 }
1803 }
1804
1805 // MODE_DESCEND
1806 let k = kindOf(curr);
1807 if k != ArenaKind::NonTerm as u32 {
1808 mode = MODE_RETURN;
1809 continue;
1810 }
1811
1812 // NonTerm: check for I/K/S redex at this node.
1813 let left = leftOf(curr);
1814 let right = rightOf(curr);
1815
1816 // [REDUCTION LOGIC START] -----------------------------------------
1817
1818 // I x -> x
1819 if kindOf(left) == ArenaKind::Terminal as u32 && symOf(left) == ArenaSym::I as u32 {
1820 if *remaining_steps == 0 {
1821 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1822 }
1823 *remaining_steps = remaining_steps.saturating_sub(1);
1824
1825 curr = right;
1826 mode = MODE_RETURN;
1827
1828 // Yield IMMEDIATELY if limit hit zero.
1829 // Don't waste gas traversing to the next redex.
1830 if *remaining_steps == 0 {
1831 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1832 }
1833 continue;
1834 }
1835
1836 // readOne k -> k (Church byte)
1837 if kindOf(left) == ArenaKind::Terminal as u32
1838 && symOf(left) == ArenaSym::ReadOne as u32
1839 {
1840 if let Some(byte) = stdin_ring().try_dequeue() {
1841 if *remaining_steps == 0 {
1842 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1843 }
1844 *remaining_steps = remaining_steps.saturating_sub(1);
1845
1846 let numeral = alloc_church_byte(byte);
1847 curr = allocCons(right, numeral);
1848 mode = MODE_RETURN;
1849
1850 if *remaining_steps == 0 {
1851 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1852 }
1853 continue;
1854 }
1855
1856 let susp_id = alloc_suspension(curr, stack, MODE_IO_WAIT, *remaining_steps);
1857 stdin_wait_ring().enqueue_blocking(susp_id);
1858 return StepResult::Yield(susp_id);
1859 }
1860
1861 // writeOne n -> n (enqueue byte)
1862 if kindOf(left) == ArenaKind::Terminal as u32
1863 && symOf(left) == ArenaSym::WriteOne as u32
1864 {
1865 if *remaining_steps == 0 {
1866 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1867 }
1868 *remaining_steps = remaining_steps.saturating_sub(1);
1869
1870 let value = decode_church_u32(right);
1871 let byte = (value & 0xff) as u8;
1872 stdout_ring().enqueue_blocking(byte);
1873
1874 curr = right;
1875 mode = MODE_RETURN;
1876
1877 if *remaining_steps == 0 {
1878 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1879 }
1880 continue;
1881 }
1882
1883 // Inc (Num n) -> Num (n + 1) [internal Church decoding]
1884 if kindOf(left) == ArenaKind::Terminal as u32
1885 && symOf(left) == ArenaSym::Inc as u32
1886 && kindOf(right) == ArenaKind::Terminal as u32
1887 && symOf(right) == ArenaSym::Num as u32
1888 {
1889 if *remaining_steps == 0 {
1890 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1891 }
1892 *remaining_steps = remaining_steps.saturating_sub(1);
1893
1894 let next_val = hash_of_internal(right).wrapping_add(1);
1895 curr = alloc_num(next_val);
1896 mode = MODE_RETURN;
1897
1898 if *remaining_steps == 0 {
1899 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1900 }
1901 continue;
1902 }
1903
1904 if kindOf(left) == ArenaKind::NonTerm as u32 {
1905 let ll = leftOf(left);
1906 // K x y -> x
1907 if kindOf(ll) == ArenaKind::Terminal as u32 && symOf(ll) == ArenaSym::K as u32 {
1908 if *remaining_steps == 0 {
1909 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1910 }
1911 *remaining_steps = remaining_steps.saturating_sub(1);
1912
1913 curr = rightOf(left);
1914 mode = MODE_RETURN;
1915
1916 // Yield IMMEDIATELY
1917 if *remaining_steps == 0 {
1918 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1919 }
1920 continue;
1921 }
1922 // S x y z -> x z (y z)
1923 if kindOf(ll) == ArenaKind::NonTerm as u32 {
1924 let lll = leftOf(ll);
1925 if kindOf(lll) == ArenaKind::Terminal as u32 && symOf(lll) == ArenaSym::S as u32 {
1926 if *remaining_steps == 0 {
1927 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1928 }
1929 // Use saturating_sub for consistency
1930 *remaining_steps = remaining_steps.saturating_sub(1);
1931
1932 let x = rightOf(ll);
1933 let y = rightOf(left);
1934 let z = right;
1935 let xz = allocCons(x, z);
1936 let yz = allocCons(y, z);
1937 curr = allocCons(xz, yz);
1938 mode = MODE_RETURN;
1939
1940 // Yield IMMEDIATELY
1941 if *remaining_steps == 0 {
1942 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1943 }
1944 continue;
1945 }
1946 }
1947 }
1948
1949 // [REDUCTION LOGIC END] -------------------------------------------
1950
1951 // No redex: PUSH frame to descend left
1952 if free_node != EMPTY {
1953 update_continuation(free_node, stack, curr, STAGE_LEFT);
1954 stack = free_node;
1955 free_node = EMPTY;
1956 } else {
1957 stack = alloc_continuation(stack, curr, STAGE_LEFT);
1958 }
1959 curr = left;
1960 mode = MODE_DESCEND;
1961 }
1962 }
1963
1964 fn step_internal(expr: u32) -> u32 {
1965 unsafe {
1966 let mut gas = u32::MAX;
            let mut steps = u32::MAX; // Effectively unlimited; single-step calls impose no step budget
1968 match step_iterative(expr, EMPTY, MODE_DESCEND, &mut gas, &mut steps, EMPTY) {
1969 StepResult::Done(x) => x,
1970 StepResult::Yield(_) => expr, // unreachable with u32::MAX gas; keep total safety
1971 }
1972 }
1973 }
1974
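    /// Take one kernel reduction step on `expr`: a single `step_iterative` traversal with
    /// unbounded gas and no step budget. Returns the resulting node id; if no redex is found,
    /// the input node is returned unchanged, which is how `reduce` detects a fixpoint.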
1975 #[no_mangle]
1976 pub extern "C" fn arenaKernelStep(expr: u32) -> u32 {
1977 unsafe { ensure_arena(); }
1978 step_internal(expr)
1979 }
1980
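    /// Reduce `expr` by repeatedly taking kernel steps until a fixpoint is reached or `max`
    /// iterations have been performed. `0xffff_ffff` means "no limit". Returns the final node id.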
1981 #[no_mangle]
1982 pub extern "C" fn reduce(expr: u32, max: u32) -> u32 {
1983 unsafe { ensure_arena(); }
1984 let limit = if max == 0xffff_ffff { u32::MAX } else { max };
1985 let mut cur = expr;
1986 for _ in 0..limit {
1987 let next = step_internal(cur);
1988 if next == cur {
1989 break;
1990 }
1991 cur = next;
1992 }
1993 cur
1994 }
1995
1996 // -------------------------------------------------------------------------
1997 // Host/worker ring APIs
1998 // -------------------------------------------------------------------------
1999 #[no_mangle]
2000 pub extern "C" fn hostPull() -> i64 {
2001 unsafe {
2002 if ARENA_BASE_ADDR == 0 {
2003 return -1;
2004 }
2005 if let Some(cqe) = cq_ring().try_dequeue() {
2006 // Pack: high 32 bits = req_id, low 32 bits = node_id
2007 let packed: u64 = ((cqe.req_id as u64) << 32) | (cqe.node_id as u64);
2008 packed as i64
2009 } else {
2010 -1
2011 }
2012 }
2013 }
2014
2015 /// Debug/diagnostic helper: expose ring capacity to the host.
2016 ///
2017 /// Useful for tests and for sizing stress workloads without duplicating a JS-side constant.
2018 #[no_mangle]
2019 pub extern "C" fn debugGetRingEntries() -> u32 {
2020 RING_ENTRIES
2021 }
2022
    /// Submit work with an explicit correlation id and max reduction steps.
    /// Returns: 0 = ok, 1 = ring full, 2 = not connected.
    #[no_mangle]
2026 pub extern "C" fn hostSubmit(node_id: u32, req_id: u32, max_steps: u32) -> u32 {
2027 unsafe {
2028 if ARENA_BASE_ADDR == 0 {
2029 return 2;
2030 }
2031 if sq_ring().try_enqueue(Sqe { node_id, req_id, max_steps }) {
2032 0
2033 } else {
2034 1
2035 }
2036 }
2037 }
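    /// Minimal host-side usage sketch (illustrative only, not part of the exported API):
    /// submit one expression with a correlation id, then poll the completion ring until the
    /// matching result arrives. A real host would await or sleep between polls instead of
    /// spinning, and should resubmit any Suspension node id it receives to continue the work.
    #[allow(dead_code)]
    fn example_host_roundtrip(node_id: u32, req_id: u32, max_steps: u32) -> Option<u32> {
        // hostSubmit: 0 = accepted, 1 = submission ring full, 2 = arena not connected.
        if hostSubmit(node_id, req_id, max_steps) != 0 {
            return None;
        }
        loop {
            let packed = hostPull();
            if packed < 0 {
                // Nothing completed yet; a real host would yield to the event loop here.
                continue;
            }
            let packed = packed as u64;
            if (packed >> 32) as u32 == req_id {
                // Low 32 bits carry the resulting node id (possibly a Suspension to resubmit).
                return Some((packed & 0xffff_ffff) as u32);
            }
            // Completion for a different request; a real host would route it to its owner.
        }
    }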
2038
2039 /// Main worker loop for parallel SKI evaluation.
2040 ///
2041 /// Processes evaluation requests from the submission queue and posts results
2042 /// to the completion queue. Implements preemptive multitasking using gas-based
2043 /// yielding to prevent worker starvation in concurrent workloads.
2044 ///
2045 /// ## Processing Model
2046 ///
2047 /// 1. **Dequeue request** from submission queue (blocking)
2048 /// 2. **Initialize evaluation state** from request or suspension
2049 /// 3. **Iterative reduction loop** with gas-based preemption
2050 /// 4. **Yield or complete** based on gas and step limits
2051 /// 5. **Enqueue result** to completion queue
2052 /// 6. **Repeat** for next request
2053 ///
2054 /// ## Preemption Strategy
2055 ///
    /// - **Traversal gas limit**: `20,000` AST nodes per `step_iterative` call (reset before each call)
2057 /// - **Cooperative yielding**: Returns control when gas exhausted
2058 /// - **Suspension resumption**: Can restart from any yield point
2059 /// - **Fair scheduling**: Prevents any single expression from monopolizing CPU
2060 ///
    /// ## Step Counting Semantics
    ///
    /// - **In-place decrement**: `step_iterative` decrements `remaining_steps` through the
    ///   mutable reference as each reduction happens, so the count is always up to date.
    /// - **Yield behavior**: when `step_iterative` yields, the Suspension stores the
    ///   already-updated `remaining_steps`: reductions performed before the yield are
    ///   counted, the unfinished traversal costs nothing, and resumption continues with
    ///   exactly the budget that is left.
    /// - **Deterministic counting**: each reduction is counted exactly once, no matter
    ///   how many times the work is suspended and resumed.
2069 ///
2070 /// ## Memory Management
2071 ///
2072 /// - **Node recycling**: Reuses dead continuation frames immediately
2073 /// - **Shared arena**: All workers access the same memory space
2074 /// - **Atomic operations**: Safe concurrent access to shared structures
2075 ///
2076 /// ## Performance Optimizations
2077 ///
2078 /// - **Lock-free queues**: Minimal synchronization overhead
2079 /// - **Batched processing**: Amortizes dequeue overhead
2080 /// - **In-place updates**: Modifies nodes directly in shared memory
2081 /// - **Cache-friendly**: Sequential memory access patterns
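    ///
    /// ## Request Lifecycle (sketch)
    ///
    /// A rough sketch of how one request typically flows through this loop; the exact host
    /// policy (when to poll, when to resubmit) lives outside this module:
    ///
    /// ```text
    /// host:   hostSubmit(expr, req_id, max_steps)      SQE enqueued
    /// worker: dequeue SQE, run step_iterative repeatedly
    ///           gas exhausted or budget hit  -> CQE carries a Suspension node id
    ///           fixpoint or budget == 0      -> CQE carries the (possibly partial) result
    /// host:   hostPull() -> (req_id, node_id)
    ///           node_id is a Suspension      -> resubmit it to continue the evaluation
    ///           otherwise                    -> final (or step-limited) result node
    /// ```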
2082 #[no_mangle]
2083 pub extern "C" fn workerLoop() {
2084 unsafe {
2085 let sq = sq_ring();
2086 let cq = cq_ring();
            // Traversal gas budget handed to each step_iterative call, to preempt long
            // traversals and prevent starvation. This is NOT the same as max reduction steps.
2089 let batch_gas: u32 = 20000;
2090 loop {
2091 let job = sq.dequeue_blocking();
2092 let mut curr = job.node_id;
2093 let mut stack = EMPTY;
2094 let mut mode = MODE_DESCEND;
2095 let mut remaining_steps: u32;
2096
2097 // If resuming a suspension, the suspension node ITSELF is now free to be reused!
2098 let mut free_node = EMPTY;
2099
2100 // Resume from suspension?
2101 if kindOf(curr) == ArenaKind::Suspension as u32 {
2102 let susp = curr;
2103 curr = leftOf(susp);
2104 stack = rightOf(susp);
2105 mode = symOf(susp) as u8;
2106 if mode == MODE_IO_WAIT {
2107 mode = MODE_DESCEND;
2108 }
                    // Strict resume: trust the suspension's counter completely.
                    // Its hash field holds the remaining_steps captured at the yield point,
                    // which already accounts for every reduction performed before the yield,
                    // so no adjustment is needed here.
2112 remaining_steps = hash_of_internal(susp);
2113 free_node = susp; // <--- RECYCLE START
2114 } else {
2115 // Set strict limit from the job packet
2116 let limit = job.max_steps;
2117 remaining_steps = if limit == 0xffff_ffff { u32::MAX } else { limit };
2118 }
2119
2120 loop {
2121 // Check budget BEFORE trying a step
2122 if remaining_steps == 0 {
2123 // If we have a stack, we are deep in the tree.
2124 // We must unwind to the root to return a valid full expression.
2125 if stack != EMPTY {
2126 curr = unwind_to_root(curr, stack);
2127 stack = EMPTY;
2128 }
2129
2130 // Step budget exhausted; return (partial) result.
2131 cq.enqueue_blocking(Cqe {
2132 node_id: curr,
2133 req_id: job.req_id,
2134 _pad: 0,
2135 });
2136 break;
2137 }
2138
2139 let mut gas = batch_gas;
2140
2141 match step_iterative(curr, stack, mode, &mut gas, &mut remaining_steps, free_node) {
2142 StepResult::Yield(susp_id) => {
2143 // Yielded (gas or limit). 'susp_id' has the correct remaining count
2144 // because step_iterative updated 'remaining_steps' in place for each
2145 // reduction that occurred before yielding.
2146 cq.enqueue_blocking(Cqe {
2147 node_id: susp_id,
2148 req_id: job.req_id,
2149 _pad: 0,
2150 });
2151 break;
2152 }
2153 StepResult::Done(next_node) => {
2154 if next_node == curr {
2155 // Fixpoint reached.
2156 cq.enqueue_blocking(Cqe {
2157 node_id: curr,
2158 req_id: job.req_id,
2159 _pad: 0,
2160 });
2161 break;
2162 }
2163
2164 // Setup for next iteration
2165 curr = next_node;
2166 stack = EMPTY;
2167 mode = MODE_DESCEND;
2168 free_node = EMPTY;
2169
2170 // Loop continues; 'remaining_steps' was updated by reference in step_iterative
2171 }
2172 }
2173 }
2174 }
2175 }
2176 }
2177
2178 // -------------------------------------------------------------------------
2179 // Debug helpers
2180 // -------------------------------------------------------------------------
2181 #[no_mangle]
2182 pub extern "C" fn debugGetArenaBaseAddr() -> u32 {
2183 unsafe { ARENA_BASE_ADDR }
2184 }
2185
2186 #[no_mangle]
2187 pub extern "C" fn getArenaMode() -> u32 {
2188 unsafe { ARENA_MODE }
2189 }
2190
2191 #[no_mangle]
2192 pub extern "C" fn debugCalculateArenaSize(capacity: u32) -> u32 {
2193 SabHeader::layout(capacity).total_size
2194 }
2195
2196 #[no_mangle]
2197 pub extern "C" fn debugLockState() -> u32 {
2198 // Full resize sequence word:
2199 // - even: stable
2200 // - odd: resize in progress
2201 // - 0xFFFF_FFFF: poisoned (unrecoverable resize failure)
2202 unsafe { header().resize_seq.load(Ordering::Relaxed) }
2203 }
2204}
2205
2206// Re-export WASM symbols at crate root
2207#[cfg(target_arch = "wasm32")]
2208pub use wasm::*;