// seq_runtime/scheduler.rs
//! Scheduler - Green Thread Management with May
//!
//! CSP-style concurrency for Seq using May coroutines.
//! Each strand is a lightweight green thread that can communicate via channels.
//!
//! ## Non-Blocking Guarantee
//!
//! Channel operations (`send`, `receive`) use May's cooperative blocking and NEVER
//! block OS threads. However, I/O operations (`write_line`, `read_line` in io.rs)
//! currently use blocking syscalls. Future work will make all I/O non-blocking.
//!
//! ## Panic Behavior
//!
//! Functions panic on invalid input (null stacks, negative IDs, closed channels).
//! In a production system, consider implementing error channels or Result-based
//! error handling instead of panicking.

use crate::stack::Stack;
use crate::tagged_stack::StackValue;
use may::coroutine;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex, Once};
use std::time::{Duration, Instant};

static SCHEDULER_INIT: Once = Once::new();
static SCHEDULER_START_TIME: std::sync::OnceLock<Instant> = std::sync::OnceLock::new();

// Strand lifecycle tracking
//
// Design rationale:
// - ACTIVE_STRANDS: Lock-free atomic counter for the hot path (spawn/complete)
//   Every strand increments on spawn, decrements on complete. This is extremely
//   fast (lock-free atomic ops) and suitable for high-frequency operations.
//
// - SHUTDOWN_CONDVAR/MUTEX: Event-driven synchronization for the cold path (shutdown wait)
//   Used only when waiting for all strands to complete (program shutdown).
//   Condvar provides event-driven wakeup instead of polling, which is critical
//   for a systems language - no CPU waste, proper OS-level blocking.
//
// Why not track JoinHandles?
// Strands are like Erlang processes - potentially hundreds of thousands of concurrent
// entities with independent lifecycles. Storing handles would require global mutable
// state with synchronization overhead on the hot path. The counter + condvar approach
// keeps the hot path lock-free while providing proper shutdown synchronization.
pub static ACTIVE_STRANDS: AtomicUsize = AtomicUsize::new(0);
pub(crate) static SHUTDOWN_CONDVAR: Condvar = Condvar::new();
pub(crate) static SHUTDOWN_MUTEX: Mutex<()> = Mutex::new(());

// Strand lifecycle statistics (for diagnostics)
//
// These counters provide observability into strand lifecycle without any locking.
// All operations are lock-free atomic increments/loads.
//
// - TOTAL_SPAWNED: Monotonically increasing count of all strands ever spawned
// - TOTAL_COMPLETED: Monotonically increasing count of all strands that completed
// - PEAK_STRANDS: High-water mark of concurrent strands (helps detect strand leaks)
//
// Useful diagnostics:
// - Currently running: ACTIVE_STRANDS
// - Completed successfully: TOTAL_COMPLETED
// - Potential leaks: TOTAL_SPAWNED - TOTAL_COMPLETED - ACTIVE_STRANDS > 0 (strands lost)
// - Peak concurrency: PEAK_STRANDS
pub static TOTAL_SPAWNED: AtomicU64 = AtomicU64::new(0);
pub static TOTAL_COMPLETED: AtomicU64 = AtomicU64::new(0);
pub static PEAK_STRANDS: AtomicUsize = AtomicUsize::new(0);

// Unique strand ID generation (starts at 1 so 0 can mean "free" in the registry)
static NEXT_STRAND_ID: AtomicU64 = AtomicU64::new(1);

// =============================================================================
// Lock-Free Strand Registry (only when diagnostics feature is enabled)
// =============================================================================
//
// A fixed-size array of slots for tracking active strands without locks.
// Each slot stores a strand ID (0 = free) and spawn timestamp.
//
// Design principles:
// - Fixed size: No dynamic allocation, predictable memory footprint
// - Lock-free: All operations use atomic CAS, no mutex contention
// - Bounded: If registry is full, strands still run but aren't tracked
// - Zero cost when not querying: Only diagnostics reads the registry
//
// Slot encoding:
// - strand_id == 0: slot is free
// - strand_id > 0: slot contains an active strand
//
// The registry size can be configured via SEQ_STRAND_REGISTRY_SIZE env var.
// Default is 1024 slots, which is sufficient for most applications.
//
// When the "diagnostics" feature is disabled, the registry is not compiled,
// eliminating the SystemTime::now() syscall and O(n) scans on every spawn.

#[cfg(feature = "diagnostics")]
/// Default strand registry size (number of trackable concurrent strands)
const DEFAULT_REGISTRY_SIZE: usize = 1024;

#[cfg(feature = "diagnostics")]
/// A slot in the strand registry
///
/// Uses two atomics to store strand info without locks.
/// A slot is free when strand_id == 0.
pub struct StrandSlot {
    /// Strand ID (0 = free, >0 = active strand)
    pub strand_id: AtomicU64,
    /// Spawn timestamp (seconds since UNIX epoch, for detecting stuck strands)
    pub spawn_time: AtomicU64,
}

#[cfg(feature = "diagnostics")]
impl StrandSlot {
    /// Create an empty (free) slot; `const` so slots can be built in statics.
    const fn new() -> Self {
        Self {
            strand_id: AtomicU64::new(0),
            spawn_time: AtomicU64::new(0),
        }
    }
}

#[cfg(feature = "diagnostics")]
/// Lock-free strand registry
///
/// Provides O(n) registration (scan for free slot) and O(n) unregistration.
/// This is acceptable because:
/// 1. N is bounded (default 1024)
/// 2. Registration/unregistration are infrequent compared to strand work
/// 3. No locks means no contention, just atomic ops
pub struct StrandRegistry {
    slots: Box<[StrandSlot]>,
    /// Number of strands that couldn't be registered (registry full)
    pub overflow_count: AtomicU64,
}

