// seq_runtime/scheduler.rs
1//! Scheduler - Green Thread Management with May
2//!
3//! CSP-style concurrency for Seq using May coroutines.
4//! Each strand is a lightweight green thread that can communicate via channels.
5//!
6//! ## Non-Blocking Guarantee
7//!
8//! Channel operations (`send`, `receive`) use May's cooperative blocking and NEVER
9//! block OS threads. However, I/O operations (`write_line`, `read_line` in io.rs)
10//! currently use blocking syscalls. Future work will make all I/O non-blocking.
11//!
12//! ## Panic Behavior
13//!
14//! Functions panic on invalid input (null stacks, negative IDs, closed channels).
15//! In a production system, consider implementing error channels or Result-based
16//! error handling instead of panicking.
17
18use crate::stack::Stack;
19use crate::tagged_stack::StackValue;
20use may::coroutine;
21use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22use std::sync::{Condvar, Mutex, Once};
23
/// One-time guard for scheduler initialization (see `patch_seq_scheduler_init`).
static SCHEDULER_INIT: Once = Once::new();

// Strand lifecycle tracking
//
// Design rationale:
// - ACTIVE_STRANDS: Lock-free atomic counter for the hot path (spawn/complete)
//   Every strand increments on spawn, decrements on complete. This is extremely
//   fast (lock-free atomic ops) and suitable for high-frequency operations.
//
// - SHUTDOWN_CONDVAR/MUTEX: Event-driven synchronization for the cold path (shutdown wait)
//   Used only when waiting for all strands to complete (program shutdown).
//   Condvar provides event-driven wakeup instead of polling, which is critical
//   for a systems language - no CPU waste, proper OS-level blocking.
//
// Why not track JoinHandles?
// Strands are like Erlang processes - potentially hundreds of thousands of concurrent
// entities with independent lifecycles. Storing handles would require global mutable
// state with synchronization overhead on the hot path. The counter + condvar approach
// keeps the hot path lock-free while providing proper shutdown synchronization.
pub static ACTIVE_STRANDS: AtomicUsize = AtomicUsize::new(0);
pub(crate) static SHUTDOWN_CONDVAR: Condvar = Condvar::new();
pub(crate) static SHUTDOWN_MUTEX: Mutex<()> = Mutex::new(());

// Strand lifecycle statistics (for diagnostics)
//
// These counters provide observability into strand lifecycle without any locking.
// All operations are lock-free atomic increments/loads.
//
// - TOTAL_SPAWNED: Monotonically increasing count of all strands ever spawned
// - TOTAL_COMPLETED: Monotonically increasing count of all strands that completed
// - PEAK_STRANDS: High-water mark of concurrent strands (helps detect strand leaks)
//
// Useful diagnostics:
// - Currently running: ACTIVE_STRANDS
// - Completed successfully: TOTAL_COMPLETED
// - Potential leaks: TOTAL_SPAWNED - TOTAL_COMPLETED - ACTIVE_STRANDS > 0 (strands lost)
// - Peak concurrency: PEAK_STRANDS
pub static TOTAL_SPAWNED: AtomicU64 = AtomicU64::new(0);
pub static TOTAL_COMPLETED: AtomicU64 = AtomicU64::new(0);
pub static PEAK_STRANDS: AtomicUsize = AtomicUsize::new(0);

// Unique strand ID generation (starts at 1 so that 0 can mean "free slot"
// in the diagnostics registry and returned IDs are always positive)
static NEXT_STRAND_ID: AtomicU64 = AtomicU64::new(1);
67
68// =============================================================================
69// Lock-Free Strand Registry (only when diagnostics feature is enabled)
70// =============================================================================
71//
72// A fixed-size array of slots for tracking active strands without locks.
73// Each slot stores a strand ID (0 = free) and spawn timestamp.
74//
75// Design principles:
76// - Fixed size: No dynamic allocation, predictable memory footprint
77// - Lock-free: All operations use atomic CAS, no mutex contention
78// - Bounded: If registry is full, strands still run but aren't tracked
79// - Zero cost when not querying: Only diagnostics reads the registry
80//
81// Slot encoding:
82// - strand_id == 0: slot is free
83// - strand_id > 0: slot contains an active strand
84//
85// The registry size can be configured via SEQ_STRAND_REGISTRY_SIZE env var.
86// Default is 1024 slots, which is sufficient for most applications.
87//
88// When the "diagnostics" feature is disabled, the registry is not compiled,
89// eliminating the SystemTime::now() syscall and O(n) scans on every spawn.
90
#[cfg(feature = "diagnostics")]
/// Default strand registry size (number of trackable concurrent strands).
/// Overridable at startup via the SEQ_STRAND_REGISTRY_SIZE env var.
const DEFAULT_REGISTRY_SIZE: usize = 1024;

