//! Scheduler - Green Thread Management with May
//!
//! CSP-style concurrency for Seq using May coroutines.
//! Each strand is a lightweight green thread that can communicate via channels.
//!
//! ## Non-Blocking Guarantee
//!
//! Channel operations (`send`, `receive`) use May's cooperative blocking and NEVER
//! block OS threads. However, I/O operations (`write_line`, `read_line` in io.rs)
//! currently use blocking syscalls. Future work will make all I/O non-blocking.
//!
//! ## Panic Behavior
//!
//! Functions panic on invalid input (null stacks, negative IDs, closed channels).
//! In a production system, consider implementing error channels or Result-based
//! error handling instead of panicking.
18use crate::stack::Stack;
19use crate::tagged_stack::StackValue;
20use may::coroutine;
21use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22use std::sync::{Condvar, Mutex, Once};
23use std::time::{Duration, Instant};
24
// One-shot guard making patch_seq_scheduler_init idempotent.
static SCHEDULER_INIT: Once = Once::new();
// Set exactly once during init; read by scheduler_elapsed() for uptime reporting.
static SCHEDULER_START_TIME: std::sync::OnceLock<Instant> = std::sync::OnceLock::new();

// Strand lifecycle tracking
//
// Design rationale:
// - ACTIVE_STRANDS: Lock-free atomic counter for the hot path (spawn/complete)
//   Every strand increments on spawn, decrements on complete. This is extremely
//   fast (lock-free atomic ops) and suitable for high-frequency operations.
//
// - SHUTDOWN_CONDVAR/MUTEX: Event-driven synchronization for the cold path (shutdown wait)
//   Used only when waiting for all strands to complete (program shutdown).
//   Condvar provides event-driven wakeup instead of polling, which is critical
//   for a systems language - no CPU waste, proper OS-level blocking.
//
// Why not track JoinHandles?
// Strands are like Erlang processes - potentially hundreds of thousands of concurrent
// entities with independent lifecycles. Storing handles would require global mutable
// state with synchronization overhead on the hot path. The counter + condvar approach
// keeps the hot path lock-free while providing proper shutdown synchronization.
pub static ACTIVE_STRANDS: AtomicUsize = AtomicUsize::new(0);
pub(crate) static SHUTDOWN_CONDVAR: Condvar = Condvar::new();
pub(crate) static SHUTDOWN_MUTEX: Mutex<()> = Mutex::new(());

// Strand lifecycle statistics (for diagnostics)
//
// These counters provide observability into strand lifecycle without any locking.
// All operations are lock-free atomic increments/loads.
//
// - TOTAL_SPAWNED: Monotonically increasing count of all strands ever spawned
// - TOTAL_COMPLETED: Monotonically increasing count of all strands that completed
// - PEAK_STRANDS: High-water mark of concurrent strands (helps detect strand leaks)
//
// Useful diagnostics:
// - Currently running: ACTIVE_STRANDS
// - Completed successfully: TOTAL_COMPLETED
// - Potential leaks: TOTAL_SPAWNED - TOTAL_COMPLETED - ACTIVE_STRANDS > 0 (strands lost)
// - Peak concurrency: PEAK_STRANDS
pub static TOTAL_SPAWNED: AtomicU64 = AtomicU64::new(0);
pub static TOTAL_COMPLETED: AtomicU64 = AtomicU64::new(0);
pub static PEAK_STRANDS: AtomicUsize = AtomicUsize::new(0);

// Unique strand ID generation
// Starts at 1 so a strand ID is never 0 - the diagnostics registry uses
// strand_id == 0 to mean "slot free".
static NEXT_STRAND_ID: AtomicU64 = AtomicU64::new(1);
69
70// =============================================================================
71// Lock-Free Strand Registry (only when diagnostics feature is enabled)
72// =============================================================================
73//
74// A fixed-size array of slots for tracking active strands without locks.
75// Each slot stores a strand ID (0 = free) and spawn timestamp.
76//
77// Design principles:
78// - Fixed size: No dynamic allocation, predictable memory footprint
79// - Lock-free: All operations use atomic CAS, no mutex contention
80// - Bounded: If registry is full, strands still run but aren't tracked
81// - Zero cost when not querying: Only diagnostics reads the registry
82//
83// Slot encoding:
84// - strand_id == 0: slot is free
85// - strand_id > 0: slot contains an active strand
86//
87// The registry size can be configured via SEQ_STRAND_REGISTRY_SIZE env var.
88// Default is 1024 slots, which is sufficient for most applications.
89//
90// When the "diagnostics" feature is disabled, the registry is not compiled,
91// eliminating the SystemTime::now() syscall and O(n) scans on every spawn.
92
#[cfg(feature = "diagnostics")]
/// Default strand registry size (number of trackable concurrent strands)
const DEFAULT_REGISTRY_SIZE: usize = 1024;

#[cfg(feature = "diagnostics")]
/// A slot in the strand registry
///
/// Uses two atomics to store strand info without locks.
/// A slot is free when strand_id == 0.
pub struct StrandSlot {
    /// Strand ID (0 = free, >0 = active strand)
    pub strand_id: AtomicU64,
    /// Spawn timestamp (seconds since UNIX epoch, for detecting stuck strands)
    pub spawn_time: AtomicU64,
}