#[cfg(feature = "diagnostics")]
impl StrandRegistry {
    /// Create a new registry with the given capacity
    fn new(capacity: usize) -> Self {
        let mut slots = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            slots.push(StrandSlot::new());
        }
        Self {
            slots: slots.into_boxed_slice(),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Register a strand, returning the slot index if successful
    ///
    /// Uses CAS to atomically claim a free slot.
    /// Returns None if the registry is full (strand still runs, just not tracked).
    pub fn register(&self, strand_id: u64) -> Option<usize> {
        let spawn_time = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);

        // Scan for a free slot
        for (idx, slot) in self.slots.iter().enumerate() {
            // Try to claim this slot (CAS from 0 to strand_id).
            // AcqRel on success publishes the claim to readers that
            // Acquire-load strand_id in active_strands().
            if slot
                .strand_id
                .compare_exchange(0, strand_id, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                // BUGFIX: the previous implementation stored spawn_time into
                // *every* slot it scanned, before the CAS, clobbering the
                // timestamps of slots already owned by other strands. Writing
                // only after a successful CAS touches just the slot we own.
                // A concurrent reader may briefly observe strand_id != 0 with
                // spawn_time == 0; readers should treat 0 as "just spawned".
                slot.spawn_time.store(spawn_time, Ordering::Release);
                return Some(idx);
            }
        }

        // Registry full - track overflow but strand still runs
        self.overflow_count.fetch_add(1, Ordering::Relaxed);
        None
    }

    /// Unregister a strand by ID
    ///
    /// Scans for the slot containing this strand ID and clears it.
    /// Returns true if found and cleared, false if not found.
    ///
    /// Note: ABA problem is not a concern here because strand IDs are monotonically
    /// increasing u64 values. ID reuse would require 2^64 spawns, which is practically
    /// impossible (at 1 billion spawns/sec, it would take ~584 years).
    pub fn unregister(&self, strand_id: u64) -> bool {
        for slot in self.slots.iter() {
            // Check if this slot contains our strand (CAS strand_id -> 0)
            if slot
                .strand_id
                .compare_exchange(strand_id, 0, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                // Successfully cleared the slot
                slot.spawn_time.store(0, Ordering::Release);
                return true;
            }
        }
        false
    }

    /// Iterate over active strands (for diagnostics)
    ///
    /// Returns an iterator of (strand_id, spawn_time) for non-empty slots.
    /// Note: This is a snapshot and may be slightly inconsistent due to concurrent
    /// updates; in particular, spawn_time == 0 means the strand registered so
    /// recently that its timestamp is not yet visible.
    pub fn active_strands(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.slots.iter().filter_map(|slot| {
            // Acquire on strand_id synchronizes with the AcqRel CAS in register()
            let id = slot.strand_id.load(Ordering::Acquire);
            if id > 0 {
                // Relaxed is sufficient - we've already synchronized via the
                // strand_id Acquire load; a 0 here just means "freshly spawned".
                let time = slot.spawn_time.load(Ordering::Relaxed);
                Some((id, time))
            } else {
                None
            }
        })
    }

    /// Get the registry capacity
    pub fn capacity(&self) -> usize {
        self.slots.len()
    }
}

// Global strand registry (lazy initialized)
#[cfg(feature = "diagnostics")]
static STRAND_REGISTRY: std::sync::OnceLock<StrandRegistry> = std::sync::OnceLock::new();

/// Get or initialize the global strand registry
///
/// Capacity comes from the SEQ_STRAND_REGISTRY_SIZE environment variable on
/// first use; unset, unparsable, or zero values fall back to
/// DEFAULT_REGISTRY_SIZE (mirrors the SEQ_POOL_CAPACITY handling).
#[cfg(feature = "diagnostics")]
pub fn strand_registry() -> &'static StrandRegistry {
    STRAND_REGISTRY.get_or_init(|| {
        let size = std::env::var("SEQ_STRAND_REGISTRY_SIZE")
            .ok()
            .and_then(|s| s.parse().ok())
            // Reject 0: a zero-capacity registry would make every spawn overflow.
            .filter(|&v| v > 0)
            .unwrap_or(DEFAULT_REGISTRY_SIZE);
        StrandRegistry::new(size)
    })
}

245/// Get elapsed time since scheduler was initialized
246pub fn scheduler_elapsed() -> Option<Duration> {
247    SCHEDULER_START_TIME.get().map(|start| start.elapsed())
248}
249
/// Default coroutine stack size: 128KB (0x20000 bytes)
/// Reduced from 1MB for better spawn performance (~16% faster in benchmarks).
/// Can be overridden via SEQ_STACK_SIZE environment variable.
const DEFAULT_STACK_SIZE: usize = 0x20000;

/// Parse a stack size from an optional environment-variable value.
///
/// Returns the parsed size in bytes. Missing, zero, or unparsable values fall
/// back to `DEFAULT_STACK_SIZE`; invalid values additionally print a warning
/// to stderr so misconfiguration is visible.
fn parse_stack_size(env_value: Option<String>) -> usize {
    let Some(raw) = env_value else {
        return DEFAULT_STACK_SIZE;
    };
    match raw.parse::<usize>() {
        Ok(size) if size > 0 => size,
        Ok(_) => {
            // Zero would give coroutines no stack at all - refuse it loudly.
            eprintln!(
                "Warning: SEQ_STACK_SIZE=0 is invalid, using default {}",
                DEFAULT_STACK_SIZE
            );
            DEFAULT_STACK_SIZE
        }
        Err(_) => {
            eprintln!(
                "Warning: SEQ_STACK_SIZE='{}' is not a valid number, using default {}",
                raw, DEFAULT_STACK_SIZE
            );
            DEFAULT_STACK_SIZE
        }
    }
}