#[cfg(feature = "diagnostics")]
/// A slot in the strand registry
///
/// Uses two atomics to store strand info without locks.
/// A slot is free when strand_id == 0.
pub struct StrandSlot {
    /// Strand ID (0 = free, >0 = active strand)
    pub strand_id: AtomicU64,
    /// Spawn timestamp (seconds since UNIX epoch, for detecting stuck strands)
    pub spawn_time: AtomicU64,
}
106
#[cfg(feature = "diagnostics")]
impl StrandSlot {
    /// Build an empty (free) slot: both atomics start at zero, and a
    /// zero `strand_id` is the "unclaimed" sentinel used by the registry.
    const fn new() -> Self {
        StrandSlot {
            strand_id: AtomicU64::new(0),
            spawn_time: AtomicU64::new(0),
        }
    }
}
116
#[cfg(feature = "diagnostics")]
/// Lock-free strand registry
///
/// Provides O(n) registration (scan for free slot) and O(n) unregistration.
/// This is acceptable because:
/// 1. N is bounded (default 1024)
/// 2. Registration/unregistration are infrequent compared to strand work
/// 3. No locks means no contention, just atomic ops
pub struct StrandRegistry {
    /// Fixed-size slot array; boxed slice so capacity is immutable after creation.
    slots: Box<[StrandSlot]>,
    /// Number of slots that couldn't be registered (registry full)
    pub overflow_count: AtomicU64,
}
130
#[cfg(feature = "diagnostics")]
impl StrandRegistry {
    /// Create a new registry with the given capacity.
    /// All slots start free (strand_id == 0).
    fn new(capacity: usize) -> Self {
        let mut slots = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            slots.push(StrandSlot::new());
        }
        Self {
            slots: slots.into_boxed_slice(),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Register a strand, returning the slot index if successful
    ///
    /// Uses CAS to atomically claim a free slot.
    /// Returns None if the registry is full (strand still runs, just not tracked).
    pub fn register(&self, strand_id: u64) -> Option<usize> {
        let spawn_time = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);

        // Scan for a free slot
        for (idx, slot) in self.slots.iter().enumerate() {
            // BUGFIX: skip slots that are already owned by another strand.
            // The previous implementation unconditionally stored `spawn_time`
            // into every slot it scanned before attempting the CAS, which
            // clobbered the spawn timestamps of long-running strands occupying
            // earlier slots (their "owner" only writes spawn_time once, at
            // registration) and silently broke stuck-strand detection.
            if slot.strand_id.load(Ordering::Relaxed) != 0 {
                continue;
            }

            // Write the timestamp before claiming the slot so a reader never
            // observes strand_id != 0 with spawn_time still 0. If another thread
            // wins the CAS below in the tiny load->store window, both threads
            // wrote an essentially identical same-second timestamp, so the
            // overlap is harmless.
            slot.spawn_time.store(spawn_time, Ordering::Relaxed);

            // Try to claim this slot (CAS from 0 to strand_id).
            // AcqRel ensures the spawn_time write above is visible before
            // strand_id becomes non-zero for Acquire readers.
            if slot
                .strand_id
                .compare_exchange(0, strand_id, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                return Some(idx);
            }
        }

        // Registry full - track overflow but strand still runs
        self.overflow_count.fetch_add(1, Ordering::Relaxed);
        None
    }

    /// Unregister a strand by ID
    ///
    /// Scans for the slot containing this strand ID and clears it.
    /// Returns true if found and cleared, false if not found.
    ///
    /// Note: ABA problem is not a concern here because strand IDs are monotonically
    /// increasing u64 values. ID reuse would require 2^64 spawns, which is practically
    /// impossible (at 1 billion spawns/sec, it would take ~584 years).
    pub fn unregister(&self, strand_id: u64) -> bool {
        for slot in self.slots.iter() {
            // Check if this slot contains our strand
            if slot
                .strand_id
                .compare_exchange(strand_id, 0, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                // Successfully cleared the slot; zero the timestamp so a stale
                // value is never paired with a future occupant.
                slot.spawn_time.store(0, Ordering::Release);
                return true;
            }
        }
        false
    }

    /// Iterate over active strands (for diagnostics)
    ///
    /// Returns an iterator of (strand_id, spawn_time) for non-empty slots.
    /// Note: This is a snapshot and may be slightly inconsistent due to concurrent updates.
    pub fn active_strands(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.slots.iter().filter_map(|slot| {
            // Acquire on strand_id synchronizes with the AcqRel CAS in register()
            let id = slot.strand_id.load(Ordering::Acquire);
            if id > 0 {
                // Relaxed is sufficient here - we've already synchronized via strand_id
                // Acquire and spawn_time is written before strand_id in register()
                let time = slot.spawn_time.load(Ordering::Relaxed);
                Some((id, time))
            } else {
                None
            }
        })
    }

    /// Get the registry capacity (fixed at construction time)
    pub fn capacity(&self) -> usize {
        self.slots.len()
    }
}
226
// Global strand registry (lazy initialized)
#[cfg(feature = "diagnostics")]
static STRAND_REGISTRY: std::sync::OnceLock<StrandRegistry> = std::sync::OnceLock::new();

/// Get or initialize the global strand registry.
///
/// Capacity comes from the SEQ_STRAND_REGISTRY_SIZE environment variable,
/// falling back to DEFAULT_REGISTRY_SIZE when unset or unparsable.
#[cfg(feature = "diagnostics")]
pub fn strand_registry() -> &'static StrandRegistry {
    STRAND_REGISTRY.get_or_init(|| {
        let capacity = match std::env::var("SEQ_STRAND_REGISTRY_SIZE") {
            Ok(raw) => raw.parse().unwrap_or(DEFAULT_REGISTRY_SIZE),
            Err(_) => DEFAULT_REGISTRY_SIZE,
        };
        StrandRegistry::new(capacity)
    })
}
242
/// Default coroutine stack size: 128KB (0x20000 bytes)
/// Reduced from 1MB for better spawn performance (~16% faster in benchmarks).
/// Can be overridden via SEQ_STACK_SIZE environment variable.
const DEFAULT_STACK_SIZE: usize = 0x20000;

/// Parse a stack size from an optional env-var value.
///
/// Returns the parsed byte count, or DEFAULT_STACK_SIZE when the value is
/// absent, zero, or not a valid number. Invalid values produce a warning on
/// stderr so misconfiguration is visible rather than silently ignored.
fn parse_stack_size(env_value: Option<String>) -> usize {
    let Some(raw) = env_value else {
        return DEFAULT_STACK_SIZE;
    };
    match raw.parse::<usize>() {
        Ok(size) if size > 0 => size,
        Ok(_) => {
            // Zero would make every coroutine spawn fail; reject it loudly.
            eprintln!(
                "Warning: SEQ_STACK_SIZE=0 is invalid, using default {}",
                DEFAULT_STACK_SIZE
            );
            DEFAULT_STACK_SIZE
        }
        Err(_) => {
            eprintln!(
                "Warning: SEQ_STACK_SIZE='{}' is not a valid number, using default {}",
                raw, DEFAULT_STACK_SIZE
            );
            DEFAULT_STACK_SIZE
        }
    }
}
273
/// Default coroutine pool capacity.
/// May reuses completed coroutine stacks from this pool to avoid allocations.
/// Default of 1000 is often too small for spawn-heavy workloads.
const DEFAULT_POOL_CAPACITY: usize = 10000;