#[cfg(feature = "diagnostics")]
impl StrandSlot {
    /// Create an empty (free) slot.
    ///
    /// `const` so slots can be built without runtime initialization cost;
    /// freeness is encoded as strand_id == 0, matching the slot encoding above.
    const fn new() -> Self {
        Self {
            strand_id: AtomicU64::new(0),
            spawn_time: AtomicU64::new(0),
        }
    }
}
118
#[cfg(feature = "diagnostics")]
/// Lock-free strand registry
///
/// Provides O(n) registration (scan for free slot) and O(n) unregistration.
/// This is acceptable because:
/// 1. N is bounded (default 1024)
/// 2. Registration/unregistration are infrequent compared to strand work
/// 3. No locks means no contention, just atomic ops
pub struct StrandRegistry {
    /// Fixed slot array; a boxed slice keeps the footprint exact (no spare capacity).
    slots: Box<[StrandSlot]>,
    /// Number of slots that couldn't be registered (registry full)
    pub overflow_count: AtomicU64,
}
132
#[cfg(feature = "diagnostics")]
impl StrandRegistry {
    /// Create a new registry with `capacity` free slots.
    fn new(capacity: usize) -> Self {
        let mut slots = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            slots.push(StrandSlot::new());
        }
        Self {
            slots: slots.into_boxed_slice(),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Register a strand, returning the slot index if successful
    ///
    /// Uses CAS to atomically claim a free slot.
    /// Returns None if the registry is full (strand still runs, just not tracked).
    pub fn register(&self, strand_id: u64) -> Option<usize> {
        let spawn_time = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);

        // Scan for a free slot
        for (idx, slot) in self.slots.iter().enumerate() {
            // BUGFIX: skip slots that already hold a strand. The previous
            // implementation stored `spawn_time` into EVERY slot it scanned,
            // occupied or not, clobbering the spawn timestamps of active
            // strands in earlier slots on each registration. (Owners write
            // spawn_time only once, at registration - they never "overwrite
            // it anyway".) That silently broke the stuck-strand diagnostic.
            if slot.strand_id.load(Ordering::Relaxed) != 0 {
                continue;
            }

            // Write spawn_time BEFORE claiming the slot, so a reader that
            // observes strand_id != 0 (Acquire) also sees a plausible
            // spawn_time. The AcqRel CAS below publishes this store.
            slot.spawn_time.store(spawn_time, Ordering::Relaxed);

            // Try to claim this slot (CAS from 0 to strand_id)
            if slot
                .strand_id
                .compare_exchange(0, strand_id, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                return Some(idx);
            }
            // Lost a race: another registrar claimed this slot between our
            // load and CAS. Its own spawn_time store happened in the same
            // race window, so our store is at worst off by a sub-second
            // amount; keep scanning for another free slot.
        }

        // Registry full - track overflow but strand still runs
        self.overflow_count.fetch_add(1, Ordering::Relaxed);
        None
    }

    /// Unregister a strand by ID
    ///
    /// Scans for the slot containing this strand ID and clears it.
    /// Returns true if found and cleared, false if not found.
    ///
    /// Note: ABA problem is not a concern here because strand IDs are monotonically
    /// increasing u64 values. ID reuse would require 2^64 spawns, which is practically
    /// impossible (at 1 billion spawns/sec, it would take ~584 years).
    pub fn unregister(&self, strand_id: u64) -> bool {
        for slot in self.slots.iter() {
            // Check if this slot contains our strand; CAS back to 0 (free).
            if slot
                .strand_id
                .compare_exchange(strand_id, 0, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                // Successfully cleared the slot
                slot.spawn_time.store(0, Ordering::Release);
                return true;
            }
        }
        false
    }

    /// Iterate over active strands (for diagnostics)
    ///
    /// Returns an iterator of (strand_id, spawn_time) for non-empty slots.
    /// Note: This is a snapshot and may be slightly inconsistent due to concurrent updates.
    pub fn active_strands(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.slots.iter().filter_map(|slot| {
            // Acquire on strand_id synchronizes with the AcqRel CAS in register()
            let id = slot.strand_id.load(Ordering::Acquire);
            if id > 0 {
                // Relaxed is sufficient here - we've already synchronized via strand_id
                // Acquire, and spawn_time is written before strand_id in register()
                let time = slot.spawn_time.load(Ordering::Relaxed);
                Some((id, time))
            } else {
                None
            }
        })
    }

    /// Get the registry capacity (total slots, free or occupied)
    pub fn capacity(&self) -> usize {
        self.slots.len()
    }
}
228
// Global strand registry (lazy initialized)
#[cfg(feature = "diagnostics")]
static STRAND_REGISTRY: std::sync::OnceLock<StrandRegistry> = std::sync::OnceLock::new();

/// Get or initialize the global strand registry
///
/// Capacity comes from SEQ_STRAND_REGISTRY_SIZE when it parses as a usize;
/// otherwise DEFAULT_REGISTRY_SIZE is used.
#[cfg(feature = "diagnostics")]
pub fn strand_registry() -> &'static StrandRegistry {
    STRAND_REGISTRY.get_or_init(|| {
        let capacity = match std::env::var("SEQ_STRAND_REGISTRY_SIZE") {
            Ok(raw) => raw.parse().unwrap_or(DEFAULT_REGISTRY_SIZE),
            Err(_) => DEFAULT_REGISTRY_SIZE,
        };
        StrandRegistry::new(capacity)
    })
}
244
245/// Get elapsed time since scheduler was initialized
246pub fn scheduler_elapsed() -> Option<Duration> {
247 SCHEDULER_START_TIME.get().map(|start| start.elapsed())
248}
249
/// Default coroutine stack size: 128KB (0x20000 bytes)
/// Reduced from 1MB for better spawn performance (~16% faster in benchmarks).
/// Can be overridden via SEQ_STACK_SIZE environment variable.
const DEFAULT_STACK_SIZE: usize = 0x20000;

/// Parse stack size from an optional string value.
///
/// Returns the parsed size, or `DEFAULT_STACK_SIZE` when the value is missing,
/// zero, or not a number. Invalid values produce a warning on stderr so
/// misconfiguration is visible rather than silent.
fn parse_stack_size(env_value: Option<String>) -> usize {
    // No env var set: silently fall back to the default.
    let Some(val) = env_value else {
        return DEFAULT_STACK_SIZE;
    };

    match val.parse::<usize>() {
        Ok(size) if size > 0 => size,
        // A zero-sized coroutine stack can never work; warn and fall back.
        Ok(_) => {
            eprintln!(
                "Warning: SEQ_STACK_SIZE=0 is invalid, using default {}",
                DEFAULT_STACK_SIZE
            );
            DEFAULT_STACK_SIZE
        }
        // Non-numeric input: warn and fall back.
        Err(_) => {
            eprintln!(
                "Warning: SEQ_STACK_SIZE='{}' is not a valid number, using default {}",
                val, DEFAULT_STACK_SIZE
            );
            DEFAULT_STACK_SIZE
        }
    }
}
280
/// Default coroutine pool capacity.
/// May reuses completed coroutine stacks from this pool to avoid allocations.
/// Default of 1000 is often too small for spawn-heavy workloads.
const DEFAULT_POOL_CAPACITY: usize = 10000;

