typed_ski/arena.rs
1//! # Arena-Based Memory Management for SKI Expressions (WASM, no_std)
2//!
3//! This module implements a high-performance, thread-safe arena allocator optimized
4//! for SKI combinator calculus evaluation. It supports both single-threaded heap-based
5//! allocation and multi-threaded SharedArrayBuffer (SAB) based allocation for Web Workers.
6//!
7//! ## Core Architecture
8//!
9//! ### Arena Node Types
10//!
11//! The arena uses four distinct node types to represent SKI expressions and evaluation state:
12//!
13//! - **`Terminal`**: Leaf nodes containing SKI combinators (S, K, I)
14//! - `kind = 1`, `sym` contains the combinator symbol
15//!
16//! - **`NonTerm`**: Application nodes (function application)
17//! - `kind = 2`, `left` and `right` point to subexpressions
18//! - Represents expressions of the form `(left right)`
19//!
20//! - **`Continuation`**: Stack frames for iterative reduction (optimization)
21//! - `kind = 3`, `sym` indicates reduction stage, `left`/`right` point to parent stack and node
22//! - Used by the iterative reduction algorithm to avoid recursion stack overflow
23//!
24//! - **`Suspension`**: Paused evaluation state for preemptive multitasking
25//! - `kind = 4`, `sym` contains evaluation mode, `left`/`right` contain current expression and stack
26//! - `hash` field stores remaining reduction steps for resumption
27//! - Enables cooperative multitasking across Web Workers
28//!
29//! ### Memory Layout (SharedArrayBuffer Mode)
30//!
31//! ```text
32//! +-------------------+ <-- ARENA_BASE_ADDR
33//! | SabHeader |
34//! | - Magic, offsets |
35//! | - Rings, capacity |
36//! | - Atomic counters |
37//! +-------------------+ <-- offset_sq (64-byte aligned)
38//! | Submission Ring |
//! | (RING_ENTRIES entries) |
40//! +-------------------+ <-- offset_cq (64-byte aligned)
41//! | Completion Ring |
//! | (RING_ENTRIES entries) |
43//! +-------------------+ <-- offset_stdin (64-byte aligned)
44//! | Stdin Ring (u8) |
45//! +-------------------+ <-- offset_stdout (64-byte aligned)
46//! | Stdout Ring (u8) |
47//! +-------------------+ <-- offset_stdin_wait (64-byte aligned)
48//! | Stdin Wait Ring |
49//! | (u32 node ids) |
50//! +-------------------+ <-- offset_kind (64-byte aligned)
51//! | Kind Array | u8[capacity] - Node types
52//! +-------------------+ <-- offset_sym
53//! | Sym Array | u8[capacity] - Symbols/modes
54//! +-------------------+ <-- offset_left_id (64-byte aligned)
55//! | Left Array | u32[capacity] - Left child pointers
56//! +-------------------+ <-- offset_right_id
57//! | Right Array | u32[capacity] - Right child pointers
58//! +-------------------+ <-- offset_hash32
59//! | Hash Array | u32[capacity] - Hash values for deduplication
60//! +-------------------+ <-- offset_next_idx
61//! | Next Array | u32[capacity] - Hash table collision chains
62//! +-------------------+ <-- offset_buckets (64-byte aligned)
63//! | Bucket Array | u32[capacity] - Hash table buckets
64//! +-------------------+ <-- offset_term_cache
//! | Terminal Cache    | u32[TERM_CACHE_LEN] - Cached terminal node IDs (S, K, I, ...)
66//! +-------------------+ <-- End of arena
67//! ```
68//!
69//! ### Key Optimizations
70//!
71//! #### 1. Hash-Consing (Structural Sharing)
72//!
73//! - **[Hash consing](https://en.wikipedia.org/wiki/Hash_consing)** dedupes identical subexpressions to prevent redundant allocations
74//! - **Uses avalanche hash** of `(left, right)` pairs for fast lookups
75//! - **Collision resolution** via separate chaining in the bucket array
//! - **Memory efficiency**: [DAG](https://en.wikipedia.org/wiki/Directed_acyclic_graph) representation instead of a tree (see the sketch below)
77//!
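//! A minimal sketch of the deduplication described above, using this module's exported
//! `allocTerminal`/`allocCons` entry points (symbol values as defined by `ArenaSym`):
//!
//! ```ignore
//! // Build (S K) twice; hash-consing hands back the same node id both times.
//! let s = allocTerminal(ArenaSym::S as u32);
//! let k = allocTerminal(ArenaSym::K as u32);
//! let a = allocCons(s, k);
//! let b = allocCons(s, k);
//! assert_eq!(a, b); // identical (left, right) pair -> one shared DAG node
//! ```
//!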
78//! #### 2. Iterative Reduction with Continuations
79//!
80//! - **Avoids recursion stack overflow** on deep expressions
81//! - **Continuation nodes** represent suspended stack frames
82//! - **Two-stage reduction**: left child first, then right child
//! - **Memory reuse**: dead continuation frames are recycled (frame encoding sketched below)
84//!
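//! As a sketch, a continuation frame is just another arena node; the internal helper
//! `alloc_continuation` (defined further down) packs the fields like this, where
//! `parent_stack` and `node` stand for an existing frame id and an application node:
//!
//! ```ignore
//! // "After reducing the left child of `node`, resume with the frame below us."
//! let frame = alloc_continuation(parent_stack, node, STAGE_LEFT);
//! // kindOf(frame)  == ArenaKind::Continuation as u32
//! // symOf(frame)   == STAGE_LEFT as u32   // which child is being reduced
//! // leftOf(frame)  == parent_stack        // rest of the stack
//! // rightOf(frame) == node                // the application node being reduced
//! ```
//!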
85//! #### 3. Preemptive Multitasking (Suspensions)
86//!
87//! - **Cooperative yielding** when traversal gas is exhausted
88//! - **Suspension nodes** capture complete evaluation state
89//! - **Worker preemption** prevents starvation in parallel evaluation
90//! - **State resumption** via suspension node deserialization
91//!
92//! #### 4. Lock-Free Ring Buffers (io_uring Style)
93//!
94//! - **Submission Queue (SQ)**: Main thread -> Worker communication
95//! - **Completion Queue (CQ)**: Worker -> Main thread results
96//! - **Atomic operations** for thread-safe producer/consumer patterns
//! - **Blocking waits** using WASM atomic wait/notify (host-side sketch below)
98//!
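//! A hedged host-side sketch (assuming, as the non-WASM stubs suggest, that `hostSubmit`
//! pushes an arena node id onto the SQ and `hostPull` returns a completed node id or
//! `u32::MAX` when the CQ is empty):
//!
//! ```ignore
//! let expr = allocCons(allocTerminal(ArenaSym::S as u32), allocTerminal(ArenaSym::K as u32));
//! hostSubmit(expr);
//! let result = loop {
//!     let done = hostPull();
//!     if done != u32::MAX {
//!         break done;          // reduced node id (or a Suspension node to resubmit)
//!     }
//!     // nothing completed yet: yield back to the event loop and poll again
//! };
//! ```
//!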
99//! #### 5. Concurrent Resizing
100//!
101//! - **[Seqlock](https://en.wikipedia.org/wiki/Seqlock)-style synchronization** for arena growth
102//! - **Stop-the-world pauses** during resize operations
103//! - **Reverse-order copying** to handle overlapping memory regions
104//! - **Poisoning** on OOM to prevent infinite waits
105//!
106//! ### Performance Characteristics
107//!
108//! - **O(1) allocation** for new nodes (amortized)
109//! - **O(1) lookup** for existing subexpressions (hash-consing)
//! - **O(depth) auxiliary space** for reduction: the iterative algorithm keeps an explicit continuation stack, so deep expressions cannot overflow the call stack
//! - **Lock-free communication** between main thread and workers
//! - **Memory efficient**: roughly 22 bytes of arena storage per node slot (parallel arrays plus the bucket table), with structural sharing on top
113//!
114//! ### Thread Safety
115//!
116//! - **Atomic operations** for all shared state access
117//! - **Seqlock** for resize synchronization
//! - **Single shared arena** in SAB mode (workers attach via `connectArena`); heap mode keeps a private per-instance arena
119//! - **Ring buffer fences** prevent data races in communication
120//!
121//! ### Integration with JavaScript
122//!
123//! - **[SharedArrayBuffer](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer)** enables cross-thread memory access
124//! - **Cross-origin isolation** required for SAB support
125//! - **Typed array views** provide efficient memory access from JS
126//! - **Ring polling** in main thread for completion handling
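//!
//! As a minimal end-to-end sketch from the Rust side (hedged: `arenaKernelStep` is assumed
//! here to run a bounded reduction of the given node, matching how the export is used):
//!
//! ```ignore
//! initArena(1 << 20);                       // capacity must be a power of two (>= 1024)
//! let s = allocTerminal(ArenaSym::S as u32);
//! let k = allocTerminal(ArenaSym::K as u32);
//! let i = allocTerminal(ArenaSym::I as u32);
//! let expr = allocCons(allocCons(allocCons(s, k), k), i);   // ((S K) K) I
//! let out = arenaKernelStep(expr);
//! // Inspect the result through the exported accessors.
//! let _ = (kindOf(out), symOf(out), leftOf(out), rightOf(out));
//! ```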
127
128#![allow(dead_code)]
129
130// =============================================================================
131// Public enums (shared with JS)
132// =============================================================================
133
134/// Arena node types supporting SKI evaluation and parallel execution.
135///
136/// See module-level documentation for detailed descriptions of each variant.
137#[repr(u8)]
138#[derive(Clone, Copy, PartialEq, Eq, Debug)]
139pub enum ArenaKind {
140 /// Terminal node containing an SKI combinator (S, K, or I).
141 /// - `sym`: The combinator symbol
142 /// - `left`/`right`: Unused (reserved)
143 Terminal = 1,
144
145 /// Application node representing function application `(left right)`.
146 /// - `left`: Function expression
147 /// - `right`: Argument expression
148 /// - `sym`: Unused (reserved)
149 NonTerm = 2,
150
151 /// Stack frame for iterative reduction algorithm.
152 /// Used to avoid recursion stack overflow on deep expressions.
153 /// - `sym`: Reduction stage (0=left child, 1=right child)
154 /// - `left`: Parent stack frame
155 /// - `right`: Parent expression node
156 Continuation = 3,
157
158 /// Paused evaluation state for preemptive multitasking.
159 /// Captures complete evaluation context for resumption.
160 /// - `sym`: Evaluation mode (0=descend, 1=return)
161 /// - `left`: Current expression being evaluated
162 /// - `right`: Evaluation stack
163 /// - `hash`: Remaining reduction steps
164 Suspension = 4,
165}
166
167/// SKI combinator symbols and evaluation state markers.
168///
169/// Used in both Terminal nodes (for combinators) and Continuation/Suspension
170/// nodes (for evaluation state).
171#[repr(u8)]
172#[derive(Clone, Copy, PartialEq, Eq, Debug)]
173pub enum ArenaSym {
174 /// S combinator: `S x y z -> x z (y z)`
175 /// The most complex combinator, enabling arbitrary computation.
176 S = 1,
177
178 /// K combinator: `K x y -> x`
179 /// The constant function, discards its second argument.
180 K = 2,
181
182 /// I combinator: `I x -> x`
183 /// The identity function, returns its argument unchanged.
184 I = 3,
185
186 /// readOne terminal: consumes one byte from stdin, passes Church numeral to continuation.
187 ReadOne = 4,
188
189 /// writeOne terminal: writes one byte to stdout, returns its argument.
190 WriteOne = 5,
191
192 // Internal-only primitives for Church decoding (not surfaced in the language).
193 Inc = 6,
194 Num = 7,
195
    /// B combinator: `B x y z -> x (y z)`
197 B = 8,
198
199 /// C combinator: `C x y z -> x z y`
200 C = 9,
201
202 /// S' combinator (Turner PSI): `S' w x y z -> w (x z) (y z)`
203 SPrime = 10,
204
205 /// B' combinator (Turner B-prime): `B' w x y z -> w x (y z)`
206 BPrime = 11,
207
208 /// C' combinator (Turner GAMMA): `C' w x y z -> w (x z) y`
209 CPrime = 12,
210}
211
212// =============================================================================
213// Non-WASM stubs (keep TS type-checking happy)
214// =============================================================================
215#[cfg(not(target_arch = "wasm32"))]
216#[no_mangle]
217pub extern "C" fn initArena(_cap: u32) -> u32 {
218 0
219}
220#[cfg(not(target_arch = "wasm32"))]
221#[no_mangle]
222pub extern "C" fn connectArena(_p: u32) -> u32 {
223 0
224}
225#[cfg(not(target_arch = "wasm32"))]
226#[no_mangle]
227pub extern "C" fn allocTerminal(_s: u32) -> u32 {
228 0
229}
230#[cfg(not(target_arch = "wasm32"))]
231#[no_mangle]
232pub extern "C" fn allocCons(_l: u32, _r: u32) -> u32 {
233 0
234}
235#[cfg(not(target_arch = "wasm32"))]
236#[no_mangle]
237pub extern "C" fn kindOf(_n: u32) -> u32 {
238 0
239}
240#[cfg(not(target_arch = "wasm32"))]
241#[no_mangle]
242pub extern "C" fn symOf(_n: u32) -> u32 {
243 0
244}
245#[cfg(not(target_arch = "wasm32"))]
246#[no_mangle]
247pub extern "C" fn leftOf(_n: u32) -> u32 {
248 0
249}
250#[cfg(not(target_arch = "wasm32"))]
251#[no_mangle]
252pub extern "C" fn rightOf(_n: u32) -> u32 {
253 0
254}
255#[cfg(not(target_arch = "wasm32"))]
256#[no_mangle]
257pub extern "C" fn reset() {}
258#[cfg(not(target_arch = "wasm32"))]
259#[no_mangle]
260pub extern "C" fn arenaKernelStep(x: u32) -> u32 {
261 x
262}
263#[cfg(not(target_arch = "wasm32"))]
264#[no_mangle]
265pub extern "C" fn hostSubmit(_id: u32) -> u32 {
266 1
267}
268#[cfg(not(target_arch = "wasm32"))]
269#[no_mangle]
270pub extern "C" fn hostPull() -> u32 {
271 u32::MAX
272}
273#[cfg(not(target_arch = "wasm32"))]
274#[no_mangle]
275pub extern "C" fn workerLoop() {}
276#[cfg(not(target_arch = "wasm32"))]
277#[no_mangle]
278pub extern "C" fn debugGetArenaBaseAddr() -> u32 {
279 0
280}
281#[cfg(not(target_arch = "wasm32"))]
282#[no_mangle]
283pub extern "C" fn getArenaMode() -> u32 {
284 0
285}
286#[cfg(not(target_arch = "wasm32"))]
287#[no_mangle]
288pub extern "C" fn debugCalculateArenaSize(_c: u32) -> u32 {
289 0
290}
291#[cfg(not(target_arch = "wasm32"))]
292#[no_mangle]
293pub extern "C" fn debugLockState() -> u32 {
294 0xffff_ffff
295}
296
297// =============================================================================
298// WASM implementation
299// =============================================================================
300#[cfg(target_arch = "wasm32")]
301mod wasm {
302 use crate::{ArenaKind, ArenaSym};
303 use core::arch::wasm32;
304 use core::cell::UnsafeCell;
305 use core::sync::atomic::{AtomicU32, AtomicU8, Ordering};
306
307 // -------------------------------------------------------------------------
308 // Constants / helpers
309 // -------------------------------------------------------------------------
310 pub const EMPTY: u32 = 0xffff_ffff;
311 const ARENA_MAGIC: u32 = 0x534B_4941; // "SKIA"
312 const INITIAL_CAP: u32 = 1 << 20;
313 const MAX_CAP: u32 = 1 << 27;
314 const WASM_PAGE_SIZE: usize = 65536;
315 // Size of SQ/CQ rings (power of two).
316 //
317 // Larger rings reduce backpressure + atomic wait/notify churn when many workers
318 // produce results faster than the host can drain them (a common cause of "stuttering"
319 // worker timelines and main-thread saturation in profiles).
320 const RING_ENTRIES: u32 = 1 << 16; // 65536
321 const TERM_CACHE_LEN: usize = 10;
322
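    // Worked values for `align64` below: align64(0) == 0, align64(1) == 64,
    // align64(64) == 64, align64(65) == 128.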
323 #[inline(always)]
324 const fn align64(x: u32) -> u32 {
325 (x + 63) & !63
326 }
327
328 // -------------------------------------------------------------------------
329 // Atomics + wait/notify
330 // -------------------------------------------------------------------------
331 // WASM atomics / CAS safety notes (applies to all compare_exchange* uses below)
332 // -------------------------------------------------------------------------
333 //
334 // This module relies heavily on CAS (compare_exchange / compare_exchange_weak),
335 // which compiles to a single atomic read-modify-write instruction in WebAssembly
336 // (e.g. `i32.atomic.rmw.cmpxchg`). That property prevents the classic "check-then-
337 // store" race where two threads both observe a value and both write the same
338 // update thinking they won; CAS has a single winner and forces losers to retry.
339 //
340 // ABA notes:
341 // - Many CAS patterns are vulnerable to ABA when the *same* value can reappear
342 // (e.g. counters wrapping or pointer reuse). In this codebase, ABA is either
343 // explicitly prevented (ring slot sequence numbers advance by whole cycles)
344 // or is practically irrelevant in context (e.g. seqlock-style counters would
345 // require billions of expensive operations to wrap during one read section).
346 //
347 // WASM platform particularities:
348 // - When wasm threads are enabled, the linear memory is backed by a
349 // `SharedArrayBuffer`, and atomic ops synchronize across Web Workers.
350 // Rust `Ordering::{Acquire,Release,AcqRel,SeqCst}` map onto WASM atomics/fences
351 // to provide the needed visibility guarantees.
352 // - `memory.grow` increases linear memory size but does not "move" addresses
353 // from the module's perspective; what changes is our chosen layout within
354 // that address space (offsets/capacity), guarded by atomics/seqlock.
355 //
356 // External references:
357 // - Seqlock concept: https://en.wikipedia.org/wiki/Seqlock
358 // - SharedArrayBuffer (required for wasm threads): https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer
359 // - WebAssembly features (threads/atomics): https://webassembly.org/features/
360 // - Rust atomic memory orderings: https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html
361 // - WebAssembly `memory.grow`: https://developer.mozilla.org/en-US/docs/WebAssembly/Reference/Memory/Grow
362 //
363 mod sys {
364 use super::*;
365 #[inline(always)]
366 pub fn wait32(ptr: &AtomicU32, expected: u32) {
367 unsafe {
368 let _ =
369 wasm32::memory_atomic_wait32(ptr as *const _ as *mut i32, expected as i32, -1);
370 }
371 }
372 #[inline(always)]
373 pub fn notify(ptr: &AtomicU32, count: u32) {
374 unsafe {
375 let _ = wasm32::memory_atomic_notify(ptr as *const _ as *mut i32, count);
376 }
377 }
378 }
379
380 // -------------------------------------------------------------------------
381 // Ring Buffer Types (io_uring Style)
382 // -------------------------------------------------------------------------
383
384 /// Submission Queue Entry: Main thread -> Worker communication.
385 ///
386 /// Sent from main thread to worker to request evaluation of an expression.
387 /// Workers dequeue these and perform the actual reduction work.
388 #[repr(C)]
389 #[derive(Clone, Copy)]
390 pub struct Sqe {
391 /// Arena node ID of the expression to evaluate.
392 /// This is the root of the expression tree to reduce.
393 pub node_id: u32,
394
395 /// Unique request identifier for correlation.
396 /// Used to match completion queue entries back to the original request.
397 /// Must be unique across all outstanding requests.
398 pub req_id: u32,
399
400 /// Max reduction steps for this specific request.
        /// Each request carries its own immutable step budget.
402 pub max_steps: u32,
403 }
404
405 /// Completion Queue Entry: Worker -> Main thread results.
406 ///
407 /// Workers enqueue these when they complete (or yield) evaluation work.
408 /// The main thread polls the completion queue to retrieve results.
409 #[repr(C)]
410 #[derive(Clone, Copy)]
411 pub struct Cqe {
412 /// Result node ID or Suspension node ID (for yields).
413 /// - If reduction completed: The fully reduced expression node
414 /// - If evaluation yielded: A Suspension node for resumption
415 pub node_id: u32,
416
417 /// Request identifier matching the original Sqe.req_id.
418 /// Used by main thread to correlate completions with pending requests.
419 pub req_id: u32,
420
421 /// Padding to align Slot<Cqe> to 16 bytes (power of 2) for efficient indexing.
422 pub _pad: u32,
423 }
424
425 /// Ring buffer slot with sequence number for ABA prevention.
426 ///
427 /// Each slot contains a sequence number and payload. The sequence number
428 /// prevents ABA problems in concurrent CAS operations by ensuring that
429 /// a slot can only be reused after a full cycle of the ring.
430 ///
431 /// ## Sequence Number Protocol
432 ///
433 /// - **Initial state**: `seq = slot_index`
    /// - **Producer**: waits until `seq == tail`, claims the slot, writes the payload, then stores `seq = tail + 1`
    /// - **Consumer**: waits until `seq == head + 1`, reads the payload, then stores `seq = head + mask + 1`
436 ///
437 /// This ensures proper ordering and prevents race conditions.
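    ///
    /// As a worked example (4-entry ring, so `mask = 3`), slot index 1 evolves as:
    ///
    /// ```text
    /// init               seq = 1   free for the producer whose tail == 1
    /// producer publishes seq = 2   tail + 1: visible to the consumer whose head == 1
    /// consumer releases  seq = 5   head + mask + 1 = 1 + 3 + 1: claimable again only by
    ///                              the producer whose tail == 5, one full lap later
    /// ```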
438 #[repr(C)]
439 struct Slot<T> {
440 /// Sequence number for synchronization and ABA prevention.
441 /// Must be atomically updated to maintain memory ordering.
442 seq: AtomicU32,
443
444 /// The actual data payload stored in this slot.
445 /// Access must be synchronized with sequence number updates.
446 payload: UnsafeCell<T>,
447 }
448
449 // SAFETY: Ring<T> ensures proper synchronization of Slot<T> access.
450 // The UnsafeCell is only accessed after proper sequence number validation.
451 unsafe impl<T> Sync for Slot<T> {}
452
    /// Lock-free ring buffer for inter-thread communication.
454 ///
455 /// Uses [io_uring](https://en.wikipedia.org/wiki/Io_uring)-style producer/consumer pattern with atomic operations.
456 /// Supports both non-blocking (try_*) and blocking (*_blocking) operations.
457 ///
458 /// ## Design Principles
459 ///
    /// - **Multiple producers and consumers** supported per ring: slots are claimed via CAS
461 /// - **Power-of-two sizing** for efficient masking operations
462 /// - **Cache-line alignment** (64-byte) to prevent false sharing
463 /// - **Atomic wait/notify** for efficient blocking operations
464 /// - **Sequence numbers** prevent ABA problems in concurrent access
465 ///
466 /// ## Memory Layout
467 ///
468 /// ```text
469 /// +----------------------+ <-- 64-byte aligned
470 /// | head: AtomicU32 | Consumer position
471 /// | not_full: AtomicU32 | Wait/notify for producers
472 /// | _pad1: [u8; 56] | Cache line padding
473 /// +----------------------+ <-- 64-byte aligned
474 /// | tail: AtomicU32 | Producer position
475 /// | not_empty: AtomicU32 | Wait/notify for consumers
476 /// | _pad2: [u8; 56] | Cache line padding
477 /// +----------------------+
478 /// | mask: u32 | entries - 1 (for fast modulo)
479 /// | entries: u32 | Ring capacity (power of 2)
480 /// +----------------------+
481 /// | slots[entries] | Array of Slot<T> entries
482 /// +----------------------+
483 /// ```
484 ///
485 /// ## Thread Safety
486 ///
487 /// - **Producer calls**: `try_enqueue()`, `enqueue_blocking()`
488 /// - **Consumer calls**: `try_dequeue()`, `dequeue_blocking()`
489 /// - **No internal locking**: Uses atomic CAS operations only
    /// - **Lock-free progress**: `try_*` calls never block; the `*_blocking` variants park on WASM atomic wait/notify
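    ///
    /// ## Example (sketch)
    ///
    /// A hedged sketch of driving the submission ring (`node_id`/`req_id` stand for a
    /// previously allocated arena node and a fresh request id; `sq_ring()` is the accessor
    /// defined further below):
    ///
    /// ```ignore
    /// // Host (producer): non-blocking submit; if the ring is full, poll again later.
    /// let sq = unsafe { sq_ring() };
    /// let sqe = Sqe { node_id, req_id, max_steps: 10_000 };
    /// if !sq.try_enqueue(sqe) {
    ///     // ring full: drain some completions, then retry
    /// }
    ///
    /// // Worker (consumer): park until a job arrives, then reduce it.
    /// let job: Sqe = unsafe { sq_ring() }.dequeue_blocking();
    /// ```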
491 #[repr(C, align(64))]
492 pub struct Ring<T> {
493 /// Consumer position (head of queue).
        /// Advanced only by consumers, via CAS.
495 head: AtomicU32,
496
497 /// Producer wait/notify synchronization.
498 /// Used by producers to wait when ring is full.
499 not_full: AtomicU32,
500
501 /// Cache line padding to prevent false sharing with tail.
502 _pad1: [u8; 56],
503
504 /// Producer position (tail of queue).
        /// Advanced only by producers, via CAS.
506 tail: AtomicU32,
507
508 /// Consumer wait/notify synchronization.
509 /// Used by consumers to wait when ring is empty.
510 not_empty: AtomicU32,
511
512 /// Cache line padding to prevent false sharing with head.
513 _pad2: [u8; 56],
514
515 /// Bitmask for fast modulo: `index & mask` == `index % entries`
516 mask: u32,
517
518 /// Ring capacity (must be power of 2).
519 entries: u32,
520
521 /// Zero-sized type marker for generic parameter.
522 _marker: core::marker::PhantomData<T>,
523 }
524
525 impl<T: Copy> Ring<T> {
526 /// Get pointer to the slots array following the Ring header.
527 #[inline(always)]
528 fn slots_ptr(&self) -> *const Slot<T> {
529 unsafe { (self as *const Ring<T>).add(1) as *const Slot<T> }
530 }
531
532 /// Get reference to slot at the given index (masked for wraparound).
533 #[inline(always)]
534 unsafe fn slot_at(&self, i: u32) -> &Slot<T> {
535 &*self.slots_ptr().add((i & self.mask) as usize)
536 }
537
538 /// Initialize a ring buffer at the given memory location.
539 ///
540 /// # Safety
541 ///
542 /// - `ptr` must point to sufficient memory for the ring and all slots
543 /// - `entries_pow2` must be a power of 2
544 /// - The ring must not be accessed concurrently during initialization
545 #[inline(always)]
546 pub unsafe fn init_at(ptr: *mut u8, entries_pow2: u32) -> &'static Self {
547 let ring = &mut *(ptr as *mut Ring<T>);
548 // Initialize producer/consumer positions
549 ring.head.store(0, Ordering::Relaxed);
550 ring.tail.store(0, Ordering::Relaxed);
551 // Initialize wait/notify counters
552 ring.not_empty.store(0, Ordering::Relaxed);
553 ring.not_full.store(0, Ordering::Relaxed);
554 // Set ring parameters
555 ring.entries = entries_pow2;
556 ring.mask = entries_pow2 - 1;
557 // Initialize slot sequence numbers to their indices
558 for i in 0..entries_pow2 {
559 ring.slot_at(i).seq.store(i, Ordering::Relaxed);
560 }
561 ring
562 }
563
564 /// Attempt to enqueue an item without blocking.
565 ///
566 /// Returns `true` if the item was successfully enqueued, `false` if the ring is full.
567 ///
568 /// ## Algorithm
569 ///
570 /// 1. Load current tail position
571 /// 2. Check if the corresponding slot is available (`seq == tail`)
572 /// 3. Attempt to claim the slot by CAS'ing tail forward
573 /// 4. If successful: write payload, update sequence, notify consumers
574 /// 5. If slot unavailable: check if ring is full or retry
575 ///
576 /// ## Memory Ordering
577 ///
578 /// - `Acquire` load of sequence number ensures payload visibility
579 /// - `Release` store of sequence number publishes payload to consumers
580 /// - `Release` notify ensures consumer sees all prior writes
581 ///
582 /// See [memory barriers](https://en.wikipedia.org/wiki/Memory_barrier) for details.
583 #[inline(always)]
584 pub fn try_enqueue(&self, item: T) -> bool {
585 unsafe {
586 loop {
587 // Load producer position (relaxed: no ordering requirements)
588 let t = self.tail.load(Ordering::Relaxed);
589 let slot = self.slot_at(t);
590
591 // Check if slot is available for us (Acquire: see previous publications)
592 let s = slot.seq.load(Ordering::Acquire);
593 let diff = s.wrapping_sub(t);
594
595 if diff == 0 {
596 // Slot is free. Try to claim it by advancing tail.
597 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
598 if self
599 .tail
600 .compare_exchange_weak(
601 t,
602 t.wrapping_add(1),
603 Ordering::Relaxed, // Success: no special ordering
604 Ordering::Relaxed, // Failure: no special ordering
605 )
606 .is_ok()
607 {
608 // Successfully claimed slot. Write payload and publish.
609 *slot.payload.get() = item;
610 slot.seq.store(t.wrapping_add(1), Ordering::Release);
611 // Notify waiting consumers (Release: publish all prior writes)
612 self.not_empty.fetch_add(1, Ordering::Release);
613 sys::notify(&self.not_empty, 1);
614 return true;
615 }
616 // CAS failed: another producer claimed it, retry
617 } else if (diff as i32) < 0 {
                            // Ring is full: the slot's sequence still lags `tail`
                            // (the consumer has not released it yet).
619 return false;
620 }
621 // Slot not ready yet, retry (another iteration in progress)
622 }
623 }
624 }
625
626 /// Attempt to dequeue an item without blocking.
627 ///
628 /// Returns `Some(item)` if an item was successfully dequeued, `None` if the ring is empty.
629 ///
630 /// ## Algorithm
631 ///
632 /// 1. Load current head position
633 /// 2. Check if the corresponding slot has data (`seq == head + 1`)
634 /// 3. Attempt to claim the slot by CAS'ing head forward
635 /// 4. If successful: read payload, update sequence for reuse, notify producers
636 /// 5. If slot unavailable: check if ring is empty or retry
637 ///
638 /// ## Sequence Number Reuse
639 ///
640 /// After consuming, sequence is set to `head + mask + 1`. Since `mask = entries - 1`,
641 /// this ensures the slot won't be reused until after a full ring cycle, preventing ABA.
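        ///
        /// For example, with `entries = 8` (`mask = 7`) and `head = 10`, the released slot's
        /// sequence becomes `10 + 7 + 1 = 18`, which only matches a producer whose `tail`
        /// is 18, i.e. the next visit to this slot one full lap later.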
642 #[inline(always)]
643 pub fn try_dequeue(&self) -> Option<T> {
644 unsafe {
645 loop {
646 // Load consumer position (relaxed: no ordering requirements)
647 let h = self.head.load(Ordering::Relaxed);
648 let slot = self.slot_at(h);
649
650 // Check if slot has data for us (Acquire: see producer's publication)
651 let s = slot.seq.load(Ordering::Acquire);
652 let diff = s.wrapping_sub(h.wrapping_add(1));
653
654 if diff == 0 {
655 // Slot has data. Try to claim it by advancing head.
656 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
657 if self
658 .head
659 .compare_exchange_weak(
660 h,
661 h.wrapping_add(1),
662 Ordering::Relaxed, // Success: no special ordering
663 Ordering::Relaxed, // Failure: no special ordering
664 )
665 .is_ok()
666 {
667 // Successfully claimed slot. Read payload and release for reuse.
668 let item = *slot.payload.get();
669 // Set sequence for next cycle: h + mask + 1 prevents ABA issues
670 slot.seq.store(
671 h.wrapping_add(self.mask).wrapping_add(1),
672 Ordering::Release,
673 );
674 // Notify waiting producers (Release: publish slot availability)
675 self.not_full.fetch_add(1, Ordering::Release);
676 sys::notify(&self.not_full, 1);
677 return Some(item);
678 }
679 // CAS failed: another consumer claimed it, retry
680 } else if (diff as i32) < 0 {
681 // Ring is empty: no data available
682 return None;
683 }
684 // Slot not ready yet, retry (another operation in progress)
685 }
686 }
687 }
688
689 /// Enqueue an item, blocking until space is available.
690 ///
691 /// Uses WASM atomic wait/notify for efficient blocking when the ring is full.
692 /// The wait is interruptible and will retry the enqueue operation.
693 #[inline(always)]
694 pub fn enqueue_blocking(&self, item: T) {
695 while !self.try_enqueue(item) {
                // Snapshot the wait counter, then re-check before sleeping so that a
                // notify landing between the failed try and the wait is not missed.
                let v = self.not_full.load(Ordering::Acquire);
                if self.try_enqueue(item) {
                    return;
                }
                // Sleep until a consumer signals that it freed a slot.
                sys::wait32(&self.not_full, v);
704 }
705 }
706
707 /// Dequeue an item, blocking until data is available.
708 ///
709 /// Uses WASM atomic wait/notify for efficient blocking when the ring is empty.
710 /// The wait is interruptible and will retry the dequeue operation.
711 #[inline(always)]
712 pub fn dequeue_blocking(&self) -> T {
713 loop {
714 if let Some(x) = self.try_dequeue() {
715 return x;
716 }
                // Snapshot the wait counter, then re-check before sleeping so that a
                // notify landing between the failed try and the wait is not missed.
                let v = self.not_empty.load(Ordering::Acquire);
                if let Some(x) = self.try_dequeue() {
                    return x;
                }
                // Sleep until a producer signals that it published an item.
                sys::wait32(&self.not_empty, v);
725 }
726 }
727 }
728
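    // Rough sizing (estimate from the layouts above): `Slot<Sqe>` and `Slot<Cqe>` are 16 bytes
    // each, so with `RING_ENTRIES = 1 << 16` the SQ and CQ rings each occupy a little over
    // 1 MiB of the shared buffer.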
729 #[inline(always)]
730 const fn ring_bytes<T>(entries: u32) -> u32 {
731 let header = core::mem::size_of::<Ring<T>>() as u32;
732 let slot = core::mem::size_of::<Slot<T>>() as u32;
733 align64(header + entries * slot)
734 }
735
736 // -------------------------------------------------------------------------
737 // Header layout (fixed offsets)
738 // -------------------------------------------------------------------------
739 struct SabLayout {
740 offset_sq: u32,
741 offset_cq: u32,
742 offset_stdin: u32,
743 offset_stdout: u32,
744 offset_stdin_wait: u32,
745 offset_kind: u32,
746 offset_sym: u32,
747 offset_left_id: u32,
748 offset_right_id: u32,
749 offset_hash32: u32,
750 offset_next_idx: u32,
751 offset_buckets: u32,
752 offset_term_cache: u32,
753 total_size: u32,
754 }
755
756 #[repr(C, align(64))]
757 struct SabHeader {
758 magic: u32,
759 ring_entries: u32,
760 ring_mask: u32,
761 offset_sq: u32,
762 offset_cq: u32,
763 offset_stdin: u32,
764 offset_stdout: u32,
765 offset_stdin_wait: u32,
766 offset_kind: u32,
767 offset_sym: u32,
768 offset_left_id: u32,
769 offset_right_id: u32,
770 offset_hash32: u32,
771 offset_next_idx: u32,
772 offset_buckets: u32,
773 offset_term_cache: u32,
774 capacity: u32,
775 bucket_mask: u32,
776 resize_seq: AtomicU32,
777 top: AtomicU32,
778 }
779
780 impl SabHeader {
781 fn layout(capacity: u32) -> SabLayout {
782 let header_size = core::mem::size_of::<SabHeader>() as u32;
783 let offset_sq = align64(header_size);
784 let offset_cq = align64(offset_sq + ring_bytes::<Sqe>(RING_ENTRIES));
785 let offset_stdin = align64(offset_cq + ring_bytes::<Cqe>(RING_ENTRIES));
786 let offset_stdout = align64(offset_stdin + ring_bytes::<u8>(RING_ENTRIES));
787 let offset_stdin_wait = align64(offset_stdout + ring_bytes::<u8>(RING_ENTRIES));
788
789 let offset_kind = align64(offset_stdin_wait + ring_bytes::<u32>(RING_ENTRIES));
790 let offset_sym = offset_kind + capacity;
791 let offset_left_id = align64(offset_sym + capacity);
792 let offset_right_id = offset_left_id + 4 * capacity;
793 let offset_hash32 = offset_right_id + 4 * capacity;
794 let offset_next_idx = offset_hash32 + 4 * capacity;
795 let offset_buckets = align64(offset_next_idx + 4 * capacity);
796 let offset_term_cache = offset_buckets + 4 * capacity;
797 let total_size = offset_term_cache + (TERM_CACHE_LEN as u32) * 4;
798 SabLayout {
799 offset_sq,
800 offset_cq,
801 offset_stdin,
802 offset_stdout,
803 offset_stdin_wait,
804 offset_kind,
805 offset_sym,
806 offset_left_id,
807 offset_right_id,
808 offset_hash32,
809 offset_next_idx,
810 offset_buckets,
811 offset_term_cache,
812 total_size,
813 }
814 }
815 }
816
817 // -------------------------------------------------------------------------
818 // Globals
819 // -------------------------------------------------------------------------
820 static mut ARENA_BASE_ADDR: u32 = 0;
821 static mut ARENA_MODE: u32 = 0;
822
823 #[inline(always)]
824 unsafe fn ensure_arena() {
825 if ARENA_BASE_ADDR != 0 {
826 return;
827 }
828 // If we were supposed to be in SAB mode but have no base, that's fatal.
829 if ARENA_MODE == 1 {
830 wasm32::unreachable();
831 }
832 let ptr = allocate_raw_arena(INITIAL_CAP);
833 if ptr.is_null() {
834 wasm32::unreachable();
835 }
836 // Heap / single-instance mode
837 ARENA_MODE = 0;
838 }
839
840 // -------------------------------------------------------------------------
841 // Helpers to locate structures
842 // -------------------------------------------------------------------------
843 #[inline(always)]
844 unsafe fn header() -> &'static SabHeader {
845 &*(ARENA_BASE_ADDR as *const SabHeader)
846 }
847
848 #[inline(always)]
849 unsafe fn header_mut() -> &'static mut SabHeader {
850 &mut *(ARENA_BASE_ADDR as *mut SabHeader)
851 }
852
853 #[inline(always)]
854 unsafe fn sq_ring() -> &'static Ring<Sqe> {
855 let h = header();
856 &*((ARENA_BASE_ADDR + h.offset_sq) as *const Ring<Sqe>)
857 }
858
859 #[inline(always)]
860 unsafe fn cq_ring() -> &'static Ring<Cqe> {
861 let h = header();
862 &*((ARENA_BASE_ADDR + h.offset_cq) as *const Ring<Cqe>)
863 }
864
865 #[inline(always)]
866 unsafe fn stdin_ring() -> &'static Ring<u8> {
867 let h = header();
868 &*((ARENA_BASE_ADDR + h.offset_stdin) as *const Ring<u8>)
869 }
870
871 #[inline(always)]
872 unsafe fn stdout_ring() -> &'static Ring<u8> {
873 let h = header();
874 &*((ARENA_BASE_ADDR + h.offset_stdout) as *const Ring<u8>)
875 }
876
877 #[inline(always)]
878 unsafe fn stdin_wait_ring() -> &'static Ring<u32> {
879 let h = header();
880 &*((ARENA_BASE_ADDR + h.offset_stdin_wait) as *const Ring<u32>)
881 }
882
883 // Array helpers
884 #[inline(always)]
885 unsafe fn kind_ptr() -> *mut AtomicU8 {
886 (ARENA_BASE_ADDR + header().offset_kind) as *mut AtomicU8
887 }
888 #[inline(always)]
889 unsafe fn sym_ptr() -> *mut AtomicU8 {
890 (ARENA_BASE_ADDR + header().offset_sym) as *mut AtomicU8
891 }
892 #[inline(always)]
893 unsafe fn left_ptr() -> *mut AtomicU32 {
894 (ARENA_BASE_ADDR + header().offset_left_id) as *mut AtomicU32
895 }
896 #[inline(always)]
897 unsafe fn right_ptr() -> *mut AtomicU32 {
898 (ARENA_BASE_ADDR + header().offset_right_id) as *mut AtomicU32
899 }
900 #[inline(always)]
901 unsafe fn hash_ptr() -> *mut AtomicU32 {
902 (ARENA_BASE_ADDR + header().offset_hash32) as *mut AtomicU32
903 }
904 #[inline(always)]
905 unsafe fn next_ptr() -> *mut AtomicU32 {
906 (ARENA_BASE_ADDR + header().offset_next_idx) as *mut AtomicU32
907 }
908 #[inline(always)]
909 unsafe fn buckets_ptr() -> *mut AtomicU32 {
910 (ARENA_BASE_ADDR + header().offset_buckets) as *mut AtomicU32
911 }
912 #[inline(always)]
913 unsafe fn term_cache_ptr() -> *mut AtomicU32 {
914 (ARENA_BASE_ADDR + header().offset_term_cache) as *mut AtomicU32
915 }
916
917 // -------------------------------------------------------------------------
918 // Hashing helpers
919 // -------------------------------------------------------------------------
920 fn avalanche32(mut x: u32) -> u32 {
921 x ^= x >> 16;
922 x = x.wrapping_mul(0x7feb_352d);
923 x ^= x >> 15;
924 x = x.wrapping_mul(0x846c_a68b);
925 x ^= x >> 16;
926 x
927 }
928 const GOLD: u32 = 0x9e37_79b9;
929 fn mix(a: u32, b: u32) -> u32 {
930 avalanche32(a ^ b.wrapping_mul(GOLD))
931 }
932
933 // If a resize fails (OOM / max cap), poison the seqlock so other threads trap
934 // instead of spinning forever on an odd seq.
935 const POISON_SEQ: u32 = 0xffff_ffff;
936
937 // -------------------------------------------------------------------------
938 // Resize guard (seqlock-style)
939 // -------------------------------------------------------------------------
940 // See "WASM atomics / CAS safety notes" above for CAS/ABA discussion.
941 // This specific guard is seqlock-style: even=stable, odd=resize in progress.
942 // For READING existing data. Returns (seq, h) to enable read-verify-retry pattern:
943 // 1. Call enter_stable() to get sequence number
944 // 2. Read the data you need
945 // 3. Call check_stable(seq) to verify sequence didn't change
946 // 4. If changed, retry (resize occurred during read)
947 // Used in: kindOf(), symOf(), leftOf(), rightOf(), hash lookups
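    // A minimal sketch of that pattern (hypothetical helper, shown for illustration only;
    // the exported accessors below inline it by hand):
    //
    //     fn read_stable<T>(read: impl Fn() -> T) -> T {
    //         loop {
    //             let (seq, _h) = enter_stable();
    //             let v = read();
    //             core::sync::atomic::fence(Ordering::Acquire);
    //             if check_stable(seq) {
    //                 return v;
    //             }
    //         }
    //     }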
948 #[inline(always)]
949 fn enter_stable() -> (u32, &'static SabHeader) {
950 unsafe {
951 let h = header();
952 loop {
953 let seq = h.resize_seq.load(Ordering::Acquire);
954 if seq == POISON_SEQ {
955 core::arch::wasm32::unreachable();
956 }
957 if seq & 1 == 1 {
958 core::hint::spin_loop();
959 continue;
960 }
961 return (seq, h);
962 }
963 }
964 }
965
966 // For WRITING new data. See comment above enter_stable() for distinction.
967 // Simply waits until resize completes (sequence is even).
968 // No verification needed since we're not reading existing data.
969 // Used in: allocTerminal(), allocCons() allocation path, alloc_generic()
970 #[inline(always)]
971 fn wait_resize_stable() {
972 unsafe {
973 let h = header();
974 loop {
975 let seq = h.resize_seq.load(Ordering::Acquire);
976 if seq == POISON_SEQ {
977 core::arch::wasm32::unreachable();
978 }
979 if (seq & 1) == 0 {
980 return;
981 }
982 core::hint::spin_loop();
983 }
984 }
985 }
986
987 #[inline(always)]
988 fn check_stable(seq: u32) -> bool {
989 unsafe { header().resize_seq.load(Ordering::Acquire) == seq }
990 }
991
992 // -------------------------------------------------------------------------
993 // Arena init / connect
994 // -------------------------------------------------------------------------
995 unsafe fn zero_region(start: u32, len: u32) {
996 core::ptr::write_bytes((ARENA_BASE_ADDR + start) as *mut u8, 0, len as usize);
997 }
998
999 unsafe fn init_header(capacity: u32) {
1000 let layout = SabHeader::layout(capacity);
1001 let h = &mut *(ARENA_BASE_ADDR as *mut SabHeader);
1002 h.magic = ARENA_MAGIC;
1003 h.ring_entries = RING_ENTRIES;
1004 h.ring_mask = RING_ENTRIES - 1;
1005 h.offset_sq = layout.offset_sq;
1006 h.offset_cq = layout.offset_cq;
1007 h.offset_stdin = layout.offset_stdin;
1008 h.offset_stdout = layout.offset_stdout;
1009 h.offset_stdin_wait = layout.offset_stdin_wait;
1010 h.offset_kind = layout.offset_kind;
1011 h.offset_sym = layout.offset_sym;
1012 h.offset_left_id = layout.offset_left_id;
1013 h.offset_right_id = layout.offset_right_id;
1014 h.offset_hash32 = layout.offset_hash32;
1015 h.offset_next_idx = layout.offset_next_idx;
1016 h.offset_buckets = layout.offset_buckets;
1017 h.offset_term_cache = layout.offset_term_cache;
1018 h.capacity = capacity;
1019 h.bucket_mask = capacity - 1;
1020 h.resize_seq.store(0, Ordering::Relaxed);
1021 h.top.store(0, Ordering::Relaxed);
1022
1023 zero_region(
1024 (core::mem::size_of::<SabHeader>()) as u32,
1025 layout.total_size - core::mem::size_of::<SabHeader>() as u32,
1026 );
1027
1028 Ring::<Sqe>::init_at((ARENA_BASE_ADDR + h.offset_sq) as *mut u8, RING_ENTRIES);
1029 Ring::<Cqe>::init_at((ARENA_BASE_ADDR + h.offset_cq) as *mut u8, RING_ENTRIES);
1030 Ring::<u8>::init_at((ARENA_BASE_ADDR + h.offset_stdin) as *mut u8, RING_ENTRIES);
1031 Ring::<u8>::init_at((ARENA_BASE_ADDR + h.offset_stdout) as *mut u8, RING_ENTRIES);
1032 Ring::<u32>::init_at(
1033 (ARENA_BASE_ADDR + h.offset_stdin_wait) as *mut u8,
1034 RING_ENTRIES,
1035 );
1036
1037 // Buckets + cache init
1038 let buckets = buckets_ptr();
1039 for i in 0..capacity as usize {
1040 buckets.add(i).write(AtomicU32::new(EMPTY));
1041 }
1042 let cache = term_cache_ptr();
1043 for i in 0..TERM_CACHE_LEN {
1044 cache.add(i).write(AtomicU32::new(EMPTY));
1045 }
1046 }
1047
1048 unsafe fn allocate_raw_arena(capacity: u32) -> *mut SabHeader {
1049 let layout = SabHeader::layout(capacity);
1050 let pages_needed = (layout.total_size as usize + WASM_PAGE_SIZE - 1) / WASM_PAGE_SIZE;
1051 let old_pages = wasm32::memory_grow(0, pages_needed);
1052 if old_pages == usize::MAX {
1053 return core::ptr::null_mut();
1054 }
1055 let base_addr = (old_pages * WASM_PAGE_SIZE) as u32;
1056 ARENA_BASE_ADDR = base_addr;
1057 init_header(capacity);
1058 base_addr as *mut SabHeader
1059 }
1060
1061 // -------------------------------------------------------------------------
1062 // Exports: init/connect/reset
1063 // -------------------------------------------------------------------------
1064 #[no_mangle]
1065 pub extern "C" fn initArena(initial_capacity: u32) -> u32 {
1066 if initial_capacity < 1024
1067 || initial_capacity > MAX_CAP
1068 || !initial_capacity.is_power_of_two()
1069 {
1070 return 0;
1071 }
1072 unsafe {
1073 if ARENA_BASE_ADDR != 0 {
1074 return ARENA_BASE_ADDR;
1075 }
1076 let ptr = allocate_raw_arena(initial_capacity);
1077 if ptr.is_null() {
1078 return 1;
1079 }
1080 ARENA_MODE = 1;
1081 ARENA_BASE_ADDR
1082 }
1083 }
1084
1085 #[no_mangle]
1086 pub extern "C" fn connectArena(ptr_addr: u32) -> u32 {
1087 if ptr_addr == 0 || ptr_addr % 64 != 0 {
1088 return 0;
1089 }
1090 unsafe {
1091 ARENA_BASE_ADDR = ptr_addr;
1092 ARENA_MODE = 1;
1093 let h = header();
1094 if h.magic != ARENA_MAGIC {
1095 return 5;
1096 }
1097 1
1098 }
1099 }
1100
1101 #[no_mangle]
1102 pub extern "C" fn reset() {
1103 unsafe {
1104 ensure_arena();
1105 let h = header_mut();
1106 h.top.store(0, Ordering::Release);
1107 let buckets = buckets_ptr();
1108 for i in 0..h.capacity as usize {
1109 (*buckets.add(i)).store(EMPTY, Ordering::Release);
1110 }
1111 let cache = term_cache_ptr();
1112 for i in 0..TERM_CACHE_LEN {
1113 (*cache.add(i)).store(EMPTY, Ordering::Release);
1114 }
1115 h.resize_seq
1116 .store(h.resize_seq.load(Ordering::Relaxed) & !1, Ordering::Release);
1117 }
1118 }
1119
1120 // -------------------------------------------------------------------------
1121 // Accessors
1122 // -------------------------------------------------------------------------
1123 #[no_mangle]
1124 pub extern "C" fn kindOf(n: u32) -> u32 {
1125 unsafe {
1126 ensure_arena();
1127 loop {
1128 let (seq, h) = enter_stable();
1129 let cap = h.capacity;
1130 if n >= cap {
1131 return 0;
1132 }
1133 let val = (*kind_ptr().add(n as usize)).load(Ordering::Acquire) as u32;
1134 core::sync::atomic::fence(Ordering::Acquire);
1135 if check_stable(seq) {
1136 return val;
1137 }
1138 }
1139 }
1140 }
1141
1142 #[no_mangle]
1143 pub extern "C" fn symOf(n: u32) -> u32 {
1144 unsafe {
1145 ensure_arena();
1146 loop {
1147 let (seq, h) = enter_stable();
1148 if n >= h.capacity {
1149 return 0;
1150 }
1151 let val = (*sym_ptr().add(n as usize)).load(Ordering::Acquire) as u32;
1152 core::sync::atomic::fence(Ordering::Acquire);
1153 if check_stable(seq) {
1154 return val;
1155 }
1156 }
1157 }
1158 }
1159
1160 #[no_mangle]
1161 pub extern "C" fn leftOf(n: u32) -> u32 {
1162 unsafe {
1163 ensure_arena();
1164 loop {
1165 let (seq, h) = enter_stable();
1166 if n >= h.capacity {
1167 return 0;
1168 }
1169 let val = (*left_ptr().add(n as usize)).load(Ordering::Acquire);
1170 core::sync::atomic::fence(Ordering::Acquire);
1171 if check_stable(seq) {
1172 return val;
1173 }
1174 }
1175 }
1176 }
1177
1178 #[no_mangle]
1179 pub extern "C" fn rightOf(n: u32) -> u32 {
1180 unsafe {
1181 ensure_arena();
1182 loop {
1183 let (seq, h) = enter_stable();
1184 if n >= h.capacity {
1185 return 0;
1186 }
1187 let val = (*right_ptr().add(n as usize)).load(Ordering::Acquire);
1188 core::sync::atomic::fence(Ordering::Acquire);
1189 if check_stable(seq) {
1190 return val;
1191 }
1192 }
1193 }
1194 }
1195
1196 // -------------------------------------------------------------------------
1197 // Allocation (speculative, lock-free)
1198 // -------------------------------------------------------------------------
1199 #[no_mangle]
1200 pub extern "C" fn allocTerminal(sym: u32) -> u32 {
1201 unsafe {
1202 ensure_arena();
1203 let h = header();
1204
1205 if sym < TERM_CACHE_LEN as u32 {
1206 let cached = (*term_cache_ptr().add(sym as usize)).load(Ordering::Acquire);
1207 if cached != EMPTY {
1208 return cached;
1209 }
1210 }
1211
1212 loop {
1213 wait_resize_stable();
1214 let id = h.top.fetch_add(1, Ordering::AcqRel);
1215 if id >= h.capacity {
1216 grow();
1217 continue;
1218 }
1219
1220 (*kind_ptr().add(id as usize)).store(ArenaKind::Terminal as u8, Ordering::Release);
1221 (*sym_ptr().add(id as usize)).store(sym as u8, Ordering::Release);
1222 (*hash_ptr().add(id as usize)).store(sym, Ordering::Release);
1223
1224 if sym < TERM_CACHE_LEN as u32 {
1225 (*term_cache_ptr().add(sym as usize)).store(id, Ordering::Release);
1226 }
1227 return id;
1228 }
1229 }
1230 }
1231
1232 #[no_mangle]
1233 /// Allocate a NonTerm (application) node with hash-consing optimization.
1234 ///
1235 /// This is the core optimization that enables structural sharing in the arena.
1236 /// Instead of always creating new nodes, it checks if an identical `(l r)` pair
1237 /// already exists and returns the existing node ID if found.
1238 ///
1239 /// ## Hash-Consing Algorithm
1240 ///
1241 /// 1. **Compute hash**: `mix(hash(l), hash(r))` using avalanche hashing
1242 /// 2. **Lookup in hash table**: Check bucket for existing `(l,r)` pairs
1243 /// 3. **Return existing**: If found, return the shared node ID
1244 /// 4. **Allocate new**: If not found, create new node and add to hash table
1245 ///
1246 /// ## Memory Efficiency
1247 ///
1248 /// - **DAG representation**: Common subexpressions are shared
1249 /// - **Reduced allocations**: Avoids duplicate node creation
1250 /// - **Cache-friendly**: Hash table enables O(1) lookups
1251 ///
1252 /// ## Thread Safety
1253 ///
1254 /// - Uses seqlock to handle concurrent resizing
1255 /// - Atomic operations for hash table consistency
1256 /// - CAS-based insertion prevents race conditions
1257 pub extern "C" fn allocCons(l: u32, r: u32) -> u32 {
1258 unsafe {
1259 ensure_arena();
1260
1261 // Compute hash value (doesn't depend on header state)
1262 let hl = loop {
1263 let (seq, h) = enter_stable();
1264 if l >= h.capacity {
1265 if !check_stable(seq) {
1266 continue;
1267 }
1268 return EMPTY; // Invalid left node
1269 }
1270 let val = (*hash_ptr().add(l as usize)).load(Ordering::Acquire);
1271 core::sync::atomic::fence(Ordering::Acquire);
1272 if check_stable(seq) {
1273 break val;
1274 }
1275 };
1276 let hr = loop {
1277 let (seq, h) = enter_stable();
1278 if r >= h.capacity {
1279 if !check_stable(seq) {
1280 continue;
1281 }
1282 return EMPTY; // Invalid right node
1283 }
1284 let val = (*hash_ptr().add(r as usize)).load(Ordering::Acquire);
1285 core::sync::atomic::fence(Ordering::Acquire);
1286 if check_stable(seq) {
1287 break val;
1288 }
1289 };
1290 let hval = mix(hl, hr);
1291
1292 // Retry loop for stable reads
1293 let _ = loop {
1294 let (seq, h) = enter_stable(); // Wait if resizing
1295 let mask = h.bucket_mask;
1296 let bucket_idx = (hval & mask) as usize;
1297
1298 // Validate bucket index is safe before dereferencing
1299 if bucket_idx >= h.capacity as usize {
1300 // Capacity changed mid-read? Retry.
1301 if !check_stable(seq) {
1302 continue;
1303 }
1304 // Should be unreachable if logic is correct, but safe fallback
1305 continue;
1306 }
1307
1308 let buckets = buckets_ptr();
1309 let next = next_ptr();
1310
1311 let mut cur = (*buckets.add(bucket_idx)).load(Ordering::Acquire);
1312 let mut found = EMPTY;
1313
1314 while cur != EMPTY {
1315 // Bounds check for safety
1316 if cur >= h.capacity {
1317 break;
1318 }
1319
1320 let k = (*kind_ptr().add(cur as usize)).load(Ordering::Acquire);
1321 if k == ArenaKind::NonTerm as u8 {
1322 let ch = (*hash_ptr().add(cur as usize)).load(Ordering::Acquire);
1323 if ch == hval {
1324 let cl = (*left_ptr().add(cur as usize)).load(Ordering::Acquire);
1325 let cr = (*right_ptr().add(cur as usize)).load(Ordering::Acquire);
1326 if cl == l && cr == r {
1327 found = cur;
1328 break;
1329 }
1330 }
1331 }
1332 cur = (*next.add(cur as usize)).load(Ordering::Acquire);
1333 }
1334
1335 // If we found it, verify the lock is still valid.
1336 // If lock changed, our read might be garbage -> Retry.
1337 if check_stable(seq) {
1338 if found != EMPTY {
1339 return found;
1340 }
                    // Not found: fall through to the allocation path below
                    // (the bucket index is recomputed there after any resize).
1342 break bucket_idx;
1343 }
1344 // If lock changed, retry the whole lookup
1345 };
1346
1347 // allocate new
1348 loop {
1349 wait_resize_stable();
1350
1351 // Reload pointers/mask in case they changed during wait_resize_stable()
1352 let h = header();
1353 let buckets = buckets_ptr();
1354 let current_mask = h.bucket_mask;
1355 let b = (hval & current_mask) as usize;
1356
1357 let id = h.top.fetch_add(1, Ordering::AcqRel);
1358 if id >= h.capacity {
1359 grow();
1360 continue;
1361 }
1362
1363 (*kind_ptr().add(id as usize)).store(ArenaKind::NonTerm as u8, Ordering::Release);
1364 (*left_ptr().add(id as usize)).store(l, Ordering::Release);
1365 (*right_ptr().add(id as usize)).store(r, Ordering::Release);
1366 (*hash_ptr().add(id as usize)).store(hval, Ordering::Release);
1367
1368 // insert into bucket with CAS; if we lose, drop id as hole (kind=0)
1369 let next = next_ptr();
1370 loop {
1371 let head = (*buckets.add(b)).load(Ordering::Acquire);
1372 (*next.add(id as usize)).store(head, Ordering::Relaxed);
1373 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
1374 if (*buckets.add(b))
1375 .compare_exchange(head, id, Ordering::Release, Ordering::Relaxed)
1376 .is_ok()
1377 {
1378 return id;
1379 }
1380 // someone inserted; check if it matches now
1381 let mut cur2 = (*buckets.add(b)).load(Ordering::Acquire);
1382 while cur2 != EMPTY {
1383 let ck2 = (*kind_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1384 if ck2 != ArenaKind::NonTerm as u8 {
1385 cur2 = (*next.add(cur2 as usize)).load(Ordering::Acquire);
1386 continue;
1387 }
1388 let ch2 = (*hash_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1389 if ch2 == hval {
1390 let cl2 = (*left_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1391 let cr2 = (*right_ptr().add(cur2 as usize)).load(Ordering::Acquire);
1392 if cl2 == l && cr2 == r {
1393 // mark hole
1394 (*kind_ptr().add(id as usize)).store(0, Ordering::Release);
1395 return cur2;
1396 }
1397 }
1398 cur2 = (*next.add(cur2 as usize)).load(Ordering::Acquire);
1399 }
1400 }
1401 }
1402 }
1403 }
1404
1405 // Generic allocation (non hash-consed; NOT inserted into buckets).
1406 // This is used for reducer continuations/suspensions.
1407 #[inline(always)]
1408 unsafe fn alloc_generic(kind: u8, sym: u8, left: u32, right: u32, hash: u32) -> u32 {
1409 ensure_arena();
1410 let h = header();
1411 loop {
1412 wait_resize_stable();
1413 let id = h.top.fetch_add(1, Ordering::AcqRel);
1414 if id >= h.capacity {
1415 grow();
1416 continue;
1417 }
1418 // Publish payload, then kind last.
1419 (*sym_ptr().add(id as usize)).store(sym, Ordering::Release);
1420 (*left_ptr().add(id as usize)).store(left, Ordering::Release);
1421 (*right_ptr().add(id as usize)).store(right, Ordering::Release);
1422 (*hash_ptr().add(id as usize)).store(hash, Ordering::Release);
1423 (*kind_ptr().add(id as usize)).store(kind, Ordering::Release);
1424 return id;
1425 }
1426 }
1427
1428 // -------------------------------------------------------------------------
1429 // Resize (stop-the-world via odd/even seq)
1430 // -------------------------------------------------------------------------
1431 fn grow() {
1432 unsafe {
1433 let h = header_mut();
1434 let mut expected = h.resize_seq.load(Ordering::Acquire);
1435 loop {
1436 if expected & 1 == 1 {
1437 core::hint::spin_loop();
1438 expected = h.resize_seq.load(Ordering::Acquire);
1439 continue;
1440 }
1441 // Acquire exclusive "writer" by flipping even->odd with CAS.
1442 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
1443 if h.resize_seq
1444 .compare_exchange(expected, expected | 1, Ordering::AcqRel, Ordering::Acquire)
1445 .is_ok()
1446 {
1447 break;
1448 }
1449 expected = h.resize_seq.load(Ordering::Acquire);
1450 }
1451
1452 let old_cap = h.capacity;
1453 // Capture OLD offsets before we overwrite the header with new ones.
1454 let old_offset_kind = h.offset_kind;
1455 let old_offset_sym = h.offset_sym;
1456 let old_offset_left = h.offset_left_id;
1457 let old_offset_right = h.offset_right_id;
1458 let old_offset_hash = h.offset_hash32;
1459 let old_offset_next = h.offset_next_idx;
1460 let old_offset_term_cache = h.offset_term_cache;
1461 let old_top = h.top.load(Ordering::Acquire);
1462
1463 if old_cap >= MAX_CAP {
1464 // Poison so other threads trap instead of spinning on odd resize_seq.
1465 h.resize_seq.store(POISON_SEQ, Ordering::Release);
1466 core::arch::wasm32::unreachable();
1467 }
1468 let new_cap = (old_cap * 2).min(MAX_CAP);
1469
1470 let layout = SabHeader::layout(new_cap);
1471 let needed_bytes = ARENA_BASE_ADDR as usize + layout.total_size as usize;
1472 let current_bytes = wasm32::memory_size(0) * WASM_PAGE_SIZE;
1473 if needed_bytes > current_bytes {
1474 let extra = needed_bytes - current_bytes;
1475 let pages = (extra + WASM_PAGE_SIZE - 1) / WASM_PAGE_SIZE;
1476 let res = wasm32::memory_grow(0, pages);
1477 if res == usize::MAX {
1478 // OOM (or denied grow). Poison so other threads trap instead of spinning.
1479 h.resize_seq.store(POISON_SEQ, Ordering::Release);
1480 core::arch::wasm32::unreachable();
1481 }
1482 }
1483
1484 // Update header (rings untouched)
1485 h.capacity = new_cap;
1486 h.bucket_mask = new_cap - 1;
1487 h.offset_sq = layout.offset_sq;
1488 h.offset_cq = layout.offset_cq;
1489 h.offset_stdin = layout.offset_stdin;
1490 h.offset_stdout = layout.offset_stdout;
1491 h.offset_stdin_wait = layout.offset_stdin_wait;
1492 h.offset_kind = layout.offset_kind;
1493 h.offset_sym = layout.offset_sym;
1494 h.offset_left_id = layout.offset_left_id;
1495 h.offset_right_id = layout.offset_right_id;
1496 h.offset_hash32 = layout.offset_hash32;
1497 h.offset_next_idx = layout.offset_next_idx;
1498 h.offset_buckets = layout.offset_buckets;
1499 h.offset_term_cache = layout.offset_term_cache;
1500
1501 // Preserve top
1502 let count = old_top.min(old_cap);
1503 h.top.store(count, Ordering::Release);
1504
1505 // IMPORTANT: reverse-copy order is mandatory.
1506 // The layout is packed contiguously, and new_cap > old_cap means new regions can overlap
1507 // old regions during migration. Copy from the end back to the front.
1508
1509 // Term cache
1510 core::ptr::copy(
1511 (ARENA_BASE_ADDR + old_offset_term_cache) as *const u8,
1512 (ARENA_BASE_ADDR + h.offset_term_cache) as *mut u8,
1513 (TERM_CACHE_LEN * 4) as usize,
1514 );
1515
1516 // Next (u32)
1517 core::ptr::copy(
1518 (ARENA_BASE_ADDR + old_offset_next) as *const u8,
1519 (ARENA_BASE_ADDR + h.offset_next_idx) as *mut u8,
1520 (count as usize) * 4,
1521 );
1522 if new_cap > old_cap {
1523 zero_region(h.offset_next_idx + old_cap * 4, (new_cap - old_cap) * 4);
1524 }
1525
1526 // Hash (u32)
1527 core::ptr::copy(
1528 (ARENA_BASE_ADDR + old_offset_hash) as *const u8,
1529 (ARENA_BASE_ADDR + h.offset_hash32) as *mut u8,
1530 (count as usize) * 4,
1531 );
1532 if new_cap > old_cap {
1533 zero_region(h.offset_hash32 + old_cap * 4, (new_cap - old_cap) * 4);
1534 }
1535
1536 // Right (u32)
1537 core::ptr::copy(
1538 (ARENA_BASE_ADDR + old_offset_right) as *const u8,
1539 (ARENA_BASE_ADDR + h.offset_right_id) as *mut u8,
1540 (count as usize) * 4,
1541 );
1542 if new_cap > old_cap {
1543 zero_region(h.offset_right_id + old_cap * 4, (new_cap - old_cap) * 4);
1544 }
1545
1546 // Left (u32)
1547 core::ptr::copy(
1548 (ARENA_BASE_ADDR + old_offset_left) as *const u8,
1549 (ARENA_BASE_ADDR + h.offset_left_id) as *mut u8,
1550 (count as usize) * 4,
1551 );
1552 if new_cap > old_cap {
1553 zero_region(h.offset_left_id + old_cap * 4, (new_cap - old_cap) * 4);
1554 }
1555
1556 // Sym (u8)
1557 core::ptr::copy(
1558 (ARENA_BASE_ADDR + old_offset_sym) as *const u8,
1559 (ARENA_BASE_ADDR + h.offset_sym) as *mut u8,
1560 count as usize,
1561 );
1562 if new_cap > old_cap {
1563 zero_region(h.offset_sym + old_cap, new_cap - old_cap);
1564 }
1565
1566 // Kind (u8)
1567 core::ptr::copy(
1568 (ARENA_BASE_ADDR + old_offset_kind) as *const u8,
1569 (ARENA_BASE_ADDR + h.offset_kind) as *mut u8,
1570 count as usize,
1571 );
1572 if new_cap > old_cap {
1573 zero_region(h.offset_kind + old_cap, new_cap - old_cap);
1574 }
1575
1576 // Rebuild buckets (hash-consing table) for NonTerm only.
1577 let buckets = buckets_ptr();
1578 let next = next_ptr();
1579 for i in 0..new_cap as usize {
1580 (*buckets.add(i)).store(EMPTY, Ordering::Release);
1581 }
1582 for i in 0..count {
1583 let k = (*kind_ptr().add(i as usize)).load(Ordering::Acquire);
1584 if k != ArenaKind::NonTerm as u8 {
1585 continue;
1586 }
1587 let hv = (*hash_ptr().add(i as usize)).load(Ordering::Acquire);
1588 let b = (hv & h.bucket_mask) as usize;
1589 loop {
1590 let head = (*buckets.add(b)).load(Ordering::Acquire);
1591 (*next.add(i as usize)).store(head, Ordering::Relaxed);
1592 // See "WASM atomics / CAS safety notes" above (cmpxchg winner/loser).
1593 if (*buckets.add(b))
1594 .compare_exchange(head, i, Ordering::Release, Ordering::Relaxed)
1595 .is_ok()
1596 {
1597 break;
1598 }
1599 }
1600 }
1601
1602 h.resize_seq.fetch_add(1, Ordering::Release);
1603 }
1604 }
1605
1606 // -------------------------------------------------------------------------
1607 // Reducer
1608 // -------------------------------------------------------------------------
1609 // Continuation frame: kind=Continuation, sym=stage, left=parent_stack, right=parent_node
1610 const STAGE_LEFT: u8 = 0;
1611 const STAGE_RIGHT: u8 = 1;
1612
1613 // Suspension: kind=Suspension, sym=mode (0=descend, 1=return), left=curr, right=stack, hash=remaining_steps
1614 const MODE_DESCEND: u8 = 0;
1615 const MODE_RETURN: u8 = 1;
1616 const MODE_IO_WAIT: u8 = 2;
1617
1618 #[inline(always)]
1619 unsafe fn alloc_continuation(parent: u32, target: u32, stage: u8) -> u32 {
1620 alloc_generic(ArenaKind::Continuation as u8, stage, parent, target, 0)
1621 }
1622
1623 #[inline(always)]
1624 unsafe fn alloc_suspension(curr: u32, stack: u32, mode: u8, remaining_steps: u32) -> u32 {
1625 alloc_generic(
1626 ArenaKind::Suspension as u8,
1627 mode,
1628 curr,
1629 stack,
1630 remaining_steps,
1631 )
1632 }
1633
1634 #[inline(always)]
1635 unsafe fn alloc_num(value: u32) -> u32 {
1636 alloc_generic(ArenaKind::Terminal as u8, ArenaSym::Num as u8, 0, 0, value)
1637 }
1638
1639 #[inline(always)]
1640 unsafe fn alloc_inc() -> u32 {
1641 allocTerminal(ArenaSym::Inc as u32)
1642 }
1643
1644 #[inline(always)]
1645 unsafe fn alloc_church_zero() -> u32 {
1646 let k = allocTerminal(ArenaSym::K as u32);
1647 let i = allocTerminal(ArenaSym::I as u32);
1648 allocCons(k, i)
1649 }
1650
1651 #[inline(always)]
1652 unsafe fn alloc_church_succ() -> u32 {
1653 let s = allocTerminal(ArenaSym::S as u32);
1654 let k = allocTerminal(ArenaSym::K as u32);
1655 let ks = allocCons(k, s);
1656 let b = allocCons(allocCons(s, ks), k); // S(KS)K
1657 allocCons(s, b) // S B
1658 }
1659
1660 #[inline(always)]
1661 unsafe fn alloc_church_byte(value: u8) -> u32 {
1662 let mut cur = alloc_church_zero();
1663 if value == 0 {
1664 return cur;
1665 }
1666 let succ = alloc_church_succ();
1667 for _ in 0..value {
1668 cur = allocCons(succ, cur);
1669 }
1670 cur
1671 }
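// Example: `alloc_church_byte(2)` builds `succ (succ zero)`, i.e.
// allocCons(succ, allocCons(succ, zero)) with zero = (K I) and succ = S (S (K S) K).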
1672
1673 #[inline(always)]
1674 unsafe fn decode_church_u32(expr: u32) -> u32 {
1675 let inc = alloc_inc();
1676 let zero = alloc_num(0);
1677 let app = allocCons(allocCons(expr, inc), zero);
1678 let reduced = reduce(app, 1_000_000);
1679 if kindOf(reduced) == ArenaKind::Terminal as u32 && symOf(reduced) == ArenaSym::Num as u32 {
1680 hash_of_internal(reduced)
1681 } else {
1682 0
1683 }
1684 }
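// Round-trip sketch: for any byte `b`, `decode_church_u32(alloc_church_byte(b))` is expected
// to return `b as u32`, since each application of `Inc` to `Num n` reduces to `Num (n + 1)`
// starting from `Num 0`.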
1685
1686 // --- OPTIMIZATION: Update existing node (Slot Reuse) ---
1687 #[inline(always)]
1688 unsafe fn update_continuation(id: u32, parent: u32, target: u32, stage: u8) {
1689 (*left_ptr().add(id as usize)).store(parent, Ordering::Relaxed);
1690 (*right_ptr().add(id as usize)).store(target, Ordering::Relaxed);
1691 (*sym_ptr().add(id as usize)).store(stage, Ordering::Relaxed);
1692 // Ensure it is marked as Continuation (in case we recycled a Suspension)
1693 (*kind_ptr().add(id as usize)).store(ArenaKind::Continuation as u8, Ordering::Release);
1694 }
1695
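/// Read the `hash` field of node `n` under the resize sequence lock: retry while a resize is
/// in flight (`enter_stable` / `check_stable`), and return `0` for out-of-range node ids.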
1696 #[inline(always)]
1697 fn hash_of_internal(n: u32) -> u32 {
1698 unsafe {
1699 ensure_arena();
1700 loop {
1701 let (seq, h) = enter_stable();
1702 if n >= h.capacity {
1703 return 0;
1704 }
1705 let val = (*hash_ptr().add(n as usize)).load(Ordering::Acquire);
1706 core::sync::atomic::fence(Ordering::Acquire);
1707 if check_stable(seq) {
1708 return val;
1709 }
1710 }
1711 }
1712 }
1713
1714 /// Unwind the continuation stack to reconstruct the full expression tree.
1715 ///
1716 /// When the step limit is exhausted, the worker may be deep in the expression tree
1717 /// with a non-empty stack. This function walks up the stack, rebuilding parent nodes
1718 /// as necessary, to return the root of the expression rather than a sub-expression.
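///
/// For example, with `curr = a'` and a single STAGE_LEFT frame whose parent node is `(a b)`,
/// the result is the (hash-consed) node `(a' b)` when `a' != a`, or the original parent node
/// when the left child is unchanged.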
1719 #[inline(always)]
1720 unsafe fn unwind_to_root(mut curr: u32, mut stack: u32) -> u32 {
1721 while stack != EMPTY {
1722 let recycled = stack;
1723 stack = leftOf(recycled);
1724 let parent_node = rightOf(recycled);
1725 let stage = symOf(recycled) as u8;
1726
1727 if stage == STAGE_LEFT {
1728 let orig_left = leftOf(parent_node);
1729 if curr != orig_left {
1730 // Left child changed, must allocate new parent
1731 curr = allocCons(curr, rightOf(parent_node));
1732 } else {
1733 // Left child didn't change, reuse existing parent
1734 curr = parent_node;
1735 }
1736 } else {
1737 // STAGE_RIGHT
1738 let orig_right = rightOf(parent_node);
1739 if curr != orig_right {
1740 // Right child changed, must allocate new parent
1741 curr = allocCons(leftOf(parent_node), curr);
1742 } else {
1743 curr = parent_node;
1744 }
1745 }
1746 }
1747 curr
1748 }
1749
1750 enum StepResult {
1751 Done(u32),
1752 Yield(u32), // Suspension node id
1753 }
1754
1755 /// Perform one iterative reduction step with preemptive yielding.
1756 ///
1757 /// This implements the core SKI reduction algorithm using an iterative approach
1758 /// with explicit stack management instead of recursion. It can yield mid-step
1759 /// when traversal gas is exhausted, enabling cooperative multitasking.
1760 ///
1761 /// ## Iterative Reduction Algorithm
1762 ///
1763 /// Instead of recursive function calls, uses an explicit stack of Continuation nodes:
1764 ///
1765 /// - **MODE_DESCEND**: Traverse down the expression tree looking for redexes
1766 /// - **MODE_RETURN**: Return up the tree after reducing a subexpression
1767 /// - **Continuation frames**: Represent suspended stack frames as arena nodes
1768 ///
1769 /// ## Gas-Based Preemption
1770 ///
1771 /// - **Traversal gas**: Limits AST traversal depth per step
1772 /// - **Yield on exhaustion**: Returns Suspension node for later resumption
1773 /// - **Cooperative multitasking**: Prevents worker starvation in parallel evaluation
1774 ///
1775 /// ## Step Counting
1776 ///
1777 /// - **Accurate counting**: Decrements `remaining_steps` immediately when each reduction occurs
1778 /// - **Multiple reductions per call**: Can perform multiple reductions in a single call
1779 /// - **Deterministic**: Every reduction is counted exactly once, regardless of batching
1780 ///
1781 /// ## Node Recycling Optimization
1782 ///
1783 /// - **Free node reuse**: Dead continuation frames are recycled immediately
1784 /// - **Memory efficiency**: Reduces allocation pressure during reduction
1785 /// - **Cache locality**: Reuses recently freed nodes
1786 ///
1787 /// ## Parameters
1788 ///
1789 /// - `curr`: Current expression node being evaluated
1790 /// - `stack`: Stack of continuation frames (linked list of nodes)
1791 /// - `mode`: Current evaluation mode (descend/return)
1792 /// - `gas`: Remaining traversal gas (mutable, decremented during execution)
1793 /// - `remaining_steps`: Mutable reference to reduction steps remaining (decremented on each reduction)
1794 /// - `free_node`: Recyclable node ID from previous operations
1795 ///
1796 /// ## Returns
1797 ///
1798 /// - `StepResult::Done(node)`: Reduction completed, `node` is the result
1799 /// - `StepResult::Yield(susp_id)`: Yielded mid-step, `susp_id` is Suspension node
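///
/// ## Example (sketch)
///
/// Reducing `(K a) b` with an empty stack and a fresh budget:
///
/// ```text
/// MODE_DESCEND: left is (K a), so the K-rule fires at the root;
///               remaining_steps -= 1; curr = a; mode = MODE_RETURN
/// MODE_RETURN:  stack is EMPTY, so the call returns StepResult::Done(a)
/// ```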
1800 unsafe fn step_iterative(
1801 mut curr: u32,
1802 mut stack: u32,
1803 mut mode: u8,
1804 gas: &mut u32,
1805 remaining_steps: &mut u32,
1806 mut free_node: u32,
1807 ) -> StepResult {
1808 loop {
1809 // Gas exhaustion yield
1810 if *gas == 0 {
1811 // If we have a free_node we didn't use, it's just a hole now.
1812 return StepResult::Yield(alloc_suspension(curr, stack, mode, *remaining_steps));
1813 }
1814 *gas -= 1;
1815
1816 if mode == MODE_RETURN {
1817 if stack == EMPTY {
1818 return StepResult::Done(curr);
1819 }
1820
1821 // POP FRAME
1822 let recycled = stack; // <--- This frame is now dead/recyclable
1823 stack = leftOf(recycled); // Parent
1824 let parent_node = rightOf(recycled);
1825 let stage = symOf(recycled) as u8;
1826
1827 if stage == STAGE_LEFT {
1828 let orig_left = leftOf(parent_node);
1829 if curr != orig_left {
1830 // Rebuild parent
1831 curr = allocCons(curr, rightOf(parent_node));
1832 // 'recycled' is still free, we are returning up.
1833 free_node = recycled; // Keep for next push or pop
1834 mode = MODE_RETURN;
1835 continue;
1836 }
1837 // Left stable, DESCEND RIGHT
1838 // Reuse 'recycled' as the new Continuation frame!
1839 update_continuation(recycled, stack, parent_node, STAGE_RIGHT);
1840 stack = recycled;
1841 mode = MODE_DESCEND;
1842 curr = rightOf(parent_node);
1843 continue;
1844 } else {
1845 let orig_right = rightOf(parent_node);
1846 if curr != orig_right {
1847 curr = allocCons(leftOf(parent_node), curr);
1848 free_node = recycled;
1849 mode = MODE_RETURN;
1850 continue;
1851 }
1852 // Both stable
1853 curr = parent_node;
1854 free_node = recycled;
1855 mode = MODE_RETURN;
1856 continue;
1857 }
1858 }
1859
1860 // MODE_DESCEND
1861 let k = kindOf(curr);
1862 if k != ArenaKind::NonTerm as u32 {
1863 mode = MODE_RETURN;
1864 continue;
1865 }
1866
1867 // NonTerm: check for I/K/S redex at this node.
1868 let left = leftOf(curr);
1869 let right = rightOf(curr);
1870
1871 // [REDUCTION LOGIC START] -----------------------------------------
1872
1873 // I x -> x
1874 if kindOf(left) == ArenaKind::Terminal as u32 && symOf(left) == ArenaSym::I as u32 {
1875 if *remaining_steps == 0 {
1876 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1877 }
1878 *remaining_steps = remaining_steps.saturating_sub(1);
1879
1880 curr = right;
1881 mode = MODE_RETURN;
1882
1883 // Yield IMMEDIATELY if limit hit zero.
1884 // Don't waste gas traversing to the next redex.
1885 if *remaining_steps == 0 {
1886 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1887 }
1888 continue;
1889 }
1890
1891 // readOne k -> k (Church byte)
1892 if kindOf(left) == ArenaKind::Terminal as u32 && symOf(left) == ArenaSym::ReadOne as u32
1893 {
1894 if let Some(byte) = stdin_ring().try_dequeue() {
1895 if *remaining_steps == 0 {
1896 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1897 }
1898 *remaining_steps = remaining_steps.saturating_sub(1);
1899
1900 let numeral = alloc_church_byte(byte);
1901 curr = allocCons(right, numeral);
1902 mode = MODE_RETURN;
1903
1904 if *remaining_steps == 0 {
1905 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1906 }
1907 continue;
1908 }
1909
1910 let susp_id = alloc_suspension(curr, stack, MODE_IO_WAIT, *remaining_steps);
1911 stdin_wait_ring().enqueue_blocking(susp_id);
1912 return StepResult::Yield(susp_id);
1913 }
1914
1915 // writeOne n -> n (enqueue byte)
1916 if kindOf(left) == ArenaKind::Terminal as u32
1917 && symOf(left) == ArenaSym::WriteOne as u32
1918 {
1919 if *remaining_steps == 0 {
1920 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1921 }
1922 *remaining_steps = remaining_steps.saturating_sub(1);
1923
1924 let value = decode_church_u32(right);
1925 let byte = (value & 0xff) as u8;
1926 stdout_ring().enqueue_blocking(byte);
1927
1928 curr = right;
1929 mode = MODE_RETURN;
1930
1931 if *remaining_steps == 0 {
1932 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1933 }
1934 continue;
1935 }
1936
1937 // Inc (Num n) -> Num (n + 1) [internal Church decoding]
1938 if kindOf(left) == ArenaKind::Terminal as u32
1939 && symOf(left) == ArenaSym::Inc as u32
1940 && kindOf(right) == ArenaKind::Terminal as u32
1941 && symOf(right) == ArenaSym::Num as u32
1942 {
1943 if *remaining_steps == 0 {
1944 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1945 }
1946 *remaining_steps = remaining_steps.saturating_sub(1);
1947
1948 let next_val = hash_of_internal(right).wrapping_add(1);
1949 curr = alloc_num(next_val);
1950 mode = MODE_RETURN;
1951
1952 if *remaining_steps == 0 {
1953 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1954 }
1955 continue;
1956 }
1957
1958 if kindOf(left) == ArenaKind::NonTerm as u32 {
1959 let ll = leftOf(left);
1960 // K x y -> x
1961 if kindOf(ll) == ArenaKind::Terminal as u32 && symOf(ll) == ArenaSym::K as u32 {
1962 if *remaining_steps == 0 {
1963 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1964 }
1965 *remaining_steps = remaining_steps.saturating_sub(1);
1966
1967 curr = rightOf(left);
1968 mode = MODE_RETURN;
1969
1970 // Yield IMMEDIATELY
1971 if *remaining_steps == 0 {
1972 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1973 }
1974 continue;
1975 }
1976
1977 if kindOf(ll) == ArenaKind::NonTerm as u32 {
1978 let lll = leftOf(ll);
1979
1980 if kindOf(lll) == ArenaKind::Terminal as u32 {
1981 let sym = symOf(lll);
1982
1983 // S x y z -> x z (y z)
1984 if sym == ArenaSym::S as u32 {
1985 if *remaining_steps == 0 {
1986 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
1987 }
1988 *remaining_steps = remaining_steps.saturating_sub(1);
1989
1990 let x = rightOf(ll);
1991 let y = rightOf(left);
1992 let z = right;
1993
1994 let xz = allocCons(x, z);
1995 let yz = allocCons(y, z);
1996 curr = allocCons(xz, yz);
1997 mode = MODE_RETURN;
1998
1999 if *remaining_steps == 0 {
2000 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
2001 }
2002 continue;
2003 }
2004
2005 // B x y z -> x (y z)
2006 if sym == ArenaSym::B as u32 {
2007 if *remaining_steps == 0 {
2008 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
2009 }
2010 *remaining_steps = remaining_steps.saturating_sub(1);
2011
2012 let x = rightOf(ll);
2013 let y = rightOf(left);
2014 let z = right;
2015 let yz = allocCons(y, z);
2016 curr = allocCons(x, yz);
2017 mode = MODE_RETURN;
2018
2019 if *remaining_steps == 0 {
2020 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
2021 }
2022 continue;
2023 }
2024
2025 // C x y z -> x z y
2026 if sym == ArenaSym::C as u32 {
2027 if *remaining_steps == 0 {
2028 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
2029 }
2030 *remaining_steps = remaining_steps.saturating_sub(1);
2031
2032 let x = rightOf(ll);
2033 let y = rightOf(left);
2034 let z = right;
2035 let xz = allocCons(x, z);
2036 curr = allocCons(xz, y);
2037 mode = MODE_RETURN;
2038
2039 if *remaining_steps == 0 {
2040 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
2041 }
2042 continue;
2043 }
2044 } else if kindOf(lll) == ArenaKind::NonTerm as u32 {
2045 let llll = leftOf(lll);
2046 if kindOf(llll) == ArenaKind::Terminal as u32 {
2047 let sym = symOf(llll);
2048
2049 // S' w x y z -> w (x z) (y z)
2050 if sym == ArenaSym::SPrime as u32 {
2051 if *remaining_steps == 0 {
2052 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
2053 }
2054 *remaining_steps = remaining_steps.saturating_sub(1);
2055
2056 let w = rightOf(lll);
2057 let x = rightOf(ll);
2058 let y = rightOf(left);
2059 let z = right;
2060
2061 let xz = allocCons(x, z);
2062 let yz = allocCons(y, z);
2063 let w_xz = allocCons(w, xz);
2064 curr = allocCons(w_xz, yz);
2065 mode = MODE_RETURN;
2066
2067 if *remaining_steps == 0 {
2068 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
2069 }
2070 continue;
2071 }
2072
2073 // B' w x y z -> w x (y z)
2074 if sym == ArenaSym::BPrime as u32 {
2075 if *remaining_steps == 0 {
2076 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
2077 }
2078 *remaining_steps = remaining_steps.saturating_sub(1);
2079
2080 let w = rightOf(lll);
2081 let x = rightOf(ll);
2082 let y = rightOf(left);
2083 let z = right;
2084
2085 let yz = allocCons(y, z);
2086 let wx = allocCons(w, x);
2087 curr = allocCons(wx, yz);
2088 mode = MODE_RETURN;
2089
2090 if *remaining_steps == 0 {
2091 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
2092 }
2093 continue;
2094 }
2095
2096 // C' w x y z -> w (x z) y
2097 if sym == ArenaSym::CPrime as u32 {
2098 if *remaining_steps == 0 {
2099 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
2100 }
2101 *remaining_steps = remaining_steps.saturating_sub(1);
2102
2103 let w = rightOf(lll);
2104 let x = rightOf(ll);
2105 let y = rightOf(left);
2106 let z = right;
2107
2108 let xz = allocCons(x, z);
2109 let wxz = allocCons(w, xz);
2110 curr = allocCons(wxz, y);
2111 mode = MODE_RETURN;
2112
2113 if *remaining_steps == 0 {
2114 return StepResult::Yield(alloc_suspension(curr, stack, mode, 0));
2115 }
2116 continue;
2117 }
2118 }
2119 }
2120 }
2121 }
2122
2123 // [REDUCTION LOGIC END] -------------------------------------------
2124
2125 // No redex: PUSH frame to descend left
2126 if free_node != EMPTY {
2127 update_continuation(free_node, stack, curr, STAGE_LEFT);
2128 stack = free_node;
2129 free_node = EMPTY;
2130 } else {
2131 stack = alloc_continuation(stack, curr, STAGE_LEFT);
2132 }
2133 curr = left;
2134 mode = MODE_DESCEND;
2135 }
2136 }
2137
2138 fn step_internal(expr: u32) -> u32 {
2139 unsafe {
2140 let mut gas = u32::MAX;
2141 let mut steps = u32::MAX; // Effectively unlimited: single-step callers never exhaust the step budget
2142 match step_iterative(expr, EMPTY, MODE_DESCEND, &mut gas, &mut steps, EMPTY) {
2143 StepResult::Done(x) => x,
2144 StepResult::Yield(_) => expr, // effectively unreachable with u32::MAX gas; return the input to stay total
2145 }
2146 }
2147 }
2148
2149 #[no_mangle]
2150 pub extern "C" fn arenaKernelStep(expr: u32) -> u32 {
2151 unsafe {
2152 ensure_arena();
2153 }
2154 step_internal(expr)
2155 }
2156
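/// Reduce `expr` by repeatedly calling `step_internal` until a fixpoint is reached or `max`
/// iterations have run (`0xffff_ffff` means unlimited).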
2157 #[no_mangle]
2158 pub extern "C" fn reduce(expr: u32, max: u32) -> u32 {
2159 unsafe {
2160 ensure_arena();
2161 }
2162 let limit = if max == 0xffff_ffff { u32::MAX } else { max };
2163 let mut cur = expr;
2164 for _ in 0..limit {
2165 let next = step_internal(cur);
2166 if next == cur {
2167 break;
2168 }
2169 cur = next;
2170 }
2171 cur
2172 }
2173
2174 // -------------------------------------------------------------------------
2175 // Host/worker ring APIs
2176 // -------------------------------------------------------------------------
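/// Poll the completion queue from the host side (non-blocking).
///
/// Returns `-1` if the arena is not connected or no completion is available; otherwise a
/// packed `i64` whose high 32 bits are the request id and low 32 bits the result node id,
/// e.g. `req_id = (packed as u64 >> 32) as u32`, `node_id = packed as u32`.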
2177 #[no_mangle]
2178 pub extern "C" fn hostPull() -> i64 {
2179 unsafe {
2180 if ARENA_BASE_ADDR == 0 {
2181 return -1;
2182 }
2183 if let Some(cqe) = cq_ring().try_dequeue() {
2184 // Pack: high 32 bits = req_id, low 32 bits = node_id
2185 let packed: u64 = ((cqe.req_id as u64) << 32) | (cqe.node_id as u64);
2186 packed as i64
2187 } else {
2188 -1
2189 }
2190 }
2191 }
2192
2193 /// Debug/diagnostic helper: expose ring capacity to the host.
2194 ///
2195 /// Useful for tests and for sizing stress workloads without duplicating a JS-side constant.
2196 #[no_mangle]
2197 pub extern "C" fn debugGetRingEntries() -> u32 {
2198 RING_ENTRIES
2199 }
2200
2201 /// Submit work with explicit correlation id and max reduction steps.
2202 /// Returns: 0=ok, 1=full, 2=not connected.
2203 #[no_mangle]
2204 pub extern "C" fn hostSubmit(node_id: u32, req_id: u32, max_steps: u32) -> u32 {
2205 unsafe {
2206 if ARENA_BASE_ADDR == 0 {
2207 return 2;
2208 }
2209 if sq_ring().try_enqueue(Sqe {
2210 node_id,
2211 req_id,
2212 max_steps,
2213 }) {
2214 0
2215 } else {
2216 1
2217 }
2218 }
2219 }
2220
2221 /// Main worker loop for parallel SKI evaluation.
2222 ///
2223 /// Processes evaluation requests from the submission queue and posts results
2224 /// to the completion queue. Implements preemptive multitasking using gas-based
2225 /// yielding to prevent worker starvation in concurrent workloads.
2226 ///
2227 /// ## Processing Model
2228 ///
2229 /// 1. **Dequeue request** from submission queue (blocking)
2230 /// 2. **Initialize evaluation state** from request or suspension
2231 /// 3. **Iterative reduction loop** with gas-based preemption
2232 /// 4. **Yield or complete** based on gas and step limits
2233 /// 5. **Enqueue result** to completion queue
2234 /// 6. **Repeat** for next request
2235 ///
2236 /// ## Preemption Strategy
2237 ///
2238 /// - **Traversal gas limit**: `20,000` AST-node visits per `step_iterative` call
2239 /// - **Cooperative yielding**: Returns control when gas exhausted
2240 /// - **Suspension resumption**: Can restart from any yield point
2241 /// - **Fair scheduling**: Prevents any single expression from monopolizing CPU
2242 ///
2243 /// ## Step Counting Semantics
2244 ///
2245 /// - **Step decrement**: Happens inside `step_iterative`, once per reduction, via the
2246 ///   `remaining_steps` mutable reference, at the moment each reduction fires.
2247 /// - **Yield behavior**: A yield stores the *current* `remaining_steps` in the suspension
2248 ///   (the budget left after every reduction already performed), so resumption continues with exactly the unspent budget.
2249 /// - **Deterministic counting**: Each reduction step is counted exactly once, regardless of
2250 /// how many times the work is suspended and resumed.
2251 ///
2252 /// ## Memory Management
2253 ///
2254 /// - **Node recycling**: Reuses dead continuation frames immediately
2255 /// - **Shared arena**: All workers access the same memory space
2256 /// - **Atomic operations**: Safe concurrent access to shared structures
2257 ///
2258 /// ## Performance Optimizations
2259 ///
2260 /// - **Lock-free queues**: Minimal synchronization overhead
2261 /// - **Batched processing**: Amortizes dequeue overhead
2262 /// - **In-place updates**: Modifies nodes directly in shared memory
2263 /// - **Cache-friendly**: Sequential memory access patterns
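///
/// ## Host-side protocol (sketch)
///
/// ```text
/// hostSubmit(node_id, req_id, max_steps)   -- enqueue a reduction request
/// ... workerLoop reduces, yielding as needed ...
/// hostPull()                               -- returns (req_id << 32) | result_node_id;
///                                             a Suspension result can be resubmitted to resume
/// ```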
2264 #[no_mangle]
2265 pub extern "C" fn workerLoop() {
2266 unsafe {
2267 let sq = sq_ring();
2268 let cq = cq_ring();
2269 // Traversal gas per step_iterative call (preemption) to prevent starvation.
2270 // This is NOT the same as max reduction steps.
2271 let batch_gas: u32 = 20000;
2272 loop {
2273 let job = sq.dequeue_blocking();
2274 let mut curr = job.node_id;
2275 let mut stack = EMPTY;
2276 let mut mode = MODE_DESCEND;
2277 let mut remaining_steps: u32;
2278
2279 // If resuming a suspension, the suspension node ITSELF is now free to be reused!
2280 let mut free_node = EMPTY;
2281
2282 // Resume from suspension?
2283 if kindOf(curr) == ArenaKind::Suspension as u32 {
2284 let susp = curr;
2285 curr = leftOf(susp);
2286 stack = rightOf(susp);
2287 mode = symOf(susp) as u8;
2288 if mode == MODE_IO_WAIT {
2289 mode = MODE_DESCEND;
2290 }
2291 // Strict resume: trust the suspension's counter.
2292 // The suspension node's hash field holds the remaining_steps saved at the yield point,
2293 // already decremented for every reduction performed before the yield.
2294 remaining_steps = hash_of_internal(susp);
2295 free_node = susp; // <--- RECYCLE START
2296 } else {
2297 // Set strict limit from the job packet
2298 let limit = job.max_steps;
2299 remaining_steps = if limit == 0xffff_ffff {
2300 u32::MAX
2301 } else {
2302 limit
2303 };
2304 }
2305
2306 loop {
2307 // Check budget BEFORE trying a step
2308 if remaining_steps == 0 {
2309 // If we have a stack, we are deep in the tree.
2310 // We must unwind to the root to return a valid full expression.
2311 if stack != EMPTY {
2312 curr = unwind_to_root(curr, stack);
2313 stack = EMPTY;
2314 }
2315
2316 // Step budget exhausted; return (partial) result.
2317 cq.enqueue_blocking(Cqe {
2318 node_id: curr,
2319 req_id: job.req_id,
2320 _pad: 0,
2321 });
2322 break;
2323 }
2324
2325 let mut gas = batch_gas;
2326
2327 match step_iterative(
2328 curr,
2329 stack,
2330 mode,
2331 &mut gas,
2332 &mut remaining_steps,
2333 free_node,
2334 ) {
2335 StepResult::Yield(susp_id) => {
2336 // Yielded (gas or limit). 'susp_id' has the correct remaining count
2337 // because step_iterative updated 'remaining_steps' in place for each
2338 // reduction that occurred before yielding.
2339 cq.enqueue_blocking(Cqe {
2340 node_id: susp_id,
2341 req_id: job.req_id,
2342 _pad: 0,
2343 });
2344 break;
2345 }
2346 StepResult::Done(next_node) => {
2347 if next_node == curr {
2348 // Fixpoint reached.
2349 cq.enqueue_blocking(Cqe {
2350 node_id: curr,
2351 req_id: job.req_id,
2352 _pad: 0,
2353 });
2354 break;
2355 }
2356
2357 // Setup for next iteration
2358 curr = next_node;
2359 stack = EMPTY;
2360 mode = MODE_DESCEND;
2361 free_node = EMPTY;
2362
2363 // Loop continues; 'remaining_steps' was updated by reference in step_iterative
2364 }
2365 }
2366 }
2367 }
2368 }
2369 }
2370
2371 // -------------------------------------------------------------------------
2372 // Debug helpers
2373 // -------------------------------------------------------------------------
2374 #[no_mangle]
2375 pub extern "C" fn debugGetArenaBaseAddr() -> u32 {
2376 unsafe { ARENA_BASE_ADDR }
2377 }
2378
2379 #[no_mangle]
2380 pub extern "C" fn getArenaMode() -> u32 {
2381 unsafe { ARENA_MODE }
2382 }
2383
2384 #[no_mangle]
2385 pub extern "C" fn debugCalculateArenaSize(capacity: u32) -> u32 {
2386 SabHeader::layout(capacity).total_size
2387 }
2388
2389 #[no_mangle]
2390 pub extern "C" fn debugLockState() -> u32 {
2391 // Returns the resize sequence word:
2392 // - even: stable
2393 // - odd: resize in progress
2394 // - 0xFFFF_FFFF: poisoned (unrecoverable resize failure)
2395 unsafe { header().resize_seq.load(Ordering::Relaxed) }
2396 }
2397}
2398
2399// Re-export WASM symbols at crate root
2400#[cfg(target_arch = "wasm32")]
2401pub use wasm::*;