/// Initialize the scheduler.
///
/// # Safety
/// Safe to call multiple times (idempotent via Once).
/// Configures May coroutines with appropriate stack size for LLVM-generated code.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_init() {
    SCHEDULER_INIT.call_once(|| {
        // Configure stack size for coroutines
        // Default is 128KB, reduced from 1MB for better spawn performance.
        // Can be overridden via SEQ_STACK_SIZE environment variable (in bytes)
        // Example: SEQ_STACK_SIZE=2097152 for 2MB
        // Invalid values (non-numeric, zero) are warned and ignored.
        let stack_size = parse_stack_size(std::env::var("SEQ_STACK_SIZE").ok());

        // Configure coroutine pool capacity
        // May reuses coroutine stacks from this pool to reduce allocation overhead.
        // Default 10000 is 10x May's default (1000), better for spawn-heavy workloads.
        // Can be overridden via SEQ_POOL_CAPACITY environment variable.
        let pool_capacity = std::env::var("SEQ_POOL_CAPACITY")
            .ok()
            .and_then(|s| s.parse().ok())
            .filter(|&v| v > 0)
            .unwrap_or(DEFAULT_POOL_CAPACITY);

        may::config()
            .set_stack_size(stack_size)
            .set_pool_capacity(pool_capacity);

        // Install SIGINT handler for Ctrl-C (unconditional - basic expected behavior)
        // Without this, tight loops won't respond to Ctrl-C because signals
        // are only delivered at syscall boundaries, and TCO loops may never syscall.
        #[cfg(unix)]
        {
            use std::sync::atomic::{AtomicBool, Ordering};
            static SIGINT_RECEIVED: AtomicBool = AtomicBool::new(false);

            // NOTE(review): std::process::exit is NOT async-signal-safe (it runs
            // libc atexit/cleanup handlers and may allocate or take locks); calling
            // it from a signal handler can deadlock if the interrupted thread held
            // a relevant lock. Consider using libc::_exit for the first SIGINT as
            // well, or setting a flag checked outside the handler — confirm the
            // intended trade-off here.
            extern "C" fn sigint_handler(_: libc::c_int) {
                // If we receive SIGINT twice, force exit (user is insistent)
                if SIGINT_RECEIVED.swap(true, Ordering::SeqCst) {
                    // Second SIGINT - exit immediately, skipping all cleanup
                    unsafe { libc::_exit(130) }; // 128 + 2 (SIGINT)
                }
                // First SIGINT - exit cleanly (runs atexit/cleanup handlers)
                std::process::exit(130);
            }

            unsafe {
                libc::signal(
                    libc::SIGINT,
                    sigint_handler as *const () as libc::sighandler_t,
                );
            }
        }

        // Install SIGQUIT handler for runtime diagnostics (kill -3)
        #[cfg(feature = "diagnostics")]
        crate::diagnostics::install_signal_handler();

        // Install watchdog timer (if enabled via SEQ_WATCHDOG_SECS)
        #[cfg(feature = "diagnostics")]
        crate::watchdog::install_watchdog();
    });
}
343
344/// Run the scheduler and wait for all coroutines to complete
345///
346/// # Safety
347/// Returns the final stack (always null for now since May handles all scheduling).
348/// This function blocks until all spawned strands have completed.
349///
350/// Uses a condition variable for event-driven shutdown synchronization rather than
351/// polling. The mutex is only held during the wait protocol, not during strand
352/// execution, so there's no contention on the hot path.
353#[unsafe(no_mangle)]
354pub unsafe extern "C" fn patch_seq_scheduler_run() -> Stack {
355    let mut guard = SHUTDOWN_MUTEX.lock().expect(
356        "scheduler_run: shutdown mutex poisoned - strand panicked during shutdown synchronization",
357    );
358
359    // Wait for all strands to complete
360    // The condition variable will be notified when the last strand exits
361    while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
362        guard = SHUTDOWN_CONDVAR
363            .wait(guard)
364            .expect("scheduler_run: condvar wait failed - strand panicked during shutdown wait");
365    }
366
367    // All strands have completed
368    std::ptr::null_mut()
369}
370
/// Shutdown the scheduler
///
/// # Safety
/// Always safe to call. May performs no explicit teardown, so this is a
/// deliberate no-op kept only for API symmetry with `patch_seq_scheduler_init`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_shutdown() {
    // Intentionally empty: May has no shutdown protocol.
}
380
381/// Spawn a strand (coroutine) with initial stack
382///
383/// # Safety
384/// - `entry` must be a valid function pointer that can safely execute on any thread
385/// - `initial_stack` must be either null or a valid pointer to a `StackValue` that:
386///   - Was heap-allocated (e.g., via Box)
387///   - Has a 'static lifetime or lives longer than the coroutine
388///   - Is safe to access from the spawned thread
389/// - The caller transfers ownership of `initial_stack` to the coroutine
390/// - Returns a unique strand ID (positive integer)
391///
392/// # Memory Management
393/// The spawned coroutine takes ownership of `initial_stack` and will automatically
394/// free the final stack returned by `entry` upon completion.
395#[unsafe(no_mangle)]
396pub unsafe extern "C" fn patch_seq_strand_spawn(
397    entry: extern "C" fn(Stack) -> Stack,
398    initial_stack: Stack,
399) -> i64 {
400    // For backwards compatibility, use null base (won't support nested spawns)
401    unsafe { patch_seq_strand_spawn_with_base(entry, initial_stack, std::ptr::null_mut()) }
402}
403
/// Spawn a strand (coroutine) with initial stack and explicit stack base
///
/// This variant allows setting the STACK_BASE for the spawned strand, which is
/// required for the child to perform operations like clone_stack (nested spawn).
///
/// # Safety
/// - `entry` must be a valid function pointer that can safely execute on any thread
/// - `initial_stack` must be a valid pointer to a `StackValue` array
/// - `stack_base` must be the base of the stack (or null to skip setting STACK_BASE)
/// - The caller transfers ownership of `initial_stack` to the coroutine
/// - Returns a unique strand ID (positive integer)
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_strand_spawn_with_base(
    entry: extern "C" fn(Stack) -> Stack,
    initial_stack: Stack,
    stack_base: Stack,
) -> i64 {
    // Generate unique strand ID (Relaxed is enough: only uniqueness matters)
    let strand_id = NEXT_STRAND_ID.fetch_add(1, Ordering::Relaxed);

    // Increment active strand counter and track total spawned
    let new_count = ACTIVE_STRANDS.fetch_add(1, Ordering::Release) + 1;
    TOTAL_SPAWNED.fetch_add(1, Ordering::Relaxed);

    // Update peak strands if this is a new high-water mark
    // Uses a CAS loop to safely update the maximum without locks
    // Uses Acquire/Release ordering for proper synchronization with diagnostics reads
    let mut peak = PEAK_STRANDS.load(Ordering::Acquire);
    while new_count > peak {
        match PEAK_STRANDS.compare_exchange_weak(
            peak,
            new_count,
            Ordering::Release,
            Ordering::Relaxed,
        ) {
            Ok(_) => break,
            // compare_exchange_weak may fail spuriously; retry with the observed value
            Err(current) => peak = current,
        }
    }

    // Register strand in the registry (for diagnostics visibility)
    // If registry is full, strand still runs but isn't tracked
    #[cfg(feature = "diagnostics")]
    let _ = strand_registry().register(strand_id);

    // Function pointers are already Send, no wrapper needed
    let entry_fn = entry;

    // Convert pointers to usize (which is Send)
    // This is necessary because *mut T is !Send, but the caller guarantees thread safety
    let stack_addr = initial_stack as usize;
    let base_addr = stack_base as usize;

    unsafe {
        coroutine::spawn(move || {
            // Reconstruct pointers from addresses
            let stack_ptr = stack_addr as *mut StackValue;
            let base_ptr = base_addr as *mut StackValue;

            // Debug assertion: validate stack pointer alignment and reasonable address
            debug_assert!(
                stack_ptr.is_null()
                    || stack_addr.is_multiple_of(std::mem::align_of::<StackValue>()),
                "Stack pointer must be null or properly aligned"
            );
            debug_assert!(
                stack_ptr.is_null() || stack_addr > 0x1000,
                "Stack pointer appears to be in invalid memory region (< 0x1000)"
            );

            // Set STACK_BASE for this strand if provided
            // This enables nested spawns and other operations that need clone_stack
            if !base_ptr.is_null() {
                crate::stack::patch_seq_set_stack_base(base_ptr);
            }

            // Execute the entry function
            let final_stack = entry_fn(stack_ptr);

            // Clean up the final stack to prevent memory leak
            free_stack(final_stack);

            // Unregister strand from registry (uses captured strand_id)
            #[cfg(feature = "diagnostics")]
            strand_registry().unregister(strand_id);

            // Decrement active strand counter first, then track completion
            // This ordering ensures the invariant SPAWNED = COMPLETED + ACTIVE + lost
            // is never violated from an external observer's perspective
            // Use AcqRel to establish proper synchronization (both acquire and release barriers)
            let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);

            // Track completion after decrementing active count
            TOTAL_COMPLETED.fetch_add(1, Ordering::Release);
            if prev_count == 1 {
                // We were the last strand - acquire mutex and signal shutdown
                // The mutex must be held when calling notify to prevent missed wakeups
                // (the waiter checks ACTIVE_STRANDS under this same mutex)
                let _guard = SHUTDOWN_MUTEX.lock()
                    .expect("strand_spawn: shutdown mutex poisoned - strand panicked during shutdown notification");
                SHUTDOWN_CONDVAR.notify_all();
            }
        });
    }

    // NOTE(review): cast is lossless until 2^63 spawns, which is unreachable in practice
    strand_id as i64
}
510
511/// Free a stack allocated by the runtime
512///
513/// With the tagged stack implementation, stack cleanup is handled differently.
514/// The contiguous array is freed when the TaggedStack is dropped.
515/// This function just resets the thread-local arena.
516///
517/// # Safety
518/// Stack pointer must be valid or null.
519fn free_stack(_stack: Stack) {
520    // With tagged stack, the array is freed when TaggedStack is dropped.
521    // We just need to reset the arena for thread-local strings.
522
523    // Reset the thread-local arena to free all arena-allocated strings
524    // This is safe because:
525    // - Any arena strings in Values have been dropped above
526    // - Global strings are unaffected (they have their own allocations)
527    // - Channel sends clone to global, so no cross-strand arena pointers
528    crate::arena::arena_reset();
529}
530
531/// Legacy spawn_strand function (kept for compatibility)
532///
533/// # Safety
534/// `entry` must be a valid function pointer that can safely execute on any thread.
535#[unsafe(no_mangle)]
536pub unsafe extern "C" fn patch_seq_spawn_strand(entry: extern "C" fn(Stack) -> Stack) {
537    unsafe {
538        patch_seq_strand_spawn(entry, std::ptr::null_mut());
539    }
540}
541
542/// Yield execution to allow other coroutines to run
543///
544/// # Safety
545/// Always safe to call from within a May coroutine.
546#[unsafe(no_mangle)]
547pub unsafe extern "C" fn patch_seq_yield_strand(stack: Stack) -> Stack {
548    coroutine::yield_now();
549    stack
550}
551
552// =============================================================================
553// Cooperative Yield Safety Valve
554// =============================================================================
555//
556// Prevents tight TCO loops from starving other strands and making the process
557// unresponsive. When enabled via SEQ_YIELD_INTERVAL, yields after N tail calls.
558//
559// Configuration:
560//   SEQ_YIELD_INTERVAL=10000  - Yield every 10,000 tail calls (default: 0 = disabled)
561//
562// Scope:
563//   - Covers: User-defined word tail calls (musttail) and quotation tail calls
564//   - Does NOT cover: Closure calls (they use regular calls, bounded by stack)
565//   - Does NOT cover: Non-tail recursive calls (bounded by stack)
566//   This is intentional: the safety valve targets unbounded TCO loops.
567//
568// Design:
569//   - Zero overhead when disabled (threshold=0 short-circuits immediately)
570//   - Thread-local counter avoids synchronization overhead
571//   - Called before every musttail in generated code
572//   - Threshold is cached on first access via OnceLock
573//
574// Thread-Local Counter Behavior:
575//   The counter is per-OS-thread, not per-coroutine. Multiple coroutines on the
576//   same OS thread share the counter, which may cause yields slightly more
577//   frequently than the configured interval. This is intentional:
578//   - Avoids coroutine-local storage overhead
579//   - Still achieves the goal of preventing starvation
580//   - Actual yield frequency is still bounded by the threshold
581
582use std::cell::Cell;
583use std::sync::OnceLock;
584
/// Cached yield interval threshold (0 = disabled)
static YIELD_THRESHOLD: OnceLock<u64> = OnceLock::new();