/// Initialize the scheduler.
///
/// # Safety
/// Safe to call multiple times (idempotent via Once).
/// Configures May coroutines with appropriate stack size for LLVM-generated code.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_init() {
    SCHEDULER_INIT.call_once(|| {
        // Configure stack size for coroutines
        // Default is 128KB, reduced from 1MB for better spawn performance.
        // Can be overridden via SEQ_STACK_SIZE environment variable (in bytes)
        // Example: SEQ_STACK_SIZE=2097152 for 2MB
        // Invalid values (non-numeric, zero) are warned and ignored.
        let stack_size = parse_stack_size(std::env::var("SEQ_STACK_SIZE").ok());

        // Configure coroutine pool capacity
        // May reuses coroutine stacks from this pool to reduce allocation overhead.
        // Default 10000 is 10x May's default (1000), better for spawn-heavy workloads.
        // Can be overridden via SEQ_POOL_CAPACITY environment variable.
        // (Unlike SEQ_STACK_SIZE, invalid or zero values fall back silently here.)
        let pool_capacity = std::env::var("SEQ_POOL_CAPACITY")
            .ok()
            .and_then(|s| s.parse().ok())
            .filter(|&v| v > 0)
            .unwrap_or(DEFAULT_POOL_CAPACITY);

        may::config()
            .set_stack_size(stack_size)
            .set_pool_capacity(pool_capacity);

        // Record scheduler start time (for at-exit reporting)
        SCHEDULER_START_TIME.get_or_init(Instant::now);

        // Install SIGINT handler for Ctrl-C (unconditional - basic expected behavior)
        // Without this, tight loops won't respond to Ctrl-C because signals
        // are only delivered at syscall boundaries, and TCO loops may never syscall.
        #[cfg(unix)]
        {
            use std::sync::atomic::{AtomicBool, Ordering};
            static SIGINT_RECEIVED: AtomicBool = AtomicBool::new(false);

            extern "C" fn sigint_handler(_: libc::c_int) {
                // If we receive SIGINT twice, force exit (user is insistent)
                if SIGINT_RECEIVED.swap(true, Ordering::SeqCst) {
                    // Second SIGINT - exit immediately
                    // _exit is async-signal-safe; skips atexit handlers by design.
                    unsafe { libc::_exit(130) }; // 128 + 2 (SIGINT)
                }
                // First SIGINT - exit cleanly
                // NOTE(review): std::process::exit runs atexit handlers and is
                // not async-signal-safe; presumably a deliberate trade-off for
                // a clean first-Ctrl-C exit - confirm this cannot deadlock if
                // the signal lands while allocator locks are held.
                std::process::exit(130);
            }

            unsafe {
                // Cast the handler fn pointer to the integer type libc::signal expects.
                libc::signal(
                    libc::SIGINT,
                    sigint_handler as *const () as libc::sighandler_t,
                );
            }
        }

        // Install SIGQUIT handler for runtime diagnostics (kill -3)
        #[cfg(feature = "diagnostics")]
        crate::diagnostics::install_signal_handler();

        // Install watchdog timer (if enabled via SEQ_WATCHDOG_SECS)
        #[cfg(feature = "diagnostics")]
        crate::watchdog::install_watchdog();
    });
}
353
354/// Run the scheduler and wait for all coroutines to complete
355///
356/// # Safety
357/// Returns the final stack (always null for now since May handles all scheduling).
358/// This function blocks until all spawned strands have completed.
359///
360/// Uses a condition variable for event-driven shutdown synchronization rather than
361/// polling. The mutex is only held during the wait protocol, not during strand
362/// execution, so there's no contention on the hot path.
363#[unsafe(no_mangle)]
364pub unsafe extern "C" fn patch_seq_scheduler_run() -> Stack {
365 let mut guard = SHUTDOWN_MUTEX.lock().expect(
366 "scheduler_run: shutdown mutex poisoned - strand panicked during shutdown synchronization",
367 );
368
369 // Wait for all strands to complete
370 // The condition variable will be notified when the last strand exits
371 while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
372 guard = SHUTDOWN_CONDVAR
373 .wait(guard)
374 .expect("scheduler_run: condvar wait failed - strand panicked during shutdown wait");
375 }
376
377 // All strands have completed
378 std::ptr::null_mut()
379}
380
/// Shutdown the scheduler
///
/// # Safety
/// Safe to call. May doesn't require explicit shutdown, so this is a no-op.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_shutdown() {
    // Intentionally empty: May doesn't require explicit shutdown.
    // This function exists for API symmetry with init.
}
390
/// Spawn a strand (coroutine) with initial stack
///
/// # Safety
/// - `entry` must be a valid function pointer that can safely execute on any thread
/// - `initial_stack` must be either null or a valid pointer to a `StackValue` that:
///   - Was heap-allocated (e.g., via Box)
///   - Has a 'static lifetime or lives longer than the coroutine
///   - Is safe to access from the spawned thread
/// - The caller transfers ownership of `initial_stack` to the coroutine
/// - Returns a unique strand ID (positive integer)
///
/// # Memory Management
/// The spawned coroutine takes ownership of `initial_stack` and will automatically
/// free the final stack returned by `entry` upon completion.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_strand_spawn(
    entry: extern "C" fn(Stack) -> Stack,
    initial_stack: Stack,
) -> i64 {
    // For backwards compatibility, use null base (won't support nested spawns)
    // With a null STACK_BASE the child cannot clone its stack, so operations
    // that require it must go through patch_seq_strand_spawn_with_base instead.
    unsafe { patch_seq_strand_spawn_with_base(entry, initial_stack, std::ptr::null_mut()) }
}
413
/// Spawn a strand (coroutine) with initial stack and explicit stack base
///
/// This variant allows setting the STACK_BASE for the spawned strand, which is
/// required for the child to perform operations like clone_stack (nested spawn).
///
/// # Safety
/// - `entry` must be a valid function pointer that can safely execute on any thread
/// - `initial_stack` must be a valid pointer to a `StackValue` array
/// - `stack_base` must be the base of the stack (or null to skip setting STACK_BASE)
/// - The caller transfers ownership of `initial_stack` to the coroutine
/// - Returns a unique strand ID (positive integer)
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_strand_spawn_with_base(
    entry: extern "C" fn(Stack) -> Stack,
    initial_stack: Stack,
    stack_base: Stack,
) -> i64 {
    // Generate unique strand ID
    // Relaxed suffices: IDs only need uniqueness, not ordering with other ops.
    let strand_id = NEXT_STRAND_ID.fetch_add(1, Ordering::Relaxed);

    // Increment active strand counter and track total spawned
    let new_count = ACTIVE_STRANDS.fetch_add(1, Ordering::Release) + 1;
    TOTAL_SPAWNED.fetch_add(1, Ordering::Relaxed);

    // Update peak strands if this is a new high-water mark
    // Uses a CAS loop to safely update the maximum without locks
    // Uses Acquire/Release ordering for proper synchronization with diagnostics reads
    let mut peak = PEAK_STRANDS.load(Ordering::Acquire);
    while new_count > peak {
        match PEAK_STRANDS.compare_exchange_weak(
            peak,
            new_count,
            Ordering::Release,
            Ordering::Relaxed,
        ) {
            // Success: we published the new peak. compare_exchange_weak may also
            // fail spuriously, which the Err arm handles by retrying.
            Ok(_) => break,
            Err(current) => peak = current,
        }
    }

    // Register strand in the registry (for diagnostics visibility)
    // If registry is full, strand still runs but isn't tracked
    #[cfg(feature = "diagnostics")]
    let _ = strand_registry().register(strand_id);

    // Function pointers are already Send, no wrapper needed
    let entry_fn = entry;

    // Convert pointers to usize (which is Send)
    // This is necessary because *mut T is !Send, but the caller guarantees thread safety
    let stack_addr = initial_stack as usize;
    let base_addr = stack_base as usize;

    unsafe {
        coroutine::spawn(move || {
            // Reconstruct pointers from the Send-able addresses captured above
            let stack_ptr = stack_addr as *mut StackValue;
            let base_ptr = base_addr as *mut StackValue;

            // Debug assertion: validate stack pointer alignment and reasonable address
            debug_assert!(
                stack_ptr.is_null()
                    || stack_addr.is_multiple_of(std::mem::align_of::<StackValue>()),
                "Stack pointer must be null or properly aligned"
            );
            debug_assert!(
                stack_ptr.is_null() || stack_addr > 0x1000,
                "Stack pointer appears to be in invalid memory region (< 0x1000)"
            );

            // Set STACK_BASE for this strand if provided
            // This enables nested spawns and other operations that need clone_stack
            if !base_ptr.is_null() {
                crate::stack::patch_seq_set_stack_base(base_ptr);
            }

            // Execute the entry function
            // NOTE(review): if `entry_fn` panics, the bookkeeping below
            // (unregister/decrement/notify) never runs, so ACTIVE_STRANDS stays
            // elevated and wait_all_strands would block forever. Consider a
            // drop guard if strand bodies can panic.
            let final_stack = entry_fn(stack_ptr);

            // Clean up the final stack to prevent memory leak
            free_stack(final_stack);

            // Unregister strand from registry (uses captured strand_id)
            #[cfg(feature = "diagnostics")]
            strand_registry().unregister(strand_id);

            // Decrement active strand counter first, then track completion
            // This ordering ensures the invariant SPAWNED = COMPLETED + ACTIVE + lost
            // is never violated from an external observer's perspective
            // Use AcqRel to establish proper synchronization (both acquire and release barriers)
            let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);

            // Track completion after decrementing active count
            TOTAL_COMPLETED.fetch_add(1, Ordering::Release);
            if prev_count == 1 {
                // We were the last strand - acquire mutex and signal shutdown
                // The mutex must be held when calling notify to prevent missed wakeups
                let _guard = SHUTDOWN_MUTEX.lock()
                    .expect("strand_spawn: shutdown mutex poisoned - strand panicked during shutdown notification");
                SHUTDOWN_CONDVAR.notify_all();
            }
        });
    }

    strand_id as i64
}
520
/// Free a stack allocated by the runtime
///
/// With the tagged stack implementation, stack cleanup is handled differently.
/// The contiguous array is freed when the TaggedStack is dropped.
/// This function just resets the thread-local arena.
///
/// # Safety
/// Stack pointer must be valid or null.
fn free_stack(_stack: Stack) {
    // With tagged stack, the array is freed when TaggedStack is dropped.
    // We just need to reset the arena for thread-local strings.

    // Reset the thread-local arena to free all arena-allocated strings
    // This is safe because:
    // - Any arena strings in Values have been dropped above
    // - Global strings are unaffected (they have their own allocations)
    // - Channel sends clone to global, so no cross-strand arena pointers
    // NOTE(review): "dropped above" refers to the callers having consumed the
    // stack before this point; nothing in this function drops Values itself -
    // confirm all call sites uphold that before relying on the reset.
    crate::arena::arena_reset();
}
540
/// Legacy spawn_strand function (kept for compatibility)
///
/// Fire-and-forget variant: the strand ID returned by `patch_seq_strand_spawn`
/// is discarded and the strand starts with a null initial stack.
///
/// # Safety
/// `entry` must be a valid function pointer that can safely execute on any thread.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_spawn_strand(entry: extern "C" fn(Stack) -> Stack) {
    unsafe {
        patch_seq_strand_spawn(entry, std::ptr::null_mut());
    }
}
551
/// Yield execution to allow other coroutines to run
///
/// Passes the stack through unchanged; yielding does not touch strand state.
///
/// # Safety
/// Always safe to call from within a May coroutine.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_yield_strand(stack: Stack) -> Stack {
    coroutine::yield_now();
    stack
}
561
562// =============================================================================
563// Cooperative Yield Safety Valve
564// =============================================================================
565//
566// Prevents tight TCO loops from starving other strands and making the process
567// unresponsive. When enabled via SEQ_YIELD_INTERVAL, yields after N tail calls.
568//
569// Configuration:
570// SEQ_YIELD_INTERVAL=10000 - Yield every 10,000 tail calls (default: 0 = disabled)
571//
572// Scope:
573// - Covers: User-defined word tail calls (musttail) and quotation tail calls
574// - Does NOT cover: Closure calls (they use regular calls, bounded by stack)
575// - Does NOT cover: Non-tail recursive calls (bounded by stack)
576// This is intentional: the safety valve targets unbounded TCO loops.
577//
578// Design:
579// - Zero overhead when disabled (threshold=0 short-circuits immediately)
580// - Thread-local counter avoids synchronization overhead
581// - Called before every musttail in generated code
582// - Threshold is cached on first access via OnceLock
583//
584// Thread-Local Counter Behavior:
585// The counter is per-OS-thread, not per-coroutine. Multiple coroutines on the
586// same OS thread share the counter, which may cause yields slightly more
587// frequently than the configured interval. This is intentional:
588// - Avoids coroutine-local storage overhead
589// - Still achieves the goal of preventing starvation
590// - Actual yield frequency is still bounded by the threshold
591
592use std::cell::Cell;
593use std::sync::OnceLock;
594
/// Cached yield interval threshold (0 = disabled)
/// Parsed from SEQ_YIELD_INTERVAL once, on first call to get_yield_threshold().
static YIELD_THRESHOLD: OnceLock<u64> = OnceLock::new();

