seq_runtime/scheduler.rs
//! Scheduler - Green Thread Management with May
//!
//! CSP-style concurrency for Seq using May coroutines.
//! Each strand is a lightweight green thread that can communicate via channels.
//!
//! ## Non-Blocking Guarantee
//!
//! Channel operations (`send`, `receive`) use May's cooperative blocking and NEVER
//! block OS threads. However, I/O operations (`write_line`, `read_line` in io.rs)
//! currently use blocking syscalls. Future work will make all I/O non-blocking.
//!
//! ## Panic Behavior
//!
//! Functions panic on invalid input (null stacks, negative IDs, closed channels).
//! In a production system, consider implementing error channels or Result-based
//! error handling instead of panicking.
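//!
//! ## Typical Lifecycle
//!
//! A minimal sketch of the call sequence an embedder (normally LLVM-generated
//! code) is expected to follow; the `entry` function here is illustrative:
//!
//! ```ignore
//! unsafe {
//!     patch_seq_scheduler_init();                          // configure May once
//!     patch_seq_strand_spawn(entry, std::ptr::null_mut()); // spawn strands
//!     patch_seq_scheduler_run();                           // block until all complete
//!     patch_seq_scheduler_shutdown();                      // no-op, API symmetry
//! }
//! ```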

use crate::stack::Stack;
use crate::tagged_stack::StackValue;
use may::coroutine;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex, Once};

static SCHEDULER_INIT: Once = Once::new();

// Strand lifecycle tracking
//
// Design rationale:
// - ACTIVE_STRANDS: Lock-free atomic counter for the hot path (spawn/complete).
//   Every strand increments on spawn, decrements on complete. This is extremely
//   fast (lock-free atomic ops) and suitable for high-frequency operations.
//
// - SHUTDOWN_CONDVAR/MUTEX: Event-driven synchronization for the cold path
//   (shutdown wait). Used only when waiting for all strands to complete
//   (program shutdown). Condvar provides event-driven wakeup instead of
//   polling, which is critical for a systems language - no CPU waste, proper
//   OS-level blocking.
//
// Why not track JoinHandles?
// Strands are like Erlang processes - potentially hundreds of thousands of concurrent
// entities with independent lifecycles. Storing handles would require global mutable
// state with synchronization overhead on the hot path. The counter + condvar approach
// keeps the hot path lock-free while providing proper shutdown synchronization.
pub static ACTIVE_STRANDS: AtomicUsize = AtomicUsize::new(0);
pub(crate) static SHUTDOWN_CONDVAR: Condvar = Condvar::new();
pub(crate) static SHUTDOWN_MUTEX: Mutex<()> = Mutex::new(());

// Strand lifecycle statistics (for diagnostics)
//
// These counters provide observability into the strand lifecycle without any locking.
// All operations are lock-free atomic increments/loads.
//
// - TOTAL_SPAWNED: Monotonically increasing count of all strands ever spawned
// - TOTAL_COMPLETED: Monotonically increasing count of all strands that completed
// - PEAK_STRANDS: High-water mark of concurrent strands (helps detect strand leaks)
//
// Useful diagnostics:
// - Currently running: ACTIVE_STRANDS
// - Completed successfully: TOTAL_COMPLETED
// - Potential leaks: TOTAL_SPAWNED - TOTAL_COMPLETED - ACTIVE_STRANDS > 0
//   (strands lost; see the sketch below the statics)
// - Peak concurrency: PEAK_STRANDS
pub static TOTAL_SPAWNED: AtomicU64 = AtomicU64::new(0);
pub static TOTAL_COMPLETED: AtomicU64 = AtomicU64::new(0);
pub static PEAK_STRANDS: AtomicUsize = AtomicUsize::new(0);
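
// For example, a diagnostics routine could flag lost strands with a check
// like this (illustrative sketch, not part of the runtime API):
//
//     let spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
//     let completed = TOTAL_COMPLETED.load(Ordering::Relaxed);
//     let active = ACTIVE_STRANDS.load(Ordering::Relaxed) as u64;
//     if spawned > completed + active {
//         eprintln!("warning: {} strand(s) lost", spawned - completed - active);
//     }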

// Unique strand ID generation
static NEXT_STRAND_ID: AtomicU64 = AtomicU64::new(1);

// =============================================================================
// Lock-Free Strand Registry (only when diagnostics feature is enabled)
// =============================================================================
//
// A fixed-size array of slots for tracking active strands without locks.
// Each slot stores a strand ID (0 = free) and a spawn timestamp.
//
// Design principles:
// - Fixed size: No dynamic allocation, predictable memory footprint
// - Lock-free: All operations use atomic CAS, no mutex contention
// - Bounded: If the registry is full, strands still run but aren't tracked
// - Zero cost when not querying: Only diagnostics reads the registry
//
// Slot encoding:
// - strand_id == 0: slot is free
// - strand_id > 0: slot contains an active strand
//
// The registry size can be configured via the SEQ_STRAND_REGISTRY_SIZE env var.
// Default is 1024 slots, which is sufficient for most applications.
//
// When the "diagnostics" feature is disabled, the registry is not compiled,
// eliminating the SystemTime::now() syscall and O(n) scans on every spawn.

#[cfg(feature = "diagnostics")]
/// Default strand registry size (number of trackable concurrent strands)
const DEFAULT_REGISTRY_SIZE: usize = 1024;

#[cfg(feature = "diagnostics")]
/// A slot in the strand registry
///
/// Uses two atomics to store strand info without locks.
/// A slot is free when strand_id == 0.
pub struct StrandSlot {
    /// Strand ID (0 = free, >0 = active strand)
    pub strand_id: AtomicU64,
    /// Spawn timestamp (seconds since UNIX epoch, for detecting stuck strands)
    pub spawn_time: AtomicU64,
}

#[cfg(feature = "diagnostics")]
impl StrandSlot {
    const fn new() -> Self {
        Self {
            strand_id: AtomicU64::new(0),
            spawn_time: AtomicU64::new(0),
        }
    }
}

#[cfg(feature = "diagnostics")]
/// Lock-free strand registry
///
/// Provides O(n) registration (scan for a free slot) and O(n) unregistration.
/// This is acceptable because:
/// 1. N is bounded (default 1024)
/// 2. Registration/unregistration are infrequent compared to strand work
/// 3. No locks means no contention, just atomic ops
pub struct StrandRegistry {
    slots: Box<[StrandSlot]>,
128 pub overflow_count: AtomicU64,
129}
130
131#[cfg(feature = "diagnostics")]
132impl StrandRegistry {
133 /// Create a new registry with the given capacity
134 fn new(capacity: usize) -> Self {
135 let mut slots = Vec::with_capacity(capacity);
136 for _ in 0..capacity {
137 slots.push(StrandSlot::new());
138 }
139 Self {
140 slots: slots.into_boxed_slice(),
141 overflow_count: AtomicU64::new(0),
142 }
143 }
144
145 /// Register a strand, returning the slot index if successful
146 ///
147 /// Uses CAS to atomically claim a free slot.
148 /// Returns None if the registry is full (strand still runs, just not tracked).
149 pub fn register(&self, strand_id: u64) -> Option<usize> {
150 let spawn_time = std::time::SystemTime::now()
151 .duration_since(std::time::UNIX_EPOCH)
152 .map(|d| d.as_secs())
153 .unwrap_or(0);

        // Scan for a free slot
        for (idx, slot) in self.slots.iter().enumerate() {
            // Skip slots that already look occupied. Without this check, every
            // scan would overwrite the spawn_time of active strands, defeating
            // stuck-strand detection.
            if slot.strand_id.load(Ordering::Relaxed) != 0 {
                continue;
            }

            // Set the spawn time first, before claiming the slot, so a reader
            // never sees strand_id != 0 with spawn_time == 0. A racing
            // registrant may still overwrite this value in the window between
            // the load above and the CAS below, but spawn_time is
            // diagnostics-only and both writers hold timestamps taken moments
            // apart.
            slot.spawn_time.store(spawn_time, Ordering::Relaxed);

            // Try to claim this slot (CAS from 0 to strand_id).
            // AcqRel on success ensures the spawn_time write above is visible
            // before strand_id becomes non-zero.
            if slot
                .strand_id
                .compare_exchange(0, strand_id, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                return Some(idx);
            }
        }

        // Registry full - track overflow but the strand still runs
        self.overflow_count.fetch_add(1, Ordering::Relaxed);
        None
    }

    /// Unregister a strand by ID
    ///
    /// Scans for the slot containing this strand ID and clears it.
    /// Returns true if found and cleared, false if not found.
    ///
    /// Note: the ABA problem is not a concern here because strand IDs are
    /// monotonically increasing u64 values. ID reuse would require 2^64 spawns,
    /// which is practically impossible (at 1 billion spawns/sec, it would take
    /// ~584 years).
    pub fn unregister(&self, strand_id: u64) -> bool {
        for slot in self.slots.iter() {
            // Check if this slot contains our strand
            if slot
                .strand_id
                .compare_exchange(strand_id, 0, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                // Successfully cleared the slot
                slot.spawn_time.store(0, Ordering::Release);
                return true;
            }
        }
        false
    }

    /// Iterate over active strands (for diagnostics)
    ///
    /// Returns an iterator of (strand_id, spawn_time) for non-empty slots.
    /// Note: This is a snapshot and may be slightly inconsistent due to concurrent updates.
    pub fn active_strands(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.slots.iter().filter_map(|slot| {
            // Acquire on strand_id synchronizes with the Release in register()
            let id = slot.strand_id.load(Ordering::Acquire);
            if id > 0 {
                // Relaxed is sufficient here - we've already synchronized via the
                // strand_id Acquire, and spawn_time is written before strand_id
                // in register()
                let time = slot.spawn_time.load(Ordering::Relaxed);
                Some((id, time))
            } else {
                None
            }
        })
    }

    /// Get the registry capacity
    pub fn capacity(&self) -> usize {
        self.slots.len()
    }
}
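
// Example (illustrative sketch, not part of the runtime API): a diagnostics
// dump built on the registry could flag long-lived strands. The 60-second
// threshold is an arbitrary placeholder:
//
//     let now = std::time::SystemTime::now()
//         .duration_since(std::time::UNIX_EPOCH)
//         .map(|d| d.as_secs())
//         .unwrap_or(0);
//     for (id, spawned_at) in strand_registry().active_strands() {
//         if now.saturating_sub(spawned_at) > 60 {
//             eprintln!("strand {} alive for {}s", id, now - spawned_at);
//         }
//     }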

// Global strand registry (lazy initialized)
#[cfg(feature = "diagnostics")]
static STRAND_REGISTRY: std::sync::OnceLock<StrandRegistry> = std::sync::OnceLock::new();

/// Get or initialize the global strand registry
#[cfg(feature = "diagnostics")]
pub fn strand_registry() -> &'static StrandRegistry {
    STRAND_REGISTRY.get_or_init(|| {
        let size = std::env::var("SEQ_STRAND_REGISTRY_SIZE")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(DEFAULT_REGISTRY_SIZE);
        StrandRegistry::new(size)
    })
}
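
// Like the other knobs in this module, the size is read from the environment
// once at first use, e.g. (shell, program name is a placeholder):
//
//     SEQ_STRAND_REGISTRY_SIZE=4096 ./program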

/// Default coroutine stack size: 128KB (0x20000 bytes).
/// Reduced from 1MB for better spawn performance (~16% faster in benchmarks).
/// Can be overridden via the SEQ_STACK_SIZE environment variable.
const DEFAULT_STACK_SIZE: usize = 0x20000;

/// Parse stack size from an optional string value.
/// Returns the parsed size, or DEFAULT_STACK_SIZE if the value is missing, zero, or invalid.
/// Prints a warning to stderr for invalid values.
fn parse_stack_size(env_value: Option<String>) -> usize {
    match env_value {
        Some(val) => match val.parse::<usize>() {
            Ok(0) => {
                eprintln!(
                    "Warning: SEQ_STACK_SIZE=0 is invalid, using default {}",
                    DEFAULT_STACK_SIZE
                );
                DEFAULT_STACK_SIZE
            }
            Ok(size) => size,
            Err(_) => {
                eprintln!(
                    "Warning: SEQ_STACK_SIZE='{}' is not a valid number, using default {}",
                    val, DEFAULT_STACK_SIZE
                );
                DEFAULT_STACK_SIZE
            }
        },
        None => DEFAULT_STACK_SIZE,
    }
}

/// Default coroutine pool capacity.
/// May reuses completed coroutine stacks from this pool to avoid allocations.
/// May's own default of 1000 is often too small for spawn-heavy workloads.
const DEFAULT_POOL_CAPACITY: usize = 10000;

/// Initialize the scheduler.
///
/// # Safety
/// Safe to call multiple times (idempotent via Once).
/// Configures May coroutines with an appropriate stack size for LLVM-generated code.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_init() {
    SCHEDULER_INIT.call_once(|| {
        // Configure stack size for coroutines.
        // Default is 128KB, reduced from 1MB for better spawn performance.
        // Can be overridden via the SEQ_STACK_SIZE environment variable (in bytes).
        // Example: SEQ_STACK_SIZE=2097152 for 2MB.
        // Invalid values (non-numeric, zero) trigger a warning and are ignored.
        let stack_size = parse_stack_size(std::env::var("SEQ_STACK_SIZE").ok());

        // Configure coroutine pool capacity.
        // May reuses coroutine stacks from this pool to reduce allocation overhead.
        // Default 10000 is 10x May's default (1000), better for spawn-heavy workloads.
        // Can be overridden via the SEQ_POOL_CAPACITY environment variable.
        let pool_capacity = std::env::var("SEQ_POOL_CAPACITY")
            .ok()
            .and_then(|s| s.parse().ok())
            .filter(|&v| v > 0)
            .unwrap_or(DEFAULT_POOL_CAPACITY);

        may::config()
            .set_stack_size(stack_size)
            .set_pool_capacity(pool_capacity);

        // Install SIGINT handler for Ctrl-C (unconditional - basic expected behavior).
        // Without this, tight loops won't respond to Ctrl-C because signals
        // are only delivered at syscall boundaries, and TCO loops may never syscall.
        #[cfg(unix)]
        {
            use std::sync::atomic::{AtomicBool, Ordering};
            static SIGINT_RECEIVED: AtomicBool = AtomicBool::new(false);

            extern "C" fn sigint_handler(_: libc::c_int) {
                // If we receive SIGINT twice, force exit (the user is insistent)
                if SIGINT_RECEIVED.swap(true, Ordering::SeqCst) {
                    // Second SIGINT - exit immediately
                    unsafe { libc::_exit(130) }; // 128 + 2 (SIGINT)
                }
                // First SIGINT - exit cleanly
                std::process::exit(130);
            }

            unsafe {
                libc::signal(libc::SIGINT, sigint_handler as libc::sighandler_t);
            }
        }

        // Install SIGQUIT handler for runtime diagnostics (kill -3)
        #[cfg(feature = "diagnostics")]
        crate::diagnostics::install_signal_handler();

        // Install watchdog timer (if enabled via SEQ_WATCHDOG_SECS)
        #[cfg(feature = "diagnostics")]
        crate::watchdog::install_watchdog();
    });
}

/// Run the scheduler and wait for all coroutines to complete
///
/// # Safety
/// Returns the final stack (always null for now since May handles all scheduling).
/// This function blocks until all spawned strands have completed.
///
/// Uses a condition variable for event-driven shutdown synchronization rather than
/// polling. The mutex is only held during the wait protocol, not during strand
/// execution, so there's no contention on the hot path.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_run() -> Stack {
    let mut guard = SHUTDOWN_MUTEX.lock().expect(
        "scheduler_run: shutdown mutex poisoned - strand panicked during shutdown synchronization",
    );

    // Wait for all strands to complete.
    // The condition variable will be notified when the last strand exits.
    while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
        guard = SHUTDOWN_CONDVAR
            .wait(guard)
            .expect("scheduler_run: condvar wait failed - strand panicked during shutdown wait");
    }

    // All strands have completed
    std::ptr::null_mut()
}

/// Shutdown the scheduler
///
/// # Safety
/// Safe to call. May doesn't require explicit shutdown, so this is a no-op.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_shutdown() {
    // May doesn't require explicit shutdown.
    // This function exists for API symmetry with init.
}

/// Spawn a strand (coroutine) with initial stack
///
/// # Safety
/// - `entry` must be a valid function pointer that can safely execute on any thread
/// - `initial_stack` must be either null or a valid pointer to a `StackValue` that:
///   - Was heap-allocated (e.g., via Box)
///   - Has a 'static lifetime or lives longer than the coroutine
///   - Is safe to access from the spawned thread
/// - The caller transfers ownership of `initial_stack` to the coroutine
/// - Returns a unique strand ID (positive integer)
///
/// # Memory Management
/// The spawned coroutine takes ownership of `initial_stack` and will automatically
/// free the final stack returned by `entry` upon completion.
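///
/// # Example
///
/// A minimal sketch of spawning from Rust, mirroring this module's tests
/// (real callers are LLVM-generated and pass compiler-built stacks):
///
/// ```ignore
/// extern "C" fn entry(stack: Stack) -> Stack {
///     // ... strand body ...
///     stack // the returned stack is freed by the runtime
/// }
///
/// let id = unsafe { patch_seq_strand_spawn(entry, std::ptr::null_mut()) };
/// assert!(id > 0);
/// ```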
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_strand_spawn(
    entry: extern "C" fn(Stack) -> Stack,
    initial_stack: Stack,
) -> i64 {
    // For backwards compatibility, use a null base (won't support nested spawns)
    unsafe { patch_seq_strand_spawn_with_base(entry, initial_stack, std::ptr::null_mut()) }
}

/// Spawn a strand (coroutine) with initial stack and explicit stack base
///
/// This variant allows setting the STACK_BASE for the spawned strand, which is
/// required for the child to perform operations like clone_stack (nested spawn).
///
/// # Safety
/// - `entry` must be a valid function pointer that can safely execute on any thread
/// - `initial_stack` must be a valid pointer to a `StackValue` array
/// - `stack_base` must be the base of the stack (or null to skip setting STACK_BASE)
/// - The caller transfers ownership of `initial_stack` to the coroutine
/// - Returns a unique strand ID (positive integer)
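///
/// # Example
///
/// A hedged sketch (`entry`, `stack_top`, and `stack_base` are placeholders;
/// real callers are LLVM-generated and pass compiler-managed pointers):
///
/// ```ignore
/// // Passing the stack base lets the child strand clone the stack,
/// // which nested spawns require.
/// let id = unsafe { patch_seq_strand_spawn_with_base(entry, stack_top, stack_base) };
/// assert!(id > 0);
/// ```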
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_strand_spawn_with_base(
    entry: extern "C" fn(Stack) -> Stack,
    initial_stack: Stack,
    stack_base: Stack,
) -> i64 {
    // Generate a unique strand ID
    let strand_id = NEXT_STRAND_ID.fetch_add(1, Ordering::Relaxed);

    // Increment the active strand counter and track total spawned
    let new_count = ACTIVE_STRANDS.fetch_add(1, Ordering::Release) + 1;
    TOTAL_SPAWNED.fetch_add(1, Ordering::Relaxed);

    // Update peak strands if this is a new high-water mark.
    // Uses a CAS loop to safely update the maximum without locks.
    // Uses Acquire/Release ordering for proper synchronization with diagnostics reads.
    let mut peak = PEAK_STRANDS.load(Ordering::Acquire);
    while new_count > peak {
        match PEAK_STRANDS.compare_exchange_weak(
            peak,
            new_count,
            Ordering::Release,
            Ordering::Relaxed,
        ) {
            Ok(_) => break,
            Err(current) => peak = current,
        }
    }

    // Register the strand in the registry (for diagnostics visibility).
    // If the registry is full, the strand still runs but isn't tracked.
    #[cfg(feature = "diagnostics")]
    let _ = strand_registry().register(strand_id);

    // Function pointers are already Send, no wrapper needed
    let entry_fn = entry;

    // Convert pointers to usize (which is Send).
    // This is necessary because *mut T is !Send, but the caller guarantees thread safety.
    let stack_addr = initial_stack as usize;
    let base_addr = stack_base as usize;

    unsafe {
        coroutine::spawn(move || {
            // Reconstruct pointers from addresses
            let stack_ptr = stack_addr as *mut StackValue;
            let base_ptr = base_addr as *mut StackValue;

            // Debug assertion: validate stack pointer alignment and reasonable address
            debug_assert!(
                stack_ptr.is_null()
                    || stack_addr.is_multiple_of(std::mem::align_of::<StackValue>()),
                "Stack pointer must be null or properly aligned"
            );
            debug_assert!(
                stack_ptr.is_null() || stack_addr > 0x1000,
                "Stack pointer appears to be in invalid memory region (< 0x1000)"
            );

            // Set STACK_BASE for this strand if provided.
            // This enables nested spawns and other operations that need clone_stack.
            if !base_ptr.is_null() {
                crate::stack::patch_seq_set_stack_base(base_ptr);
            }

            // Execute the entry function
            let final_stack = entry_fn(stack_ptr);

            // Clean up the final stack to prevent a memory leak
            free_stack(final_stack);

            // Unregister the strand from the registry (uses the captured strand_id)
            #[cfg(feature = "diagnostics")]
            strand_registry().unregister(strand_id);

            // Decrement the active strand counter first, then track completion.
            // This ordering ensures the invariant SPAWNED = COMPLETED + ACTIVE + lost
            // is never violated from an external observer's perspective.
            // Use AcqRel to establish proper synchronization (both acquire and release barriers).
            let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);

            // Track completion after decrementing the active count
            TOTAL_COMPLETED.fetch_add(1, Ordering::Release);
            if prev_count == 1 {
                // We were the last strand - acquire the mutex and signal shutdown.
                // The mutex must be held when calling notify to prevent missed wakeups.
                let _guard = SHUTDOWN_MUTEX.lock()
                    .expect("strand_spawn: shutdown mutex poisoned - strand panicked during shutdown notification");
                SHUTDOWN_CONDVAR.notify_all();
            }
        });
    }

    strand_id as i64
}

/// Free a stack allocated by the runtime
///
/// With the tagged stack implementation, stack cleanup is handled differently.
/// The contiguous array is freed when the TaggedStack is dropped.
/// This function just resets the thread-local arena.
///
/// # Safety
/// Stack pointer must be valid or null.
fn free_stack(_stack: Stack) {
    // With the tagged stack, the array is freed when the TaggedStack is dropped.
    // We just need to reset the arena for thread-local strings.

    // Reset the thread-local arena to free all arena-allocated strings.
    // This is safe because:
    // - Any arena strings held in stack Values have been dropped by this point
    // - Global strings are unaffected (they have their own allocations)
    // - Channel sends clone to global, so no cross-strand arena pointers remain
    crate::arena::arena_reset();
}

/// Legacy spawn_strand function (kept for compatibility)
///
/// # Safety
/// `entry` must be a valid function pointer that can safely execute on any thread.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_spawn_strand(entry: extern "C" fn(Stack) -> Stack) {
    unsafe {
        patch_seq_strand_spawn(entry, std::ptr::null_mut());
    }
}

/// Yield execution to allow other coroutines to run
///
/// # Safety
/// Always safe to call from within a May coroutine.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_yield_strand(stack: Stack) -> Stack {
    coroutine::yield_now();
    stack
}

// =============================================================================
// Cooperative Yield Safety Valve
// =============================================================================
//
// Prevents tight TCO loops from starving other strands and making the process
// unresponsive. When enabled via SEQ_YIELD_INTERVAL, yields after N tail calls.
//
// Configuration:
//   SEQ_YIELD_INTERVAL=10000 - Yield every 10,000 tail calls (default: 0 = disabled)
//
// Scope:
// - Covers: User-defined word tail calls (musttail) and quotation tail calls
// - Does NOT cover: Closure calls (they use regular calls, bounded by stack)
// - Does NOT cover: Non-tail recursive calls (bounded by stack)
// This is intentional: the safety valve targets unbounded TCO loops.
//
// Design:
// - Zero overhead when disabled (threshold=0 short-circuits immediately)
// - Thread-local counter avoids synchronization overhead
// - Called before every musttail in generated code
// - Threshold is cached on first access via OnceLock
//
// Thread-Local Counter Behavior:
// The counter is per-OS-thread, not per-coroutine. Multiple coroutines on the
// same OS thread share the counter, which may cause yields slightly more
// frequently than the configured interval. This is intentional:
// - Avoids coroutine-local storage overhead
// - Still achieves the goal of preventing starvation
// - Actual yield frequency is still bounded by the threshold
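//
// Conceptually, a generated TCO loop behaves like this Rust sketch
// (illustrative only; real call sites are emitted as LLVM `musttail` calls,
// and `step` stands in for the user-defined word):
//
//     loop {
//         patch_seq_maybe_yield(); // emitted before each tail call
//         state = step(state);
//     }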

use std::cell::Cell;
use std::sync::OnceLock;

/// Cached yield interval threshold (0 = disabled)
static YIELD_THRESHOLD: OnceLock<u64> = OnceLock::new();

thread_local! {
    /// Per-thread tail call counter
    static TAIL_CALL_COUNTER: Cell<u64> = const { Cell::new(0) };
}

/// Get the yield threshold from environment (cached)
///
/// Returns 0 (disabled) if SEQ_YIELD_INTERVAL is not set or invalid.
/// Prints a warning to stderr if the value is set but invalid.
fn get_yield_threshold() -> u64 {
    *YIELD_THRESHOLD.get_or_init(|| {
        match std::env::var("SEQ_YIELD_INTERVAL") {
            Ok(s) if s.is_empty() => 0,
            Ok(s) => match s.parse::<u64>() {
                Ok(n) => n,
                Err(_) => {
                    eprintln!(
                        "Warning: SEQ_YIELD_INTERVAL='{}' is not a valid positive integer, yield safety valve disabled",
                        s
                    );
                    0
                }
            },
            Err(_) => 0,
        }
    })
}

/// Maybe yield to other coroutines based on tail call count
///
/// Called before every tail call in generated code. When SEQ_YIELD_INTERVAL
/// is set, yields after that many tail calls to prevent starvation.
///
/// # Performance
/// - Disabled (default): Single branch on cached threshold (< 1ns)
/// - Enabled: Increment + compare + occasional yield (~10-20ns average)
///
/// # Safety
/// Always safe to call. No-op when not in a May coroutine context.
#[unsafe(no_mangle)]
pub extern "C" fn patch_seq_maybe_yield() {
    let threshold = get_yield_threshold();

    // Fast path: disabled
    if threshold == 0 {
        return;
    }

    TAIL_CALL_COUNTER.with(|counter| {
        let count = counter.get().wrapping_add(1);
        counter.set(count);

        if count >= threshold {
            counter.set(0);
            coroutine::yield_now();
        }
    });
}

/// Wait for all strands to complete
///
/// # Safety
/// Always safe to call. Blocks until all spawned strands have completed.
///
/// Uses event-driven synchronization via a condition variable - no polling overhead.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_wait_all_strands() {
    let mut guard = SHUTDOWN_MUTEX.lock()
        .expect("wait_all_strands: shutdown mutex poisoned - strand panicked during shutdown synchronization");

    // Wait for all strands to complete.
    // The condition variable will be notified when the last strand exits.
    while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
        guard = SHUTDOWN_CONDVAR
            .wait(guard)
            .expect("wait_all_strands: condvar wait failed - strand panicked during shutdown wait");
    }
}

// Public re-exports with short names for internal use
pub use patch_seq_maybe_yield as maybe_yield;
pub use patch_seq_scheduler_init as scheduler_init;
pub use patch_seq_scheduler_run as scheduler_run;
pub use patch_seq_scheduler_shutdown as scheduler_shutdown;
pub use patch_seq_spawn_strand as spawn_strand;
pub use patch_seq_strand_spawn as strand_spawn;
pub use patch_seq_wait_all_strands as wait_all_strands;
pub use patch_seq_yield_strand as yield_strand;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::stack::push;
    use crate::value::Value;
    use std::sync::atomic::{AtomicU32, Ordering};

    #[test]
    fn test_spawn_strand() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn test_entry(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            for _ in 0..100 {
                spawn_strand(test_entry);
            }

            std::thread::sleep(std::time::Duration::from_millis(200));
            assert_eq!(COUNTER.load(Ordering::SeqCst), 100);
        }
    }

    #[test]
    fn test_scheduler_init_idempotent() {
        unsafe {
            // Should be safe to call multiple times
            scheduler_init();
            scheduler_init();
            scheduler_init();
        }
    }

    #[test]
    fn test_free_stack_null() {
        // Freeing null should be a no-op
        free_stack(std::ptr::null_mut());
    }

    #[test]
    fn test_free_stack_valid() {
        unsafe {
            // Create a stack, then free it
            let stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
            free_stack(stack);
            // If we get here without crashing, test passed
        }
    }

    #[test]
    fn test_strand_spawn_with_stack() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn test_entry(stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                // Return the stack as-is (caller will free it)
                stack
            }

            let initial_stack = push(crate::stack::alloc_test_stack(), Value::Int(99));
            strand_spawn(test_entry, initial_stack);

            std::thread::sleep(std::time::Duration::from_millis(200));
            assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
        }
    }

    #[test]
    fn test_scheduler_shutdown() {
        unsafe {
            scheduler_init();
            scheduler_shutdown();
            // Should not crash
        }
    }

    #[test]
    fn test_many_strands_stress() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn increment(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            // Reset counter for this test
            COUNTER.store(0, Ordering::SeqCst);

            // Spawn many strands to stress test synchronization
            for _ in 0..1000 {
                strand_spawn(increment, std::ptr::null_mut());
            }

            // Wait for all to complete
            wait_all_strands();

            // Verify all strands executed
            assert_eq!(COUNTER.load(Ordering::SeqCst), 1000);
        }
    }

    #[test]
    fn test_strand_ids_are_unique() {
        unsafe {
            use std::collections::HashSet;

            extern "C" fn noop(_stack: Stack) -> Stack {
                std::ptr::null_mut()
            }

            // Spawn strands and collect their IDs
            let mut ids = Vec::new();
            for _ in 0..100 {
                let id = strand_spawn(noop, std::ptr::null_mut());
                ids.push(id);
            }

            // Wait for completion
            wait_all_strands();

            // Verify all IDs are unique
            let unique_ids: HashSet<_> = ids.iter().collect();
            assert_eq!(unique_ids.len(), 100, "All strand IDs should be unique");

            // Verify all IDs are positive
            assert!(
                ids.iter().all(|&id| id > 0),
                "All strand IDs should be positive"
            );
        }
    }

    #[test]
    fn test_arena_reset_with_strands() {
        unsafe {
            use crate::arena;
            use crate::seqstring::arena_string;

            extern "C" fn create_temp_strings(stack: Stack) -> Stack {
                // Create many temporary arena strings (simulating request parsing)
                for i in 0..100 {
                    let temp = arena_string(&format!("temporary string {}", i));
                    // Use the string temporarily
                    assert!(!temp.as_str().is_empty());
                    // String is dropped, but memory stays in arena
                }

                // Arena should have allocated memory
                let stats = arena::arena_stats();
                assert!(stats.allocated_bytes > 0, "Arena should have allocations");

                stack // Return empty stack
            }

            // Reset arena before test
            arena::arena_reset();

            // Spawn strand that creates many temp strings
            strand_spawn(create_temp_strings, std::ptr::null_mut());

            // Wait for strand to complete (which calls free_stack -> arena_reset)
            wait_all_strands();

            // After strand exits, arena should be reset
            let stats_after = arena::arena_stats();
            assert_eq!(
                stats_after.allocated_bytes, 0,
                "Arena should be reset after strand exits"
            );
        }
    }

    #[test]
    fn test_arena_with_channel_send() {
        unsafe {
            use crate::channel::{close_channel, make_channel, receive, send};
            use crate::stack::{pop, push};
            use crate::value::Value;
            use std::sync::Arc;
            use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};

            static RECEIVED_COUNT: AtomicU32 = AtomicU32::new(0);
            static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);

            // Create channel
            let stack = crate::stack::alloc_test_stack();
            let stack = make_channel(stack);
            let (stack, chan_val) = pop(stack);
            let channel = match chan_val {
                Value::Channel(ch) => ch,
                _ => panic!("Expected Channel"),
            };

            // Store channel pointer for strands
            let ch_ptr = Arc::as_ptr(&channel) as i64;
            CHANNEL_PTR.store(ch_ptr, Ordering::Release);

            // Keep Arc alive
            std::mem::forget(channel.clone());
            std::mem::forget(channel.clone());

            // Sender strand: creates arena string, sends through channel
            extern "C" fn sender(_stack: Stack) -> Stack {
                use crate::seqstring::arena_string;
                use crate::value::ChannelData;
                use std::sync::Arc;

                unsafe {
                    let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
                    let channel = Arc::from_raw(ch_ptr);
                    let channel_clone = Arc::clone(&channel);
                    std::mem::forget(channel); // Don't drop

                    // Create arena string
                    let msg = arena_string("Hello from sender!");

                    // Push string and channel for send
                    let stack = push(crate::stack::alloc_test_stack(), Value::String(msg));
                    let stack = push(stack, Value::Channel(channel_clone));

                    // Send (will clone to global)
                    send(stack)
                }
            }

            // Receiver strand: receives string from channel
            extern "C" fn receiver(_stack: Stack) -> Stack {
                use crate::value::ChannelData;
                use std::sync::Arc;
                use std::sync::atomic::Ordering;

                unsafe {
                    let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
                    let channel = Arc::from_raw(ch_ptr);
                    let channel_clone = Arc::clone(&channel);
                    std::mem::forget(channel); // Don't drop

                    // Push channel for receive
                    let stack = push(
                        crate::stack::alloc_test_stack(),
                        Value::Channel(channel_clone),
                    );

                    // Receive message (returns value, success_flag)
                    let stack = receive(stack);

                    // Pop success flag first, then message
                    let (stack, _success) = pop(stack);
                    let (_stack, msg_val) = pop(stack);
                    match msg_val {
                        Value::String(s) => {
                            assert_eq!(s.as_str(), "Hello from sender!");
                            RECEIVED_COUNT.fetch_add(1, Ordering::SeqCst);
                        }
                        _ => panic!("Expected String"),
                    }

                    std::ptr::null_mut()
                }
            }

            // Spawn sender and receiver
            spawn_strand(sender);
            spawn_strand(receiver);

            // Wait for both strands
            wait_all_strands();

            // Verify message was received
            assert_eq!(
                RECEIVED_COUNT.load(Ordering::SeqCst),
                1,
                "Receiver should have received message"
            );

            // Clean up channel
            let stack = push(stack, Value::Channel(channel));
            close_channel(stack);
        }
    }

    #[test]
    fn test_no_memory_leak_over_many_iterations() {
        // PR #11 feedback: Verify 10K+ strand iterations don't cause memory growth
        unsafe {
            use crate::arena;
            use crate::seqstring::arena_string;

            extern "C" fn allocate_strings_and_exit(stack: Stack) -> Stack {
                // Simulate request processing: many temp allocations
                for i in 0..50 {
                    let temp = arena_string(&format!("request header {}", i));
                    assert!(!temp.as_str().is_empty());
                    // Strings dropped here but arena memory stays allocated
                }
                stack
            }

            // Run many iterations to detect leaks
            let iterations = 10_000;

            for i in 0..iterations {
                // Reset arena before each iteration to start fresh
                arena::arena_reset();

                // Spawn strand, let it allocate strings, then exit
                strand_spawn(allocate_strings_and_exit, std::ptr::null_mut());

                // Wait for completion (triggers arena reset)
                wait_all_strands();

                // Every 1000 iterations, verify arena is actually reset
                if i % 1000 == 0 {
                    let stats = arena::arena_stats();
                    assert_eq!(
                        stats.allocated_bytes, 0,
                        "Arena not reset after iteration {} (leaked {} bytes)",
                        i, stats.allocated_bytes
                    );
                }
            }

            // Final verification: arena should be empty
            let final_stats = arena::arena_stats();
            assert_eq!(
                final_stats.allocated_bytes, 0,
                "Arena leaked memory after {} iterations ({} bytes)",
                iterations, final_stats.allocated_bytes
            );

            println!(
                "✓ Memory leak test passed: {} iterations with no growth",
                iterations
            );
        }
    }

    #[test]
    fn test_parse_stack_size_valid() {
        assert_eq!(parse_stack_size(Some("2097152".to_string())), 2097152);
        assert_eq!(parse_stack_size(Some("1".to_string())), 1);
        assert_eq!(parse_stack_size(Some("999999999".to_string())), 999999999);
    }

    #[test]
    fn test_parse_stack_size_none() {
        assert_eq!(parse_stack_size(None), DEFAULT_STACK_SIZE);
    }

    #[test]
    fn test_parse_stack_size_zero() {
        // Zero should fall back to default (with warning printed to stderr)
        assert_eq!(parse_stack_size(Some("0".to_string())), DEFAULT_STACK_SIZE);
    }

    #[test]
    fn test_parse_stack_size_invalid() {
        // Non-numeric should fall back to default (with warning printed to stderr)
        assert_eq!(
            parse_stack_size(Some("invalid".to_string())),
            DEFAULT_STACK_SIZE
        );
        assert_eq!(
            parse_stack_size(Some("-100".to_string())),
            DEFAULT_STACK_SIZE
        );
        assert_eq!(parse_stack_size(Some("".to_string())), DEFAULT_STACK_SIZE);
        assert_eq!(
            parse_stack_size(Some("1.5".to_string())),
            DEFAULT_STACK_SIZE
        );
    }

    #[test]
    #[cfg(feature = "diagnostics")]
    fn test_strand_registry_basic() {
        let registry = StrandRegistry::new(10);

        // Register some strands
        assert_eq!(registry.register(1), Some(0)); // First slot
        assert_eq!(registry.register(2), Some(1)); // Second slot
        assert_eq!(registry.register(3), Some(2)); // Third slot

        // Verify active strands
        let active: Vec<_> = registry.active_strands().collect();
        assert_eq!(active.len(), 3);

        // Unregister one
        assert!(registry.unregister(2));
        let active: Vec<_> = registry.active_strands().collect();
        assert_eq!(active.len(), 2);

        // Unregistering a non-existent strand should return false
        assert!(!registry.unregister(999));
    }

    #[test]
    #[cfg(feature = "diagnostics")]
    fn test_strand_registry_overflow() {
        let registry = StrandRegistry::new(3); // Small capacity

        // Fill it up
        assert!(registry.register(1).is_some());
        assert!(registry.register(2).is_some());
        assert!(registry.register(3).is_some());

        // Next should overflow
        assert!(registry.register(4).is_none());
        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 1);

        // Another overflow
        assert!(registry.register(5).is_none());
        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 2);
    }

    #[test]
    #[cfg(feature = "diagnostics")]
    fn test_strand_registry_slot_reuse() {
        let registry = StrandRegistry::new(3);

        // Fill it up
        registry.register(1);
        registry.register(2);
        registry.register(3);

        // Unregister the middle one
        registry.unregister(2);

        // A new registration should reuse the slot
        assert!(registry.register(4).is_some());
        assert_eq!(registry.active_strands().count(), 3);
    }

    #[test]
    #[cfg(feature = "diagnostics")]
    fn test_strand_registry_concurrent_stress() {
        use std::sync::Arc;
        use std::thread;

        let registry = Arc::new(StrandRegistry::new(50)); // Moderate capacity

        let handles: Vec<_> = (0..100)
            .map(|i| {
                let reg = Arc::clone(&registry);
                thread::spawn(move || {
                    let id = (i + 1) as u64;
                    // Register
                    let _ = reg.register(id);
                    // Brief work
                    thread::yield_now();
                    // Unregister
                    reg.unregister(id);
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // All slots should be free after all threads complete
        assert_eq!(registry.active_strands().count(), 0);
    }

    #[test]
    fn test_strand_lifecycle_counters() {
        unsafe {
            // Reset counters for isolation (not perfect but helps)
            let initial_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
            let initial_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);

            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn simple_work(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            COUNTER.store(0, Ordering::SeqCst);

            // Spawn some strands
            for _ in 0..10 {
                strand_spawn(simple_work, std::ptr::null_mut());
            }

            wait_all_strands();

            // Verify counters incremented
            let final_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
            let final_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);

            assert!(
                final_spawned >= initial_spawned + 10,
                "TOTAL_SPAWNED should have increased by at least 10"
            );
            assert!(
                final_completed >= initial_completed + 10,
                "TOTAL_COMPLETED should have increased by at least 10"
            );
            assert_eq!(COUNTER.load(Ordering::SeqCst), 10);
        }
    }

    // =========================================================================
    // Yield Safety Valve Tests
    // =========================================================================

    #[test]
    fn test_maybe_yield_disabled_by_default() {
        // When SEQ_YIELD_INTERVAL is not set (or 0), maybe_yield should be a no-op.
        // This test verifies it doesn't panic and returns quickly.
        for _ in 0..1000 {
            patch_seq_maybe_yield();
        }
    }

    #[test]
    fn test_tail_call_counter_increments() {
        // Verify the thread-local counter increments correctly
        TAIL_CALL_COUNTER.with(|counter| {
            let initial = counter.get();
            patch_seq_maybe_yield();
            patch_seq_maybe_yield();
            patch_seq_maybe_yield();
            // The counter should have incremented (if threshold > 0) or stayed
            // the same (if disabled). Either way, it shouldn't panic.
            let _ = counter.get();
            // Reset to avoid affecting other tests
            counter.set(initial);
        });
    }

    #[test]
    fn test_counter_overflow_safety() {
        // Verify wrapping_add prevents overflow panic
        TAIL_CALL_COUNTER.with(|counter| {
            let initial = counter.get();
            // Set counter near max to test overflow behavior
            counter.set(u64::MAX - 1);
            // These calls should not panic due to overflow
            patch_seq_maybe_yield();
            patch_seq_maybe_yield();
            patch_seq_maybe_yield();
            // Reset
            counter.set(initial);
        });
    }
}
1226}