thread_local! {
    /// Per-thread tail call counter
    static TAIL_CALL_COUNTER: Cell<u64> = const { Cell::new(0) };
}

/// Get the yield threshold from environment (cached on first access).
///
/// Returns 0 (disabled) when SEQ_YIELD_INTERVAL is unset, empty, or invalid.
/// An invalid (non-numeric) value also prints a warning to stderr so the
/// misconfiguration is visible.
fn get_yield_threshold() -> u64 {
    *YIELD_THRESHOLD.get_or_init(|| {
        let Ok(raw) = std::env::var("SEQ_YIELD_INTERVAL") else {
            return 0;
        };
        if raw.is_empty() {
            return 0;
        }
        raw.parse::<u64>().unwrap_or_else(|_| {
            eprintln!(
                "Warning: SEQ_YIELD_INTERVAL='{}' is not a valid positive integer, yield safety valve disabled",
                raw
            );
            0
        })
    })
}
615
616/// Maybe yield to other coroutines based on tail call count
617///
618/// Called before every tail call in generated code. When SEQ_YIELD_INTERVAL
619/// is set, yields after that many tail calls to prevent starvation.
620///
621/// # Performance
622/// - Disabled (default): Single branch on cached threshold (< 1ns)
623/// - Enabled: Increment + compare + occasional yield (~10-20ns average)
624///
625/// # Safety
626/// Always safe to call. No-op when not in a May coroutine context.
627#[unsafe(no_mangle)]
628pub extern "C" fn patch_seq_maybe_yield() {
629    let threshold = get_yield_threshold();
630
631    // Fast path: disabled
632    if threshold == 0 {
633        return;
634    }
635
636    TAIL_CALL_COUNTER.with(|counter| {
637        let count = counter.get().wrapping_add(1);
638        counter.set(count);
639
640        if count >= threshold {
641            counter.set(0);
642            coroutine::yield_now();
643        }
644    });
645}
646
647/// Wait for all strands to complete
648///
649/// # Safety
650/// Always safe to call. Blocks until all spawned strands have completed.
651///
652/// Uses event-driven synchronization via condition variable - no polling overhead.
653#[unsafe(no_mangle)]
654pub unsafe extern "C" fn patch_seq_wait_all_strands() {
655    let mut guard = SHUTDOWN_MUTEX.lock()
656        .expect("wait_all_strands: shutdown mutex poisoned - strand panicked during shutdown synchronization");
657
658    // Wait for all strands to complete
659    // The condition variable will be notified when the last strand exits
660    while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
661        guard = SHUTDOWN_CONDVAR
662            .wait(guard)
663            .expect("wait_all_strands: condvar wait failed - strand panicked during shutdown wait");
664    }
665}
666
667// Public re-exports with short names for internal use
668pub use patch_seq_maybe_yield as maybe_yield;
669pub use patch_seq_scheduler_init as scheduler_init;
670pub use patch_seq_scheduler_run as scheduler_run;
671pub use patch_seq_scheduler_shutdown as scheduler_shutdown;
672pub use patch_seq_spawn_strand as spawn_strand;
673pub use patch_seq_strand_spawn as strand_spawn;
674pub use patch_seq_wait_all_strands as wait_all_strands;
675pub use patch_seq_yield_strand as yield_strand;
676
677#[cfg(test)]
678mod tests {
679    use super::*;
680    use crate::stack::push;
681    use crate::value::Value;
682    use std::sync::atomic::{AtomicU32, Ordering};
683
    // Smoke test: every spawned strand runs exactly once.
    #[test]
    fn test_spawn_strand() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn test_entry(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            for _ in 0..100 {
                spawn_strand(test_entry);
            }

            // NOTE(review): sleep-based synchronization can be flaky on loaded
            // machines; wait_all_strands() would be deterministic — confirm
            // whether the sleep is intentional here.
            std::thread::sleep(std::time::Duration::from_millis(200));
            assert_eq!(COUNTER.load(Ordering::SeqCst), 100);
        }
    }