thread_local! {
    /// Per-thread tail call counter
    ///
    /// Per OS thread, not per coroutine - coroutines sharing a thread share
    /// this counter (see the design notes above for why that is acceptable).
    static TAIL_CALL_COUNTER: Cell<u64> = const { Cell::new(0) };
}
602
603/// Get the yield threshold from environment (cached)
604///
605/// Returns 0 (disabled) if SEQ_YIELD_INTERVAL is not set or invalid.
606/// Prints a warning to stderr if the value is set but invalid.
607fn get_yield_threshold() -> u64 {
608 *YIELD_THRESHOLD.get_or_init(|| {
609 match std::env::var("SEQ_YIELD_INTERVAL") {
610 Ok(s) if s.is_empty() => 0,
611 Ok(s) => match s.parse::<u64>() {
612 Ok(n) => n,
613 Err(_) => {
614 eprintln!(
615 "Warning: SEQ_YIELD_INTERVAL='{}' is not a valid positive integer, yield safety valve disabled",
616 s
617 );
618 0
619 }
620 },
621 Err(_) => 0,
622 }
623 })
624}
625
626/// Maybe yield to other coroutines based on tail call count
627///
628/// Called before every tail call in generated code. When SEQ_YIELD_INTERVAL
629/// is set, yields after that many tail calls to prevent starvation.
630///
631/// # Performance
632/// - Disabled (default): Single branch on cached threshold (< 1ns)
633/// - Enabled: Increment + compare + occasional yield (~10-20ns average)
634///
635/// # Safety
636/// Always safe to call. No-op when not in a May coroutine context.
637#[unsafe(no_mangle)]
638pub extern "C" fn patch_seq_maybe_yield() {
639 let threshold = get_yield_threshold();
640
641 // Fast path: disabled
642 if threshold == 0 {
643 return;
644 }
645
646 TAIL_CALL_COUNTER.with(|counter| {
647 let count = counter.get().wrapping_add(1);
648 counter.set(count);
649
650 if count >= threshold {
651 counter.set(0);
652 coroutine::yield_now();
653 }
654 });
655}
656
657/// Wait for all strands to complete
658///
659/// # Safety
660/// Always safe to call. Blocks until all spawned strands have completed.
661///
662/// Uses event-driven synchronization via condition variable - no polling overhead.
663#[unsafe(no_mangle)]
664pub unsafe extern "C" fn patch_seq_wait_all_strands() {
665 let mut guard = SHUTDOWN_MUTEX.lock()
666 .expect("wait_all_strands: shutdown mutex poisoned - strand panicked during shutdown synchronization");
667
668 // Wait for all strands to complete
669 // The condition variable will be notified when the last strand exits
670 while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
671 guard = SHUTDOWN_CONDVAR
672 .wait(guard)
673 .expect("wait_all_strands: condvar wait failed - strand panicked during shutdown wait");
674 }
675}
676
677// Public re-exports with short names for internal use
678pub use patch_seq_maybe_yield as maybe_yield;
679pub use patch_seq_scheduler_init as scheduler_init;
680pub use patch_seq_scheduler_run as scheduler_run;
681pub use patch_seq_scheduler_shutdown as scheduler_shutdown;
682pub use patch_seq_spawn_strand as spawn_strand;
683pub use patch_seq_strand_spawn as strand_spawn;
684pub use patch_seq_wait_all_strands as wait_all_strands;
685pub use patch_seq_yield_strand as yield_strand;
686
687#[cfg(test)]
688mod tests {
689 use super::*;
690 use crate::stack::push;
691 use crate::value::Value;
692 use std::sync::atomic::{AtomicU32, Ordering};
693
694 #[test]
695 fn test_spawn_strand() {
696 unsafe {
697 static COUNTER: AtomicU32 = AtomicU32::new(0);
698
699 extern "C" fn test_entry(_stack: Stack) -> Stack {
700 COUNTER.fetch_add(1, Ordering::SeqCst);
701 std::ptr::null_mut()
702 }
703
704 for _ in 0..100 {
705 spawn_strand(test_entry);
706 }
707
708 std::thread::sleep(std::time::Duration::from_millis(200));
709 assert_eq!(COUNTER.load(Ordering::SeqCst), 100);
710 }
711 }
712
713 #[test]
714 fn test_scheduler_init_idempotent() {
715 unsafe {
716 // Should be safe to call multiple times
717 scheduler_init();
718 scheduler_init();
719 scheduler_init();
720 }
721 }
722
723 #[test]
724 fn test_free_stack_null() {
725 // Freeing null should be a no-op
726 free_stack(std::ptr::null_mut());
727 }
728
729 #[test]
730 fn test_free_stack_valid() {
731 unsafe {
732 // Create a stack, then free it
733 let stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
734 free_stack(stack);
735 // If we get here without crashing, test passed
736 }
737 }
738
739 #[test]
740 fn test_strand_spawn_with_stack() {
741 unsafe {
742 static COUNTER: AtomicU32 = AtomicU32::new(0);
743
744 extern "C" fn test_entry(stack: Stack) -> Stack {
745 COUNTER.fetch_add(1, Ordering::SeqCst);
746 // Return the stack as-is (caller will free it)
747 stack
748 }
749
750 let initial_stack = push(crate::stack::alloc_test_stack(), Value::Int(99));
751 strand_spawn(test_entry, initial_stack);
752
753 std::thread::sleep(std::time::Duration::from_millis(200));
754 assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
755 }
756 }
757
758 #[test]
759 fn test_scheduler_shutdown() {
760 unsafe {
761 scheduler_init();
762 scheduler_shutdown();
763 // Should not crash
764 }
765 }
766
767 #[test]
768 fn test_many_strands_stress() {
769 unsafe {
770 static COUNTER: AtomicU32 = AtomicU32::new(0);
771
772 extern "C" fn increment(_stack: Stack) -> Stack {
773 COUNTER.fetch_add(1, Ordering::SeqCst);
774 std::ptr::null_mut()
775 }
776
777 // Reset counter for this test
778 COUNTER.store(0, Ordering::SeqCst);
779
780 // Spawn many strands to stress test synchronization
781 for _ in 0..1000 {
782 strand_spawn(increment, std::ptr::null_mut());
783 }
784
785 // Wait for all to complete
786 wait_all_strands();
787
788 // Verify all strands executed
789 assert_eq!(COUNTER.load(Ordering::SeqCst), 1000);
790 }
791 }
792
793 #[test]
794 fn test_strand_ids_are_unique() {
795 unsafe {
796 use std::collections::HashSet;
797
798 extern "C" fn noop(_stack: Stack) -> Stack {
799 std::ptr::null_mut()
800 }
801
802 // Spawn strands and collect their IDs
803 let mut ids = Vec::new();
804 for _ in 0..100 {
805 let id = strand_spawn(noop, std::ptr::null_mut());
806 ids.push(id);
807 }
808
809 // Wait for completion
810 wait_all_strands();
811
812 // Verify all IDs are unique
813 let unique_ids: HashSet<_> = ids.iter().collect();
814 assert_eq!(unique_ids.len(), 100, "All strand IDs should be unique");
815
816 // Verify all IDs are positive
817 assert!(
818 ids.iter().all(|&id| id > 0),
819 "All strand IDs should be positive"
820 );
821 }
822 }
823
824 #[test]
825 fn test_arena_reset_with_strands() {
826 unsafe {
827 use crate::arena;
828 use crate::seqstring::arena_string;
829
830 extern "C" fn create_temp_strings(stack: Stack) -> Stack {
831 // Create many temporary arena strings (simulating request parsing)
832 for i in 0..100 {
833 let temp = arena_string(&format!("temporary string {}", i));
834 // Use the string temporarily
835 assert!(!temp.as_str().is_empty());
836 // String is dropped, but memory stays in arena
837 }
838
839 // Arena should have allocated memory
840 let stats = arena::arena_stats();
841 assert!(stats.allocated_bytes > 0, "Arena should have allocations");
842
843 stack // Return empty stack
844 }
845
846 // Reset arena before test
847 arena::arena_reset();
848
849 // Spawn strand that creates many temp strings
850 strand_spawn(create_temp_strings, std::ptr::null_mut());
851
852 // Wait for strand to complete (which calls free_stack -> arena_reset)
853 wait_all_strands();
854
855 // After strand exits, arena should be reset
856 let stats_after = arena::arena_stats();
857 assert_eq!(
858 stats_after.allocated_bytes, 0,
859 "Arena should be reset after strand exits"
860 );
861 }
862 }
863
    #[test]
    fn test_arena_with_channel_send() {
        // End-to-end check that an arena-allocated string survives a channel
        // hop: the sender builds the string in its strand-local arena, `send`
        // clones it out, and the receiver observes the full payload.
        //
        // The channel is smuggled into the two `extern "C"` strand entry
        // points through a static AtomicI64, because the strand ABI only
        // accepts a bare stack pointer — closures can't be used here.
        unsafe {
            use crate::channel::{close_channel, make_channel, receive, send};
            use crate::stack::{pop, push};
            use crate::value::Value;
            use std::sync::Arc;
            use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};

            // Incremented by the receiver once the expected message arrives.
            static RECEIVED_COUNT: AtomicU32 = AtomicU32::new(0);
            // Raw `Arc<ChannelData>` pointer shared with both strand fns.
            static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);

            // Create channel
            let stack = crate::stack::alloc_test_stack();
            let stack = make_channel(stack);
            let (stack, chan_val) = pop(stack);
            let channel = match chan_val {
                Value::Channel(ch) => ch,
                _ => panic!("Expected Channel"),
            };

            // Store channel pointer for strands.
            // `Arc::as_ptr` does NOT consume or bump the refcount; the
            // strands reconstruct an Arc from this raw pointer below.
            let ch_ptr = Arc::as_ptr(&channel) as i64;
            CHANNEL_PTR.store(ch_ptr, Ordering::Release);

            // Keep Arc alive: leak one strong count per strand so the
            // allocation cannot be freed while either strand still holds the
            // raw pointer. (Deliberately leaked for the test's lifetime.)
            std::mem::forget(channel.clone());
            std::mem::forget(channel.clone());

            // Sender strand: creates arena string, sends through channel
            extern "C" fn sender(_stack: Stack) -> Stack {
                use crate::seqstring::arena_string;
                use crate::value::ChannelData;
                use std::sync::Arc;

                unsafe {
                    // SAFETY: CHANNEL_PTR holds a pointer obtained from
                    // Arc::as_ptr, kept alive by the forgotten clones above.
                    // We immediately clone and forget the reconstructed Arc
                    // so this strand never decrements the strong count.
                    let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
                    let channel = Arc::from_raw(ch_ptr);
                    let channel_clone = Arc::clone(&channel);
                    std::mem::forget(channel); // Don't drop

                    // Create arena string (lives in this strand's arena)
                    let msg = arena_string("Hello from sender!");

                    // Push string and channel for send
                    let stack = push(crate::stack::alloc_test_stack(), Value::String(msg));
                    let stack = push(stack, Value::Channel(channel_clone));

                    // Send (will clone to global)
                    send(stack)
                }
            }

            // Receiver strand: receives string from channel
            extern "C" fn receiver(_stack: Stack) -> Stack {
                use crate::value::ChannelData;
                use std::sync::Arc;
                use std::sync::atomic::Ordering;

                unsafe {
                    // SAFETY: same reconstruct-clone-forget dance as in
                    // `sender`; the strong count held by the forgotten
                    // clones keeps the pointed-to ChannelData alive.
                    let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
                    let channel = Arc::from_raw(ch_ptr);
                    let channel_clone = Arc::clone(&channel);
                    std::mem::forget(channel); // Don't drop

                    // Push channel for receive
                    let stack = push(
                        crate::stack::alloc_test_stack(),
                        Value::Channel(channel_clone),
                    );

                    // Receive message (returns value, success_flag)
                    let stack = receive(stack);

                    // Pop success flag first, then message
                    let (stack, _success) = pop(stack);
                    let (_stack, msg_val) = pop(stack);
                    match msg_val {
                        Value::String(s) => {
                            // Payload must be intact after crossing strands.
                            assert_eq!(s.as_str(), "Hello from sender!");
                            RECEIVED_COUNT.fetch_add(1, Ordering::SeqCst);
                        }
                        _ => panic!("Expected String"),
                    }

                    std::ptr::null_mut()
                }
            }

            // Spawn sender and receiver
            spawn_strand(sender);
            spawn_strand(receiver);

            // Wait for both strands
            wait_all_strands();

            // Verify message was received exactly once
            assert_eq!(
                RECEIVED_COUNT.load(Ordering::SeqCst),
                1,
                "Receiver should have received message"
            );

            // Clean up channel (consumes the original Arc held by this test)
            let stack = push(stack, Value::Channel(channel));
            close_channel(stack);
        }
    }