281/// Default coroutine pool capacity.
282/// May reuses completed coroutine stacks from this pool to avoid allocations.
283/// Default of 1000 is often too small for spawn-heavy workloads.
284const DEFAULT_POOL_CAPACITY: usize = 10000;
285
286/// Initialize the scheduler.
287///
288/// # Safety
289/// Safe to call multiple times (idempotent via Once).
290/// Configures May coroutines with appropriate stack size for LLVM-generated code.
291#[unsafe(no_mangle)]
292pub unsafe extern "C" fn patch_seq_scheduler_init() {
293    SCHEDULER_INIT.call_once(|| {
294        // Configure stack size for coroutines
295        // Default is 128KB, reduced from 1MB for better spawn performance.
296        // Can be overridden via SEQ_STACK_SIZE environment variable (in bytes)
297        // Example: SEQ_STACK_SIZE=2097152 for 2MB
298        // Invalid values (non-numeric, zero) are warned and ignored.
299        let stack_size = parse_stack_size(std::env::var("SEQ_STACK_SIZE").ok());
300
301        // Configure coroutine pool capacity
302        // May reuses coroutine stacks from this pool to reduce allocation overhead.
303        // Default 10000 is 10x May's default (1000), better for spawn-heavy workloads.
304        // Can be overridden via SEQ_POOL_CAPACITY environment variable.
305        let pool_capacity = std::env::var("SEQ_POOL_CAPACITY")
306            .ok()
307            .and_then(|s| s.parse().ok())
308            .filter(|&v| v > 0)
309            .unwrap_or(DEFAULT_POOL_CAPACITY);
310
311        may::config()
312            .set_stack_size(stack_size)
313            .set_pool_capacity(pool_capacity);
314
315        // Record scheduler start time (for at-exit reporting)
316        SCHEDULER_START_TIME.get_or_init(Instant::now);
317
318        // Install SIGINT handler for Ctrl-C (unconditional - basic expected behavior)
319        // Without this, tight loops won't respond to Ctrl-C because signals
320        // are only delivered at syscall boundaries, and TCO loops may never syscall.
321        #[cfg(unix)]
322        {
323            use std::sync::atomic::{AtomicBool, Ordering};
324            static SIGINT_RECEIVED: AtomicBool = AtomicBool::new(false);
325
326            extern "C" fn sigint_handler(_: libc::c_int) {
327                // If we receive SIGINT twice, force exit (user is insistent)
328                if SIGINT_RECEIVED.swap(true, Ordering::SeqCst) {
329                    // Second SIGINT - exit immediately
330                    unsafe { libc::_exit(130) }; // 128 + 2 (SIGINT)
331                }
332                // First SIGINT - exit cleanly
333                std::process::exit(130);
334            }
335
336            unsafe {
337                libc::signal(
338                    libc::SIGINT,
339                    sigint_handler as *const () as libc::sighandler_t,
340                );
341            }
342        }
343
344        // Install SIGQUIT handler for runtime diagnostics (kill -3)
345        #[cfg(feature = "diagnostics")]
346        crate::diagnostics::install_signal_handler();
347
348        // Install watchdog timer (if enabled via SEQ_WATCHDOG_SECS)
349        #[cfg(feature = "diagnostics")]
350        crate::watchdog::install_watchdog();
351    });
352}
353
354/// Run the scheduler and wait for all coroutines to complete
355///
356/// # Safety
357/// Returns the final stack (always null for now since May handles all scheduling).
358/// This function blocks until all spawned strands have completed.
359///
360/// Uses a condition variable for event-driven shutdown synchronization rather than
361/// polling. The mutex is only held during the wait protocol, not during strand
362/// execution, so there's no contention on the hot path.
363#[unsafe(no_mangle)]
364pub unsafe extern "C" fn patch_seq_scheduler_run() -> Stack {
365    let mut guard = SHUTDOWN_MUTEX.lock().expect(
366        "scheduler_run: shutdown mutex poisoned - strand panicked during shutdown synchronization",
367    );
368
369    // Wait for all strands to complete
370    // The condition variable will be notified when the last strand exits
371    while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
372        guard = SHUTDOWN_CONDVAR
373            .wait(guard)
374            .expect("scheduler_run: condvar wait failed - strand panicked during shutdown wait");
375    }
376
377    // All strands have completed
378    std::ptr::null_mut()
379}
380
/// Shutdown the scheduler
///
/// # Safety
/// Safe to call. May doesn't require explicit shutdown, so this is a no-op.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_shutdown() {
    // May doesn't require explicit shutdown.
    // This function exists for API symmetry with init.
}