702
    // Verifies the Once guard: repeated init calls must not panic or re-run setup.
    #[test]
    fn test_scheduler_init_idempotent() {
        unsafe {
            // Should be safe to call multiple times
            scheduler_init();
            scheduler_init();
            scheduler_init();
        }
    }
712
    // free_stack ignores its argument and only resets the arena, so null is fine.
    #[test]
    fn test_free_stack_null() {
        // Freeing null should be a no-op
        free_stack(std::ptr::null_mut());
    }
718
    // free_stack on a real (non-null) stack must not crash or double-free.
    #[test]
    fn test_free_stack_valid() {
        unsafe {
            // Create a stack, then free it
            let stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
            free_stack(stack);
            // If we get here without crashing, test passed
        }
    }
728
    // A strand entry receives the caller-provided initial stack; the spawn
    // machinery frees whatever stack the entry returns.
    #[test]
    fn test_strand_spawn_with_stack() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn test_entry(stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                // Return the stack as-is (caller will free it)
                stack
            }

            let initial_stack = push(crate::stack::alloc_test_stack(), Value::Int(99));
            strand_spawn(test_entry, initial_stack);

            // NOTE(review): sleep-based wait — wait_all_strands() would be
            // deterministic; confirm the sleep is intentional.
            std::thread::sleep(std::time::Duration::from_millis(200));
            assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
        }
    }