972
973 #[test]
974 fn test_no_memory_leak_over_many_iterations() {
975 // PR #11 feedback: Verify 10K+ strand iterations don't cause memory growth
976 unsafe {
977 use crate::arena;
978 use crate::seqstring::arena_string;
979
980 extern "C" fn allocate_strings_and_exit(stack: Stack) -> Stack {
981 // Simulate request processing: many temp allocations
982 for i in 0..50 {
983 let temp = arena_string(&format!("request header {}", i));
984 assert!(!temp.as_str().is_empty());
985 // Strings dropped here but arena memory stays allocated
986 }
987 stack
988 }
989
990 // Run many iterations to detect leaks
991 let iterations = 10_000;
992
993 for i in 0..iterations {
994 // Reset arena before each iteration to start fresh
995 arena::arena_reset();
996
997 // Spawn strand, let it allocate strings, then exit
998 strand_spawn(allocate_strings_and_exit, std::ptr::null_mut());
999
1000 // Wait for completion (triggers arena reset)
1001 wait_all_strands();
1002
1003 // Every 1000 iterations, verify arena is actually reset
1004 if i % 1000 == 0 {
1005 let stats = arena::arena_stats();
1006 assert_eq!(
1007 stats.allocated_bytes, 0,
1008 "Arena not reset after iteration {} (leaked {} bytes)",
1009 i, stats.allocated_bytes
1010 );
1011 }
1012 }
1013
1014 // Final verification: arena should be empty
1015 let final_stats = arena::arena_stats();
1016 assert_eq!(
1017 final_stats.allocated_bytes, 0,
1018 "Arena leaked memory after {} iterations ({} bytes)",
1019 iterations, final_stats.allocated_bytes
1020 );
1021
1022 println!(
1023 "✓ Memory leak test passed: {} iterations with no growth",
1024 iterations
1025 );
1026 }
1027 }
1028
1029 #[test]
1030 fn test_parse_stack_size_valid() {
1031 assert_eq!(parse_stack_size(Some("2097152".to_string())), 2097152);
1032 assert_eq!(parse_stack_size(Some("1".to_string())), 1);
1033 assert_eq!(parse_stack_size(Some("999999999".to_string())), 999999999);
1034 }
1035
1036 #[test]
1037 fn test_parse_stack_size_none() {
1038 assert_eq!(parse_stack_size(None), DEFAULT_STACK_SIZE);
1039 }
1040
1041 #[test]
1042 fn test_parse_stack_size_zero() {
1043 // Zero should fall back to default (with warning printed to stderr)
1044 assert_eq!(parse_stack_size(Some("0".to_string())), DEFAULT_STACK_SIZE);
1045 }
1046
1047 #[test]
1048 fn test_parse_stack_size_invalid() {
1049 // Non-numeric should fall back to default (with warning printed to stderr)
1050 assert_eq!(
1051 parse_stack_size(Some("invalid".to_string())),
1052 DEFAULT_STACK_SIZE
1053 );
1054 assert_eq!(
1055 parse_stack_size(Some("-100".to_string())),
1056 DEFAULT_STACK_SIZE
1057 );
1058 assert_eq!(parse_stack_size(Some("".to_string())), DEFAULT_STACK_SIZE);
1059 assert_eq!(
1060 parse_stack_size(Some("1.5".to_string())),
1061 DEFAULT_STACK_SIZE
1062 );
1063 }
1064
1065 #[test]
1066 #[cfg(feature = "diagnostics")]
1067 fn test_strand_registry_basic() {
1068 let registry = StrandRegistry::new(10);
1069
1070 // Register some strands
1071 assert_eq!(registry.register(1), Some(0)); // First slot
1072 assert_eq!(registry.register(2), Some(1)); // Second slot
1073 assert_eq!(registry.register(3), Some(2)); // Third slot
1074
1075 // Verify active strands
1076 let active: Vec<_> = registry.active_strands().collect();
1077 assert_eq!(active.len(), 3);
1078
1079 // Unregister one
1080 assert!(registry.unregister(2));
1081 let active: Vec<_> = registry.active_strands().collect();
1082 assert_eq!(active.len(), 2);
1083
1084 // Unregister non-existent should return false
1085 assert!(!registry.unregister(999));
1086 }
1087
1088 #[test]
1089 #[cfg(feature = "diagnostics")]
1090 fn test_strand_registry_overflow() {
1091 let registry = StrandRegistry::new(3); // Small capacity
1092
1093 // Fill it up
1094 assert!(registry.register(1).is_some());
1095 assert!(registry.register(2).is_some());
1096 assert!(registry.register(3).is_some());
1097
1098 // Next should overflow
1099 assert!(registry.register(4).is_none());
1100 assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 1);
1101
1102 // Another overflow
1103 assert!(registry.register(5).is_none());
1104 assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 2);
1105 }
1106
1107 #[test]
1108 #[cfg(feature = "diagnostics")]
1109 fn test_strand_registry_slot_reuse() {
1110 let registry = StrandRegistry::new(3);
1111
1112 // Fill it up
1113 registry.register(1);
1114 registry.register(2);
1115 registry.register(3);
1116
1117 // Unregister middle one
1118 registry.unregister(2);
1119
1120 // New registration should reuse the slot
1121 assert!(registry.register(4).is_some());
1122 assert_eq!(registry.active_strands().count(), 3);
1123 }
1124
1125 #[test]
1126 #[cfg(feature = "diagnostics")]
1127 fn test_strand_registry_concurrent_stress() {
1128 use std::sync::Arc;
1129 use std::thread;
1130
1131 let registry = Arc::new(StrandRegistry::new(50)); // Moderate capacity
1132
1133 let handles: Vec<_> = (0..100)
1134 .map(|i| {
1135 let reg = Arc::clone(®istry);
1136 thread::spawn(move || {
1137 let id = (i + 1) as u64;
1138 // Register
1139 let _ = reg.register(id);
1140 // Brief work
1141 thread::yield_now();
1142 // Unregister
1143 reg.unregister(id);
1144 })
1145 })
1146 .collect();
1147
1148 for h in handles {
1149 h.join().unwrap();
1150 }
1151
1152 // All slots should be free after all threads complete
1153 assert_eq!(registry.active_strands().count(), 0);
1154 }
1155
1156 #[test]
1157 fn test_strand_lifecycle_counters() {
1158 unsafe {
1159 // Reset counters for isolation (not perfect but helps)
1160 let initial_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
1161 let initial_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);
1162
1163 static COUNTER: AtomicU32 = AtomicU32::new(0);
1164
1165 extern "C" fn simple_work(_stack: Stack) -> Stack {
1166 COUNTER.fetch_add(1, Ordering::SeqCst);
1167 std::ptr::null_mut()
1168 }
1169
1170 COUNTER.store(0, Ordering::SeqCst);
1171
1172 // Spawn some strands
1173 for _ in 0..10 {
1174 strand_spawn(simple_work, std::ptr::null_mut());
1175 }
1176
1177 wait_all_strands();
1178
1179 // Verify counters incremented
1180 let final_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
1181 let final_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);
1182
1183 assert!(
1184 final_spawned >= initial_spawned + 10,
1185 "TOTAL_SPAWNED should have increased by at least 10"
1186 );
1187 assert!(
1188 final_completed >= initial_completed + 10,
1189 "TOTAL_COMPLETED should have increased by at least 10"
1190 );
1191 assert_eq!(COUNTER.load(Ordering::SeqCst), 10);
1192 }
1193 }
1194
1195 // =========================================================================
1196 // Yield Safety Valve Tests
1197 // =========================================================================
1198
1199 #[test]
1200 fn test_maybe_yield_disabled_by_default() {
1201 // When SEQ_YIELD_INTERVAL is not set (or 0), maybe_yield should be a no-op
1202 // This test verifies it doesn't panic and returns quickly
1203 for _ in 0..1000 {
1204 patch_seq_maybe_yield();
1205 }
1206 }
1207
1208 #[test]
1209 fn test_tail_call_counter_increments() {
1210 // Verify the thread-local counter increments correctly
1211 TAIL_CALL_COUNTER.with(|counter| {
1212 let initial = counter.get();
1213 patch_seq_maybe_yield();
1214 patch_seq_maybe_yield();
1215 patch_seq_maybe_yield();
1216 // Counter should have incremented (if threshold > 0) or stayed same (if disabled)
1217 // Either way, it shouldn't panic
1218 let _ = counter.get();
1219 // Reset to avoid affecting other tests
1220 counter.set(initial);
1221 });
1222 }
1223
1224 #[test]
1225 fn test_counter_overflow_safety() {
1226 // Verify wrapping_add prevents overflow panic
1227 TAIL_CALL_COUNTER.with(|counter| {
1228 let initial = counter.get();
1229 // Set counter near max to test overflow behavior
1230 counter.set(u64::MAX - 1);
1231 // These calls should not panic due to overflow
1232 patch_seq_maybe_yield();
1233 patch_seq_maybe_yield();
1234 patch_seq_maybe_yield();
1235 // Reset
1236 counter.set(initial);
1237 });
1238 }
1239}