391/// Spawn a strand (coroutine) with initial stack
392///
393/// # Safety
394/// - `entry` must be a valid function pointer that can safely execute on any thread
395/// - `initial_stack` must be either null or a valid pointer to a `StackValue` that:
396///   - Was heap-allocated (e.g., via Box)
397///   - Has a 'static lifetime or lives longer than the coroutine
398///   - Is safe to access from the spawned thread
399/// - The caller transfers ownership of `initial_stack` to the coroutine
400/// - Returns a unique strand ID (positive integer)
401///
402/// # Memory Management
403/// The spawned coroutine takes ownership of `initial_stack` and will automatically
404/// free the final stack returned by `entry` upon completion.
405#[unsafe(no_mangle)]
406pub unsafe extern "C" fn patch_seq_strand_spawn(
407    entry: extern "C" fn(Stack) -> Stack,
408    initial_stack: Stack,
409) -> i64 {
410    // For backwards compatibility, use null base (won't support nested spawns)
411    unsafe { patch_seq_strand_spawn_with_base(entry, initial_stack, std::ptr::null_mut()) }
412}
413
414/// Spawn a strand (coroutine) with initial stack and explicit stack base
415///
416/// This variant allows setting the STACK_BASE for the spawned strand, which is
417/// required for the child to perform operations like clone_stack (nested spawn).
418///
419/// # Safety
420/// - `entry` must be a valid function pointer that can safely execute on any thread
421/// - `initial_stack` must be a valid pointer to a `StackValue` array
422/// - `stack_base` must be the base of the stack (or null to skip setting STACK_BASE)
423/// - The caller transfers ownership of `initial_stack` to the coroutine
424/// - Returns a unique strand ID (positive integer)
425#[unsafe(no_mangle)]
426pub unsafe extern "C" fn patch_seq_strand_spawn_with_base(
427    entry: extern "C" fn(Stack) -> Stack,
428    initial_stack: Stack,
429    stack_base: Stack,
430) -> i64 {
431    // Generate unique strand ID
432    let strand_id = NEXT_STRAND_ID.fetch_add(1, Ordering::Relaxed);
433
434    // Increment active strand counter and track total spawned
435    let new_count = ACTIVE_STRANDS.fetch_add(1, Ordering::Release) + 1;
436    TOTAL_SPAWNED.fetch_add(1, Ordering::Relaxed);
437
438    // Update peak strands if this is a new high-water mark
439    // Uses a CAS loop to safely update the maximum without locks
440    // Uses Acquire/Release ordering for proper synchronization with diagnostics reads
441    let mut peak = PEAK_STRANDS.load(Ordering::Acquire);
442    while new_count > peak {
443        match PEAK_STRANDS.compare_exchange_weak(
444            peak,
445            new_count,
446            Ordering::Release,
447            Ordering::Relaxed,
448        ) {
449            Ok(_) => break,
450            Err(current) => peak = current,
451        }
452    }
453
454    // Register strand in the registry (for diagnostics visibility)
455    // If registry is full, strand still runs but isn't tracked
456    #[cfg(feature = "diagnostics")]
457    let _ = strand_registry().register(strand_id);
458
459    // Function pointers are already Send, no wrapper needed
460    let entry_fn = entry;
461
462    // Convert pointers to usize (which is Send)
463    // This is necessary because *mut T is !Send, but the caller guarantees thread safety
464    let stack_addr = initial_stack as usize;
465    let base_addr = stack_base as usize;
466
467    unsafe {
468        coroutine::spawn(move || {
469            // Reconstruct pointers from addresses
470            let stack_ptr = stack_addr as *mut StackValue;
471            let base_ptr = base_addr as *mut StackValue;
472
473            // Debug assertion: validate stack pointer alignment and reasonable address
474            debug_assert!(
475                stack_ptr.is_null()
476                    || stack_addr.is_multiple_of(std::mem::align_of::<StackValue>()),
477                "Stack pointer must be null or properly aligned"
478            );
479            debug_assert!(
480                stack_ptr.is_null() || stack_addr > 0x1000,
481                "Stack pointer appears to be in invalid memory region (< 0x1000)"
482            );
483
484            // Set STACK_BASE for this strand if provided
485            // This enables nested spawns and other operations that need clone_stack
486            if !base_ptr.is_null() {
487                crate::stack::patch_seq_set_stack_base(base_ptr);
488            }
489
490            // Execute the entry function
491            let final_stack = entry_fn(stack_ptr);
492
493            // Clean up the final stack to prevent memory leak
494            free_stack(final_stack);
495
496            // Unregister strand from registry (uses captured strand_id)
497            #[cfg(feature = "diagnostics")]
498            strand_registry().unregister(strand_id);
499
500            // Decrement active strand counter first, then track completion
501            // This ordering ensures the invariant SPAWNED = COMPLETED + ACTIVE + lost
502            // is never violated from an external observer's perspective
503            // Use AcqRel to establish proper synchronization (both acquire and release barriers)
504            let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);
505
506            // Track completion after decrementing active count
507            TOTAL_COMPLETED.fetch_add(1, Ordering::Release);
508            if prev_count == 1 {
509                // We were the last strand - acquire mutex and signal shutdown
510                // The mutex must be held when calling notify to prevent missed wakeups
511                let _guard = SHUTDOWN_MUTEX.lock()
512                    .expect("strand_spawn: shutdown mutex poisoned - strand panicked during shutdown notification");
513                SHUTDOWN_CONDVAR.notify_all();
514            }
515        });
516    }
517
518    strand_id as i64
519}
520
521/// Free a stack allocated by the runtime
522///
523/// With the tagged stack implementation, stack cleanup is handled differently.
524/// The contiguous array is freed when the TaggedStack is dropped.
525/// This function just resets the thread-local arena.
526///
527/// # Safety
528/// Stack pointer must be valid or null.
529fn free_stack(_stack: Stack) {
530    // With tagged stack, the array is freed when TaggedStack is dropped.
531    // We just need to reset the arena for thread-local strings.
532
533    // Reset the thread-local arena to free all arena-allocated strings
534    // This is safe because:
535    // - Any arena strings in Values have been dropped above
536    // - Global strings are unaffected (they have their own allocations)
537    // - Channel sends clone to global, so no cross-strand arena pointers
538    crate::arena::arena_reset();
539}
540
541/// Legacy spawn_strand function (kept for compatibility)
542///
543/// # Safety
544/// `entry` must be a valid function pointer that can safely execute on any thread.
545#[unsafe(no_mangle)]
546pub unsafe extern "C" fn patch_seq_spawn_strand(entry: extern "C" fn(Stack) -> Stack) {
547    unsafe {
548        patch_seq_strand_spawn(entry, std::ptr::null_mut());
549    }
550}
551
552/// Yield execution to allow other coroutines to run
553///
554/// # Safety
555/// Always safe to call from within a May coroutine.
556#[unsafe(no_mangle)]
557pub unsafe extern "C" fn patch_seq_yield_strand(stack: Stack) -> Stack {
558    coroutine::yield_now();
559    stack
560}
561
// =============================================================================
// Cooperative Yield Safety Valve
// =============================================================================
//
// Prevents tight TCO loops from starving other strands and making the process
// unresponsive. When enabled via SEQ_YIELD_INTERVAL, yields after N tail calls.
//
// Configuration:
//   SEQ_YIELD_INTERVAL=10000  - Yield every 10,000 tail calls (default: 0 = disabled)
//
// Scope:
//   - Covers: User-defined word tail calls (musttail) and quotation tail calls
//   - Does NOT cover: Closure calls (they use regular calls, bounded by stack)
//   - Does NOT cover: Non-tail recursive calls (bounded by stack)
//   This is intentional: the safety valve targets unbounded TCO loops.
//
// Design:
//   - Zero overhead when disabled (threshold=0 short-circuits immediately)
//   - Thread-local counter avoids synchronization overhead
//   - Called before every musttail in generated code
//   - Threshold is cached on first access via OnceLock
//
// Thread-Local Counter Behavior:
//   The counter is per-OS-thread, not per-coroutine. Multiple coroutines on the
//   same OS thread share the counter, which may cause yields slightly more
//   frequently than the configured interval. This is intentional:
//   - Avoids coroutine-local storage overhead
//   - Still achieves the goal of preventing starvation
//   - Actual yield frequency is still bounded by the threshold