747
    // shutdown is a documented no-op; this just confirms init+shutdown don't panic.
    #[test]
    fn test_scheduler_shutdown() {
        unsafe {
            scheduler_init();
            scheduler_shutdown();
            // Should not crash
        }
    }
756
    // Stress test: 1000 concurrent strands exercise the counter/condvar
    // shutdown protocol; wait_all_strands() must block until every one ran.
    #[test]
    fn test_many_strands_stress() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn increment(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            // Reset counter for this test
            COUNTER.store(0, Ordering::SeqCst);

            // Spawn many strands to stress test synchronization
            for _ in 0..1000 {
                strand_spawn(increment, std::ptr::null_mut());
            }

            // Wait for all to complete
            wait_all_strands();

            // Verify all strands executed
            assert_eq!(COUNTER.load(Ordering::SeqCst), 1000);
        }
    }
782
    // Strand IDs come from a monotonically increasing atomic starting at 1,
    // so they must be unique and strictly positive.
    #[test]
    fn test_strand_ids_are_unique() {
        unsafe {
            use std::collections::HashSet;

            extern "C" fn noop(_stack: Stack) -> Stack {
                std::ptr::null_mut()
            }

            // Spawn strands and collect their IDs
            let mut ids = Vec::new();
            for _ in 0..100 {
                let id = strand_spawn(noop, std::ptr::null_mut());
                ids.push(id);
            }

            // Wait for completion
            wait_all_strands();

            // Verify all IDs are unique
            let unique_ids: HashSet<_> = ids.iter().collect();
            assert_eq!(unique_ids.len(), 100, "All strand IDs should be unique");

            // Verify all IDs are positive
            assert!(
                ids.iter().all(|&id| id > 0),
                "All strand IDs should be positive"
            );
        }
    }
813
    #[test]
    fn test_arena_reset_with_strands() {
        // End-to-end check that arena memory allocated inside a strand is
        // gone after the strand exits: the strand allocates, we wait for it,
        // then assert arena_stats() reports zero allocated bytes.
        unsafe {
            use crate::arena;
            use crate::seqstring::arena_string;

            // Strand body: allocates many short-lived arena strings, then
            // asserts the arena actually grew before returning the stack.
            extern "C" fn create_temp_strings(stack: Stack) -> Stack {
                // Create many temporary arena strings (simulating request parsing)
                for i in 0..100 {
                    let temp = arena_string(&format!("temporary string {}", i));
                    // Use the string temporarily
                    assert!(!temp.as_str().is_empty());
                    // String is dropped, but memory stays in arena
                }

                // Arena should have allocated memory
                let stats = arena::arena_stats();
                assert!(stats.allocated_bytes > 0, "Arena should have allocations");

                stack // Return empty stack
            }

            // Reset arena before test
            // NOTE(review): the assertions below presume arena_stats()/
            // arena_reset() observe the same arena state as the strand —
            // confirm whether the arena is global or per-strand, since that
            // determines whether this test can race with parallel tests.
            arena::arena_reset();

            // Spawn strand that creates many temp strings
            strand_spawn(create_temp_strings, std::ptr::null_mut());

            // Wait for strand to complete (which calls free_stack -> arena_reset)
            wait_all_strands();

            // After strand exits, arena should be reset
            let stats_after = arena::arena_stats();
            assert_eq!(
                stats_after.allocated_bytes, 0,
                "Arena should be reset after strand exits"
            );
        }
    }