use std::cell::Cell;
use std::sync::OnceLock;

/// Cached yield interval threshold (0 = disabled)
static YIELD_THRESHOLD: OnceLock<u64> = OnceLock::new();

thread_local! {
    /// Per-thread tail call counter
    static TAIL_CALL_COUNTER: Cell<u64> = const { Cell::new(0) };
}

/// Get the yield threshold from environment (cached)
///
/// Returns 0 (disabled) if SEQ_YIELD_INTERVAL is not set or invalid.
/// Prints a warning to stderr if the value is set but invalid.
fn get_yield_threshold() -> u64 {
    *YIELD_THRESHOLD.get_or_init(|| {
        match std::env::var("SEQ_YIELD_INTERVAL") {
            // Empty string counts as "unset" rather than invalid - no warning.
            Ok(s) if s.is_empty() => 0,
            Ok(s) => match s.parse::<u64>() {
                Ok(n) => n,
                Err(_) => {
                    eprintln!(
                        "Warning: SEQ_YIELD_INTERVAL='{}' is not a valid positive integer, yield safety valve disabled",
                        s
                    );
                    0
                }
            },
            Err(_) => 0,
        }
    })
}

626/// Maybe yield to other coroutines based on tail call count
627///
628/// Called before every tail call in generated code. When SEQ_YIELD_INTERVAL
629/// is set, yields after that many tail calls to prevent starvation.
630///
631/// # Performance
632/// - Disabled (default): Single branch on cached threshold (< 1ns)
633/// - Enabled: Increment + compare + occasional yield (~10-20ns average)
634///
635/// # Safety
636/// Always safe to call. No-op when not in a May coroutine context.
637#[unsafe(no_mangle)]
638pub extern "C" fn patch_seq_maybe_yield() {
639    let threshold = get_yield_threshold();
640
641    // Fast path: disabled
642    if threshold == 0 {
643        return;
644    }
645
646    TAIL_CALL_COUNTER.with(|counter| {
647        let count = counter.get().wrapping_add(1);
648        counter.set(count);
649
650        if count >= threshold {
651            counter.set(0);
652            coroutine::yield_now();
653        }
654    });
655}
656
657/// Wait for all strands to complete
658///
659/// # Safety
660/// Always safe to call. Blocks until all spawned strands have completed.
661///
662/// Uses event-driven synchronization via condition variable - no polling overhead.
663#[unsafe(no_mangle)]
664pub unsafe extern "C" fn patch_seq_wait_all_strands() {
665    let mut guard = SHUTDOWN_MUTEX.lock()
666        .expect("wait_all_strands: shutdown mutex poisoned - strand panicked during shutdown synchronization");
667
668    // Wait for all strands to complete
669    // The condition variable will be notified when the last strand exits
670    while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
671        guard = SHUTDOWN_CONDVAR
672            .wait(guard)
673            .expect("wait_all_strands: condvar wait failed - strand panicked during shutdown wait");
674    }
675}
676
677// Public re-exports with short names for internal use
678pub use patch_seq_maybe_yield as maybe_yield;
679pub use patch_seq_scheduler_init as scheduler_init;
680pub use patch_seq_scheduler_run as scheduler_run;
681pub use patch_seq_scheduler_shutdown as scheduler_shutdown;
682pub use patch_seq_spawn_strand as spawn_strand;
683pub use patch_seq_strand_spawn as strand_spawn;
684pub use patch_seq_wait_all_strands as wait_all_strands;
685pub use patch_seq_yield_strand as yield_strand;
686
687#[cfg(test)]
688mod tests {
689    use super::*;
690    use crate::stack::push;
691    use crate::value::Value;
692    use std::sync::atomic::{AtomicU32, Ordering};
693
694    #[test]
695    fn test_spawn_strand() {
696        unsafe {
697            static COUNTER: AtomicU32 = AtomicU32::new(0);
698
699            extern "C" fn test_entry(_stack: Stack) -> Stack {
700                COUNTER.fetch_add(1, Ordering::SeqCst);
701                std::ptr::null_mut()
702            }
703
704            for _ in 0..100 {
705                spawn_strand(test_entry);
706            }
707
708            std::thread::sleep(std::time::Duration::from_millis(200));
709            assert_eq!(COUNTER.load(Ordering::SeqCst), 100);
710        }
711    }
712
713    #[test]
714    fn test_scheduler_init_idempotent() {
715        unsafe {
716            // Should be safe to call multiple times
717            scheduler_init();
718            scheduler_init();
719            scheduler_init();
720        }
721    }
722
723    #[test]
724    fn test_free_stack_null() {
725        // Freeing null should be a no-op
726        free_stack(std::ptr::null_mut());
727    }
728
729    #[test]
730    fn test_free_stack_valid() {
731        unsafe {
732            // Create a stack, then free it
733            let stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
734            free_stack(stack);
735            // If we get here without crashing, test passed
736        }
737    }
738
739    #[test]
740    fn test_strand_spawn_with_stack() {
741        unsafe {
742            static COUNTER: AtomicU32 = AtomicU32::new(0);
743
744            extern "C" fn test_entry(stack: Stack) -> Stack {
745                COUNTER.fetch_add(1, Ordering::SeqCst);
746                // Return the stack as-is (caller will free it)
747                stack
748            }
749
750            let initial_stack = push(crate::stack::alloc_test_stack(), Value::Int(99));
751            strand_spawn(test_entry, initial_stack);
752
753            std::thread::sleep(std::time::Duration::from_millis(200));
754            assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
755        }
756    }
757
758    #[test]
759    fn test_scheduler_shutdown() {
760        unsafe {
761            scheduler_init();
762            scheduler_shutdown();
763            // Should not crash
764        }
765    }
766
767    #[test]
768    fn test_many_strands_stress() {
769        unsafe {
770            static COUNTER: AtomicU32 = AtomicU32::new(0);
771
772            extern "C" fn increment(_stack: Stack) -> Stack {
773                COUNTER.fetch_add(1, Ordering::SeqCst);
774                std::ptr::null_mut()
775            }
776
777            // Reset counter for this test
778            COUNTER.store(0, Ordering::SeqCst);
779
780            // Spawn many strands to stress test synchronization
781            for _ in 0..1000 {
782                strand_spawn(increment, std::ptr::null_mut());
783            }
784
785            // Wait for all to complete
786            wait_all_strands();
787
788            // Verify all strands executed
789            assert_eq!(COUNTER.load(Ordering::SeqCst), 1000);
790        }
791    }
792
793    #[test]
794    fn test_strand_ids_are_unique() {
795        unsafe {
796            use std::collections::HashSet;
797
798            extern "C" fn noop(_stack: Stack) -> Stack {
799                std::ptr::null_mut()
800            }
801
802            // Spawn strands and collect their IDs
803            let mut ids = Vec::new();
804            for _ in 0..100 {
805                let id = strand_spawn(noop, std::ptr::null_mut());
806                ids.push(id);
807            }
808
809            // Wait for completion
810            wait_all_strands();
811
812            // Verify all IDs are unique
813            let unique_ids: HashSet<_> = ids.iter().collect();
814            assert_eq!(unique_ids.len(), 100, "All strand IDs should be unique");
815
816            // Verify all IDs are positive
817            assert!(
818                ids.iter().all(|&id| id > 0),
819                "All strand IDs should be positive"
820            );
821        }
822    }
823
824    #[test]
825    fn test_arena_reset_with_strands() {
826        unsafe {
827            use crate::arena;
828            use crate::seqstring::arena_string;
829
830            extern "C" fn create_temp_strings(stack: Stack) -> Stack {
831                // Create many temporary arena strings (simulating request parsing)
832                for i in 0..100 {
833                    let temp = arena_string(&format!("temporary string {}", i));
834                    // Use the string temporarily
835                    assert!(!temp.as_str().is_empty());
836                    // String is dropped, but memory stays in arena
837                }
838
839                // Arena should have allocated memory
840                let stats = arena::arena_stats();
841                assert!(stats.allocated_bytes > 0, "Arena should have allocations");
842
843                stack // Return empty stack
844            }
845
846            // Reset arena before test
847            arena::arena_reset();
848
849            // Spawn strand that creates many temp strings
850            strand_spawn(create_temp_strings, std::ptr::null_mut());
851
852            // Wait for strand to complete (which calls free_stack -> arena_reset)
853            wait_all_strands();
854
855            // After strand exits, arena should be reset
856            let stats_after = arena::arena_stats();
857            assert_eq!(
858                stats_after.allocated_bytes, 0,
859                "Arena should be reset after strand exits"
860            );
861        }
862    }
863
    #[test]
    fn test_arena_with_channel_send() {
        // End-to-end check that an arena-allocated string survives a channel
        // send/receive between two strands (send clones the value out of the
        // sender's arena into globally-owned memory).
        unsafe {
            use crate::channel::{close_channel, make_channel, receive, send};
            use crate::stack::{pop, push};
            use crate::value::Value;
            use std::sync::Arc;
            use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};

            static RECEIVED_COUNT: AtomicU32 = AtomicU32::new(0);
            // Raw channel pointer smuggled to the extern "C" strand entries,
            // which cannot capture environment.
            static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);

            // Create channel
            let stack = crate::stack::alloc_test_stack();
            let stack = make_channel(stack);
            let (stack, chan_val) = pop(stack);
            let channel = match chan_val {
                Value::Channel(ch) => ch,
                _ => panic!("Expected Channel"),
            };

            // Store channel pointer for strands
            let ch_ptr = Arc::as_ptr(&channel) as i64;
            CHANNEL_PTR.store(ch_ptr, Ordering::Release);

            // Keep Arc alive: deliberately leak one strong count per strand so
            // the Arc::from_raw calls below observe a live allocation for the
            // whole test. Test-only; never do this in production code.
            std::mem::forget(channel.clone());
            std::mem::forget(channel.clone());

            // Sender strand: creates arena string, sends through channel
            extern "C" fn sender(_stack: Stack) -> Stack {
                use crate::seqstring::arena_string;
                use crate::value::ChannelData;
                use std::sync::Arc;

                unsafe {
                    let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
                    // from_raw adopts one of the counts leaked above; clone a
                    // working handle, then forget the adopted one so the count
                    // stays balanced.
                    let channel = Arc::from_raw(ch_ptr);
                    let channel_clone = Arc::clone(&channel);
                    std::mem::forget(channel); // Don't drop

                    // Create arena string
                    let msg = arena_string("Hello from sender!");

                    // Push string and channel for send
                    let stack = push(crate::stack::alloc_test_stack(), Value::String(msg));
                    let stack = push(stack, Value::Channel(channel_clone));

                    // Send (will clone to global)
                    send(stack)
                }
            }

            // Receiver strand: receives string from channel
            extern "C" fn receiver(_stack: Stack) -> Stack {
                use crate::value::ChannelData;
                use std::sync::Arc;
                use std::sync::atomic::Ordering;

                unsafe {
                    let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
                    // Same adopt-clone-forget dance as in sender.
                    let channel = Arc::from_raw(ch_ptr);
                    let channel_clone = Arc::clone(&channel);
                    std::mem::forget(channel); // Don't drop

                    // Push channel for receive
                    let stack = push(
                        crate::stack::alloc_test_stack(),
                        Value::Channel(channel_clone),
                    );

                    // Receive message (returns value, success_flag)
                    let stack = receive(stack);

                    // Pop success flag first, then message
                    let (stack, _success) = pop(stack);
                    let (_stack, msg_val) = pop(stack);
                    match msg_val {
                        Value::String(s) => {
                            // The message must round-trip byte-for-byte even
                            // though the sender's arena may be reset by now.
                            assert_eq!(s.as_str(), "Hello from sender!");
                            RECEIVED_COUNT.fetch_add(1, Ordering::SeqCst);
                        }
                        _ => panic!("Expected String"),
                    }

                    std::ptr::null_mut()
                }
            }

            // Spawn sender and receiver
            // NOTE(review): other tests in this module call strand_spawn(f, stack);
            // confirm spawn_strand is the intended no-initial-stack variant.
            spawn_strand(sender);
            spawn_strand(receiver);

            // Wait for both strands
            wait_all_strands();

            // Verify message was received
            assert_eq!(
                RECEIVED_COUNT.load(Ordering::SeqCst),
                1,
                "Receiver should have received message"
            );

            // Clean up channel
            let stack = push(stack, Value::Channel(channel));
            close_channel(stack);
        }
    }