853
    #[test]
    fn test_arena_with_channel_send() {
        // Verifies that an arena-allocated string survives a channel send
        // between two strands (send is expected to clone into non-arena
        // storage). The channel is shared with the strands by smuggling the
        // Arc's raw pointer through a static AtomicI64, because extern "C"
        // strand entry points cannot capture environment.
        //
        // NOTE(review): storing a pointer in an AtomicI64 assumes pointers
        // fit in 64 bits — fine on the targets this runtime supports, but
        // worth confirming if 32-bit or CHERI-style targets are ever added.
        unsafe {
            use crate::channel::{close_channel, make_channel, receive, send};
            use crate::stack::{pop, push};
            use crate::value::Value;
            use std::sync::Arc;
            use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};

            static RECEIVED_COUNT: AtomicU32 = AtomicU32::new(0);
            static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);

            // Create channel
            let stack = crate::stack::alloc_test_stack();
            let stack = make_channel(stack);
            let (stack, chan_val) = pop(stack);
            let channel = match chan_val {
                Value::Channel(ch) => ch,
                _ => panic!("Expected Channel"),
            };

            // Store channel pointer for strands
            let ch_ptr = Arc::as_ptr(&channel) as i64;
            CHANNEL_PTR.store(ch_ptr, Ordering::Release);

            // Keep Arc alive
            // Two refcounts are leaked on purpose — one per strand — so the
            // raw pointer in CHANNEL_PTR stays valid for as long as either
            // strand may dereference it. Each strand's from_raw is balanced
            // by a forget below, so these two leaks are the only net change.
            std::mem::forget(channel.clone());
            std::mem::forget(channel.clone());

            // Sender strand: creates arena string, sends through channel
            extern "C" fn sender(_stack: Stack) -> Stack {
                use crate::seqstring::arena_string;
                use crate::value::ChannelData;
                use std::sync::Arc;

                unsafe {
                    // Rehydrate the shared Arc, clone a working handle, and
                    // forget the rehydrated one so the refcount is unchanged.
                    let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
                    let channel = Arc::from_raw(ch_ptr);
                    let channel_clone = Arc::clone(&channel);
                    std::mem::forget(channel); // Don't drop

                    // Create arena string
                    let msg = arena_string("Hello from sender!");

                    // Push string and channel for send
                    let stack = push(crate::stack::alloc_test_stack(), Value::String(msg));
                    let stack = push(stack, Value::Channel(channel_clone));

                    // Send (will clone to global)
                    send(stack)
                }
            }

            // Receiver strand: receives string from channel
            extern "C" fn receiver(_stack: Stack) -> Stack {
                use crate::value::ChannelData;
                use std::sync::Arc;
                use std::sync::atomic::Ordering;

                unsafe {
                    // Same rehydrate/clone/forget dance as the sender.
                    let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
                    let channel = Arc::from_raw(ch_ptr);
                    let channel_clone = Arc::clone(&channel);
                    std::mem::forget(channel); // Don't drop

                    // Push channel for receive
                    let stack = push(
                        crate::stack::alloc_test_stack(),
                        Value::Channel(channel_clone),
                    );

                    // Receive message (returns value, success_flag)
                    let stack = receive(stack);

                    // Pop success flag first, then message
                    let (stack, _success) = pop(stack);
                    let (_stack, msg_val) = pop(stack);
                    match msg_val {
                        Value::String(s) => {
                            assert_eq!(s.as_str(), "Hello from sender!");
                            RECEIVED_COUNT.fetch_add(1, Ordering::SeqCst);
                        }
                        _ => panic!("Expected String"),
                    }

                    std::ptr::null_mut()
                }
            }

            // Spawn sender and receiver
            // NOTE(review): spawn_strand(f) here vs strand_spawn(f, stack)
            // used elsewhere in these tests — confirm both spawn helpers
            // exist and that this one is intentional, not a typo.
            spawn_strand(sender);
            spawn_strand(receiver);

            // Wait for both strands
            wait_all_strands();

            // Verify message was received
            assert_eq!(
                RECEIVED_COUNT.load(Ordering::SeqCst),
                1,
                "Receiver should have received message"
            );

            // Clean up channel
            let stack = push(stack, Value::Channel(channel));
            close_channel(stack);
        }
    }
962
963    #[test]
964    fn test_no_memory_leak_over_many_iterations() {
965        // PR #11 feedback: Verify 10K+ strand iterations don't cause memory growth
966        unsafe {
967            use crate::arena;
968            use crate::seqstring::arena_string;
969
970            extern "C" fn allocate_strings_and_exit(stack: Stack) -> Stack {
971                // Simulate request processing: many temp allocations
972                for i in 0..50 {
973                    let temp = arena_string(&format!("request header {}", i));
974                    assert!(!temp.as_str().is_empty());
975                    // Strings dropped here but arena memory stays allocated
976                }
977                stack
978            }
979
980            // Run many iterations to detect leaks
981            let iterations = 10_000;
982
983            for i in 0..iterations {
984                // Reset arena before each iteration to start fresh
985                arena::arena_reset();
986
987                // Spawn strand, let it allocate strings, then exit
988                strand_spawn(allocate_strings_and_exit, std::ptr::null_mut());
989
990                // Wait for completion (triggers arena reset)
991                wait_all_strands();
992
993                // Every 1000 iterations, verify arena is actually reset
994                if i % 1000 == 0 {
995                    let stats = arena::arena_stats();
996                    assert_eq!(
997                        stats.allocated_bytes, 0,
998                        "Arena not reset after iteration {} (leaked {} bytes)",
999                        i, stats.allocated_bytes
1000                    );
1001                }
1002            }
1003
1004            // Final verification: arena should be empty
1005            let final_stats = arena::arena_stats();
1006            assert_eq!(
1007                final_stats.allocated_bytes, 0,
1008                "Arena leaked memory after {} iterations ({} bytes)",
1009                iterations, final_stats.allocated_bytes
1010            );
1011
1012            println!(
1013                "✓ Memory leak test passed: {} iterations with no growth",
1014                iterations
1015            );
1016        }
1017    }
1018
1019    #[test]
1020    fn test_parse_stack_size_valid() {
1021        assert_eq!(parse_stack_size(Some("2097152".to_string())), 2097152);
1022        assert_eq!(parse_stack_size(Some("1".to_string())), 1);
1023        assert_eq!(parse_stack_size(Some("999999999".to_string())), 999999999);
1024    }
1025
1026    #[test]
1027    fn test_parse_stack_size_none() {
1028        assert_eq!(parse_stack_size(None), DEFAULT_STACK_SIZE);
1029    }
1030
1031    #[test]
1032    fn test_parse_stack_size_zero() {
1033        // Zero should fall back to default (with warning printed to stderr)
1034        assert_eq!(parse_stack_size(Some("0".to_string())), DEFAULT_STACK_SIZE);
1035    }
1036
1037    #[test]
1038    fn test_parse_stack_size_invalid() {
1039        // Non-numeric should fall back to default (with warning printed to stderr)
1040        assert_eq!(
1041            parse_stack_size(Some("invalid".to_string())),
1042            DEFAULT_STACK_SIZE
1043        );
1044        assert_eq!(
1045            parse_stack_size(Some("-100".to_string())),
1046            DEFAULT_STACK_SIZE
1047        );
1048        assert_eq!(parse_stack_size(Some("".to_string())), DEFAULT_STACK_SIZE);
1049        assert_eq!(
1050            parse_stack_size(Some("1.5".to_string())),
1051            DEFAULT_STACK_SIZE
1052        );
1053    }
1054
1055    #[test]
1056    #[cfg(feature = "diagnostics")]
1057    fn test_strand_registry_basic() {
1058        let registry = StrandRegistry::new(10);
1059
1060        // Register some strands
1061        assert_eq!(registry.register(1), Some(0)); // First slot
1062        assert_eq!(registry.register(2), Some(1)); // Second slot
1063        assert_eq!(registry.register(3), Some(2)); // Third slot
1064
1065        // Verify active strands
1066        let active: Vec<_> = registry.active_strands().collect();
1067        assert_eq!(active.len(), 3);
1068
1069        // Unregister one
1070        assert!(registry.unregister(2));
1071        let active: Vec<_> = registry.active_strands().collect();
1072        assert_eq!(active.len(), 2);
1073
1074        // Unregister non-existent should return false
1075        assert!(!registry.unregister(999));
1076    }
1077
1078    #[test]
1079    #[cfg(feature = "diagnostics")]
1080    fn test_strand_registry_overflow() {
1081        let registry = StrandRegistry::new(3); // Small capacity
1082
1083        // Fill it up
1084        assert!(registry.register(1).is_some());
1085        assert!(registry.register(2).is_some());
1086        assert!(registry.register(3).is_some());
1087
1088        // Next should overflow
1089        assert!(registry.register(4).is_none());
1090        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 1);
1091
1092        // Another overflow
1093        assert!(registry.register(5).is_none());
1094        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 2);
1095    }
1096
1097    #[test]
1098    #[cfg(feature = "diagnostics")]
1099    fn test_strand_registry_slot_reuse() {
1100        let registry = StrandRegistry::new(3);
1101
1102        // Fill it up
1103        registry.register(1);
1104        registry.register(2);
1105        registry.register(3);
1106
1107        // Unregister middle one
1108        registry.unregister(2);
1109
1110        // New registration should reuse the slot
1111        assert!(registry.register(4).is_some());
1112        assert_eq!(registry.active_strands().count(), 3);
1113    }
1114
1115    #[test]
1116    #[cfg(feature = "diagnostics")]
1117    fn test_strand_registry_concurrent_stress() {
1118        use std::sync::Arc;
1119        use std::thread;
1120
1121        let registry = Arc::new(StrandRegistry::new(50)); // Moderate capacity
1122
1123        let handles: Vec<_> = (0..100)
1124            .map(|i| {
1125                let reg = Arc::clone(&registry);
1126                thread::spawn(move || {
1127                    let id = (i + 1) as u64;
1128                    // Register
1129                    let _ = reg.register(id);
1130                    // Brief work
1131                    thread::yield_now();
1132                    // Unregister
1133                    reg.unregister(id);
1134                })
1135            })
1136            .collect();
1137
1138        for h in handles {
1139            h.join().unwrap();
1140        }
1141
1142        // All slots should be free after all threads complete
1143        assert_eq!(registry.active_strands().count(), 0);
1144    }
1145
1146    #[test]
1147    fn test_strand_lifecycle_counters() {
1148        unsafe {
1149            // Reset counters for isolation (not perfect but helps)
1150            let initial_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
1151            let initial_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);
1152
1153            static COUNTER: AtomicU32 = AtomicU32::new(0);
1154
1155            extern "C" fn simple_work(_stack: Stack) -> Stack {
1156                COUNTER.fetch_add(1, Ordering::SeqCst);
1157                std::ptr::null_mut()
1158            }
1159
1160            COUNTER.store(0, Ordering::SeqCst);
1161
1162            // Spawn some strands
1163            for _ in 0..10 {
1164                strand_spawn(simple_work, std::ptr::null_mut());
1165            }
1166
1167            wait_all_strands();
1168
1169            // Verify counters incremented
1170            let final_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
1171            let final_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);
1172
1173            assert!(
1174                final_spawned >= initial_spawned + 10,
1175                "TOTAL_SPAWNED should have increased by at least 10"
1176            );
1177            assert!(
1178                final_completed >= initial_completed + 10,
1179                "TOTAL_COMPLETED should have increased by at least 10"
1180            );
1181            assert_eq!(COUNTER.load(Ordering::SeqCst), 10);
1182        }
1183    }
1184
1185    // =========================================================================
1186    // Yield Safety Valve Tests
1187    // =========================================================================
1188
1189    #[test]
1190    fn test_maybe_yield_disabled_by_default() {
1191        // When SEQ_YIELD_INTERVAL is not set (or 0), maybe_yield should be a no-op
1192        // This test verifies it doesn't panic and returns quickly
1193        for _ in 0..1000 {
1194            patch_seq_maybe_yield();
1195        }
1196    }
1197
1198    #[test]
1199    fn test_tail_call_counter_increments() {
1200        // Verify the thread-local counter increments correctly
1201        TAIL_CALL_COUNTER.with(|counter| {
1202            let initial = counter.get();
1203            patch_seq_maybe_yield();
1204            patch_seq_maybe_yield();
1205            patch_seq_maybe_yield();
1206            // Counter should have incremented (if threshold > 0) or stayed same (if disabled)
1207            // Either way, it shouldn't panic
1208            let _ = counter.get();
1209            // Reset to avoid affecting other tests
1210            counter.set(initial);
1211        });
1212    }
1213
1214    #[test]
1215    fn test_counter_overflow_safety() {
1216        // Verify wrapping_add prevents overflow panic
1217        TAIL_CALL_COUNTER.with(|counter| {
1218            let initial = counter.get();
1219            // Set counter near max to test overflow behavior
1220            counter.set(u64::MAX - 1);
1221            // These calls should not panic due to overflow
1222            patch_seq_maybe_yield();
1223            patch_seq_maybe_yield();
1224            patch_seq_maybe_yield();
1225            // Reset
1226            counter.set(initial);
1227        });
1228    }
1229}