972
973    #[test]
974    fn test_no_memory_leak_over_many_iterations() {
975        // PR #11 feedback: Verify 10K+ strand iterations don't cause memory growth
976        unsafe {
977            use crate::arena;
978            use crate::seqstring::arena_string;
979
980            extern "C" fn allocate_strings_and_exit(stack: Stack) -> Stack {
981                // Simulate request processing: many temp allocations
982                for i in 0..50 {
983                    let temp = arena_string(&format!("request header {}", i));
984                    assert!(!temp.as_str().is_empty());
985                    // Strings dropped here but arena memory stays allocated
986                }
987                stack
988            }
989
990            // Run many iterations to detect leaks
991            let iterations = 10_000;
992
993            for i in 0..iterations {
994                // Reset arena before each iteration to start fresh
995                arena::arena_reset();
996
997                // Spawn strand, let it allocate strings, then exit
998                strand_spawn(allocate_strings_and_exit, std::ptr::null_mut());
999
1000                // Wait for completion (triggers arena reset)
1001                wait_all_strands();
1002
1003                // Every 1000 iterations, verify arena is actually reset
1004                if i % 1000 == 0 {
1005                    let stats = arena::arena_stats();
1006                    assert_eq!(
1007                        stats.allocated_bytes, 0,
1008                        "Arena not reset after iteration {} (leaked {} bytes)",
1009                        i, stats.allocated_bytes
1010                    );
1011                }
1012            }
1013
1014            // Final verification: arena should be empty
1015            let final_stats = arena::arena_stats();
1016            assert_eq!(
1017                final_stats.allocated_bytes, 0,
1018                "Arena leaked memory after {} iterations ({} bytes)",
1019                iterations, final_stats.allocated_bytes
1020            );
1021
1022            println!(
1023                "✓ Memory leak test passed: {} iterations with no growth",
1024                iterations
1025            );
1026        }
1027    }
1028
1029    #[test]
1030    fn test_parse_stack_size_valid() {
1031        assert_eq!(parse_stack_size(Some("2097152".to_string())), 2097152);
1032        assert_eq!(parse_stack_size(Some("1".to_string())), 1);
1033        assert_eq!(parse_stack_size(Some("999999999".to_string())), 999999999);
1034    }
1035
1036    #[test]
1037    fn test_parse_stack_size_none() {
1038        assert_eq!(parse_stack_size(None), DEFAULT_STACK_SIZE);
1039    }
1040
1041    #[test]
1042    fn test_parse_stack_size_zero() {
1043        // Zero should fall back to default (with warning printed to stderr)
1044        assert_eq!(parse_stack_size(Some("0".to_string())), DEFAULT_STACK_SIZE);
1045    }
1046
1047    #[test]
1048    fn test_parse_stack_size_invalid() {
1049        // Non-numeric should fall back to default (with warning printed to stderr)
1050        assert_eq!(
1051            parse_stack_size(Some("invalid".to_string())),
1052            DEFAULT_STACK_SIZE
1053        );
1054        assert_eq!(
1055            parse_stack_size(Some("-100".to_string())),
1056            DEFAULT_STACK_SIZE
1057        );
1058        assert_eq!(parse_stack_size(Some("".to_string())), DEFAULT_STACK_SIZE);
1059        assert_eq!(
1060            parse_stack_size(Some("1.5".to_string())),
1061            DEFAULT_STACK_SIZE
1062        );
1063    }
1064
1065    #[test]
1066    #[cfg(feature = "diagnostics")]
1067    fn test_strand_registry_basic() {
1068        let registry = StrandRegistry::new(10);
1069
1070        // Register some strands
1071        assert_eq!(registry.register(1), Some(0)); // First slot
1072        assert_eq!(registry.register(2), Some(1)); // Second slot
1073        assert_eq!(registry.register(3), Some(2)); // Third slot
1074
1075        // Verify active strands
1076        let active: Vec<_> = registry.active_strands().collect();
1077        assert_eq!(active.len(), 3);
1078
1079        // Unregister one
1080        assert!(registry.unregister(2));
1081        let active: Vec<_> = registry.active_strands().collect();
1082        assert_eq!(active.len(), 2);
1083
1084        // Unregister non-existent should return false
1085        assert!(!registry.unregister(999));
1086    }
1087
1088    #[test]
1089    #[cfg(feature = "diagnostics")]
1090    fn test_strand_registry_overflow() {
1091        let registry = StrandRegistry::new(3); // Small capacity
1092
1093        // Fill it up
1094        assert!(registry.register(1).is_some());
1095        assert!(registry.register(2).is_some());
1096        assert!(registry.register(3).is_some());
1097
1098        // Next should overflow
1099        assert!(registry.register(4).is_none());
1100        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 1);
1101
1102        // Another overflow
1103        assert!(registry.register(5).is_none());
1104        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 2);
1105    }
1106
1107    #[test]
1108    #[cfg(feature = "diagnostics")]
1109    fn test_strand_registry_slot_reuse() {
1110        let registry = StrandRegistry::new(3);
1111
1112        // Fill it up
1113        registry.register(1);
1114        registry.register(2);
1115        registry.register(3);
1116
1117        // Unregister middle one
1118        registry.unregister(2);
1119
1120        // New registration should reuse the slot
1121        assert!(registry.register(4).is_some());
1122        assert_eq!(registry.active_strands().count(), 3);
1123    }
1124
1125    #[test]
1126    #[cfg(feature = "diagnostics")]
1127    fn test_strand_registry_concurrent_stress() {
1128        use std::sync::Arc;
1129        use std::thread;
1130
1131        let registry = Arc::new(StrandRegistry::new(50)); // Moderate capacity
1132
1133        let handles: Vec<_> = (0..100)
1134            .map(|i| {
1135                let reg = Arc::clone(&registry);
1136                thread::spawn(move || {
1137                    let id = (i + 1) as u64;
1138                    // Register
1139                    let _ = reg.register(id);
1140                    // Brief work
1141                    thread::yield_now();
1142                    // Unregister
1143                    reg.unregister(id);
1144                })
1145            })
1146            .collect();
1147
1148        for h in handles {
1149            h.join().unwrap();
1150        }
1151
1152        // All slots should be free after all threads complete
1153        assert_eq!(registry.active_strands().count(), 0);
1154    }
1155
1156    #[test]
1157    fn test_strand_lifecycle_counters() {
1158        unsafe {
1159            // Reset counters for isolation (not perfect but helps)
1160            let initial_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
1161            let initial_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);
1162
1163            static COUNTER: AtomicU32 = AtomicU32::new(0);
1164
1165            extern "C" fn simple_work(_stack: Stack) -> Stack {
1166                COUNTER.fetch_add(1, Ordering::SeqCst);
1167                std::ptr::null_mut()
1168            }
1169
1170            COUNTER.store(0, Ordering::SeqCst);
1171
1172            // Spawn some strands
1173            for _ in 0..10 {
1174                strand_spawn(simple_work, std::ptr::null_mut());
1175            }
1176
1177            wait_all_strands();
1178
1179            // Verify counters incremented
1180            let final_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
1181            let final_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);
1182
1183            assert!(
1184                final_spawned >= initial_spawned + 10,
1185                "TOTAL_SPAWNED should have increased by at least 10"
1186            );
1187            assert!(
1188                final_completed >= initial_completed + 10,
1189                "TOTAL_COMPLETED should have increased by at least 10"
1190            );
1191            assert_eq!(COUNTER.load(Ordering::SeqCst), 10);
1192        }
1193    }
1194
1195    // =========================================================================
1196    // Yield Safety Valve Tests
1197    // =========================================================================
1198
1199    #[test]
1200    fn test_maybe_yield_disabled_by_default() {
1201        // When SEQ_YIELD_INTERVAL is not set (or 0), maybe_yield should be a no-op
1202        // This test verifies it doesn't panic and returns quickly
1203        for _ in 0..1000 {
1204            patch_seq_maybe_yield();
1205        }
1206    }
1207
1208    #[test]
1209    fn test_tail_call_counter_increments() {
1210        // Verify the thread-local counter increments correctly
1211        TAIL_CALL_COUNTER.with(|counter| {
1212            let initial = counter.get();
1213            patch_seq_maybe_yield();
1214            patch_seq_maybe_yield();
1215            patch_seq_maybe_yield();
1216            // Counter should have incremented (if threshold > 0) or stayed same (if disabled)
1217            // Either way, it shouldn't panic
1218            let _ = counter.get();
1219            // Reset to avoid affecting other tests
1220            counter.set(initial);
1221        });
1222    }
1223
1224    #[test]
1225    fn test_counter_overflow_safety() {
1226        // Verify wrapping_add prevents overflow panic
1227        TAIL_CALL_COUNTER.with(|counter| {
1228            let initial = counter.get();
1229            // Set counter near max to test overflow behavior
1230            counter.set(u64::MAX - 1);
1231            // These calls should not panic due to overflow
1232            patch_seq_maybe_yield();
1233            patch_seq_maybe_yield();
1234            patch_seq_maybe_yield();
1235            // Reset
1236            counter.set(initial);
1237        });
1238    }
1239}