seq_runtime/
scheduler.rs

//! Scheduler - Green Thread Management with May
//!
//! CSP-style concurrency for Seq using May coroutines.
//! Each strand is a lightweight green thread that can communicate via channels.
//!
//! ## Non-Blocking Guarantee
//!
//! Channel operations (`send`, `receive`) use May's cooperative blocking and NEVER
//! block OS threads. However, I/O operations (`write_line`, `read_line` in io.rs)
//! currently use blocking syscalls. Future work will make all I/O non-blocking.
//!
//! ## Panic Behavior
//!
//! Functions panic on invalid input (null stacks, negative IDs, closed channels).
//! In a production system, consider implementing error channels or Result-based
//! error handling instead of panicking.

use crate::pool;
use crate::stack::{Stack, StackNode};
use may::coroutine;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex, Once};

static SCHEDULER_INIT: Once = Once::new();

// Strand lifecycle tracking
//
// Design rationale:
// - ACTIVE_STRANDS: Lock-free atomic counter for the hot path (spawn/complete)
//   Every strand increments on spawn, decrements on complete. This is extremely
//   fast (lock-free atomic ops) and suitable for high-frequency operations.
//
// - SHUTDOWN_CONDVAR/MUTEX: Event-driven synchronization for the cold path (shutdown wait)
//   Used only when waiting for all strands to complete (program shutdown).
//   Condvar provides event-driven wakeup instead of polling, which is critical
//   for a systems language - no CPU waste, proper OS-level blocking.
//
// Why not track JoinHandles?
// Strands are like Erlang processes - potentially hundreds of thousands of concurrent
// entities with independent lifecycles. Storing handles would require global mutable
// state with synchronization overhead on the hot path. The counter + condvar approach
// keeps the hot path lock-free while providing proper shutdown synchronization.
pub static ACTIVE_STRANDS: AtomicUsize = AtomicUsize::new(0);
static SHUTDOWN_CONDVAR: Condvar = Condvar::new();
static SHUTDOWN_MUTEX: Mutex<()> = Mutex::new(());

// Strand lifecycle statistics (for diagnostics)
//
// These counters provide observability into strand lifecycle without any locking.
// All operations are lock-free atomic increments/loads.
//
// - TOTAL_SPAWNED: Monotonically increasing count of all strands ever spawned
// - TOTAL_COMPLETED: Monotonically increasing count of all strands that completed
// - PEAK_STRANDS: High-water mark of concurrent strands (helps detect strand leaks)
//
// Useful diagnostics:
// - Currently running: ACTIVE_STRANDS
// - Completed successfully: TOTAL_COMPLETED
// - Potential leaks: TOTAL_SPAWNED - TOTAL_COMPLETED - ACTIVE_STRANDS > 0 (strands lost)
// - Peak concurrency: PEAK_STRANDS
pub static TOTAL_SPAWNED: AtomicU64 = AtomicU64::new(0);
pub static TOTAL_COMPLETED: AtomicU64 = AtomicU64::new(0);
pub static PEAK_STRANDS: AtomicUsize = AtomicUsize::new(0);
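
// Illustrative sketch (not part of the runtime's exported API): one way a diagnostics
// caller could snapshot these counters and evaluate the leak formula above. The function
// name and the returned tuple shape are assumptions for the example.
#[allow(dead_code)]
fn strand_counter_snapshot() -> (u64, u64, usize, usize) {
    let spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
    let completed = TOTAL_COMPLETED.load(Ordering::Relaxed);
    let active = ACTIVE_STRANDS.load(Ordering::Acquire);
    let peak = PEAK_STRANDS.load(Ordering::Acquire);
    // A healthy runtime satisfies: spawned == completed + active (no strands lost)
    (spawned, completed, active, peak)
}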

// Unique strand ID generation
static NEXT_STRAND_ID: AtomicU64 = AtomicU64::new(1);

// =============================================================================
// Lock-Free Strand Registry
// =============================================================================
//
// A fixed-size array of slots for tracking active strands without locks.
// Each slot stores a strand ID (0 = free) and spawn timestamp.
//
// Design principles:
// - Fixed size: No dynamic allocation, predictable memory footprint
// - Lock-free: All operations use atomic CAS, no mutex contention
// - Bounded: If registry is full, strands still run but aren't tracked
// - Zero cost when not querying: Only diagnostics reads the registry
//
// Slot encoding:
// - strand_id == 0: slot is free
// - strand_id > 0: slot contains an active strand
//
// The registry size can be configured via SEQ_STRAND_REGISTRY_SIZE env var.
// Default is 1024 slots, which is sufficient for most applications.

/// Default strand registry size (number of trackable concurrent strands)
const DEFAULT_REGISTRY_SIZE: usize = 1024;

/// A slot in the strand registry
///
/// Uses two atomics to store strand info without locks.
/// A slot is free when strand_id == 0.
pub struct StrandSlot {
    /// Strand ID (0 = free, >0 = active strand)
    pub strand_id: AtomicU64,
    /// Spawn timestamp (seconds since UNIX epoch, for detecting stuck strands)
    pub spawn_time: AtomicU64,
}

impl StrandSlot {
    const fn new() -> Self {
        Self {
            strand_id: AtomicU64::new(0),
            spawn_time: AtomicU64::new(0),
        }
    }
}

/// Lock-free strand registry
///
/// Provides O(n) registration (scan for free slot) and O(n) unregistration.
/// This is acceptable because:
/// 1. N is bounded (default 1024)
/// 2. Registration/unregistration are infrequent compared to strand work
/// 3. No locks means no contention, just atomic ops
pub struct StrandRegistry {
    slots: Box<[StrandSlot]>,
    /// Number of registration attempts that failed because the registry was full
    pub overflow_count: AtomicU64,
}

impl StrandRegistry {
    /// Create a new registry with the given capacity
    fn new(capacity: usize) -> Self {
        let mut slots = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            slots.push(StrandSlot::new());
        }
        Self {
            slots: slots.into_boxed_slice(),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Register a strand, returning the slot index if successful
    ///
    /// Uses CAS to atomically claim a free slot.
    /// Returns None if the registry is full (strand still runs, just not tracked).
    pub fn register(&self, strand_id: u64) -> Option<usize> {
        let spawn_time = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);

        // Scan for a free slot
        for (idx, slot) in self.slots.iter().enumerate() {
            // Skip slots that already look occupied. Without this check we would
            // overwrite the owner's spawn_time below and make a long-running strand
            // look freshly spawned.
            if slot.strand_id.load(Ordering::Relaxed) != 0 {
                continue;
            }

            // Set spawn time first, before claiming the slot.
            // This prevents a race where a reader sees strand_id != 0 but spawn_time == 0.
            // If two registrants race past the check above, both timestamps were taken at
            // roughly the same moment, so the surviving value is still approximately correct.
            slot.spawn_time.store(spawn_time, Ordering::Relaxed);

            // Try to claim this slot (CAS from 0 to strand_id)
            // AcqRel ensures the spawn_time write above is visible before strand_id becomes non-zero
            if slot
                .strand_id
                .compare_exchange(0, strand_id, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                return Some(idx);
            }
        }

        // Registry full - track overflow but strand still runs
        self.overflow_count.fetch_add(1, Ordering::Relaxed);
        None
    }

    /// Unregister a strand by ID
    ///
    /// Scans for the slot containing this strand ID and clears it.
    /// Returns true if found and cleared, false if not found.
    ///
    /// Note: ABA problem is not a concern here because strand IDs are monotonically
    /// increasing u64 values. ID reuse would require 2^64 spawns, which is practically
    /// impossible (at 1 billion spawns/sec, it would take ~584 years).
    pub fn unregister(&self, strand_id: u64) -> bool {
        for slot in self.slots.iter() {
            // Check if this slot contains our strand
            if slot
                .strand_id
                .compare_exchange(strand_id, 0, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                // Successfully cleared the slot. spawn_time is intentionally left as-is:
                // readers ignore it once strand_id == 0, and register() always writes a
                // fresh spawn_time before re-publishing a slot. Zeroing it here could
                // race with, and clobber, a concurrent re-registration of this slot.
                return true;
            }
        }
        false
    }

    /// Iterate over active strands (for diagnostics)
    ///
    /// Returns an iterator of (strand_id, spawn_time) for non-empty slots.
    /// Note: This is a snapshot and may be slightly inconsistent due to concurrent updates.
    pub fn active_strands(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.slots.iter().filter_map(|slot| {
            // Acquire on strand_id synchronizes with the Release in register()
            let id = slot.strand_id.load(Ordering::Acquire);
            if id > 0 {
                // Relaxed is sufficient here - we've already synchronized via strand_id Acquire
                // and spawn_time is written before strand_id in register()
                let time = slot.spawn_time.load(Ordering::Relaxed);
                Some((id, time))
            } else {
                None
            }
        })
    }

    /// Get the registry capacity
    pub fn capacity(&self) -> usize {
        self.slots.len()
    }
}

// Global strand registry (lazy initialized)
static STRAND_REGISTRY: std::sync::OnceLock<StrandRegistry> = std::sync::OnceLock::new();

/// Get or initialize the global strand registry
pub fn strand_registry() -> &'static StrandRegistry {
    STRAND_REGISTRY.get_or_init(|| {
        let size = std::env::var("SEQ_STRAND_REGISTRY_SIZE")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(DEFAULT_REGISTRY_SIZE);
        StrandRegistry::new(size)
    })
}
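
// Illustrative sketch (not part of the runtime's exported API): how a diagnostics dump
// might walk the registry and flag long-running strands using the spawn timestamps.
// The function name and the 60-second threshold are assumptions for the example.
#[allow(dead_code)]
fn log_long_running_strands() {
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    for (strand_id, spawn_time) in strand_registry().active_strands() {
        let age_secs = now.saturating_sub(spawn_time);
        if age_secs > 60 {
            eprintln!("strand {} has been running for {}s", strand_id, age_secs);
        }
    }
}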

/// Default coroutine stack size: 128KB (0x20000 bytes)
/// Reduced from 1MB for better spawn performance (~16% faster in benchmarks).
/// Can be overridden via SEQ_STACK_SIZE environment variable.
const DEFAULT_STACK_SIZE: usize = 0x20000;

/// Parse stack size from an optional string value.
/// Returns the parsed size, or DEFAULT_STACK_SIZE if the value is missing, zero, or invalid.
/// Prints a warning to stderr for invalid values.
fn parse_stack_size(env_value: Option<String>) -> usize {
    match env_value {
        Some(val) => match val.parse::<usize>() {
            Ok(0) => {
                eprintln!(
                    "Warning: SEQ_STACK_SIZE=0 is invalid, using default {}",
                    DEFAULT_STACK_SIZE
                );
                DEFAULT_STACK_SIZE
            }
            Ok(size) => size,
            Err(_) => {
                eprintln!(
                    "Warning: SEQ_STACK_SIZE='{}' is not a valid number, using default {}",
                    val, DEFAULT_STACK_SIZE
                );
                DEFAULT_STACK_SIZE
            }
        },
        None => DEFAULT_STACK_SIZE,
    }
}

/// Initialize the scheduler
///
/// # Safety
/// Safe to call multiple times (idempotent via Once).
/// Configures May coroutines with appropriate stack size for LLVM-generated code.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_init() {
    SCHEDULER_INIT.call_once(|| {
        // Configure stack size for coroutines
        // The default is 128KB (DEFAULT_STACK_SIZE), reduced from 1MB for faster spawns.
        // May has an internal maximum (attempting 64MB causes an ExceedsMaximumSize panic).
        //
        // Can be overridden via the SEQ_STACK_SIZE environment variable (in bytes).
        // Example: SEQ_STACK_SIZE=2097152 for 2MB
        // Invalid values (non-numeric, zero) produce a warning and fall back to the default.
        let stack_size = parse_stack_size(std::env::var("SEQ_STACK_SIZE").ok());
        may::config().set_stack_size(stack_size);

        // Install SIGQUIT handler for runtime diagnostics (kill -3)
        crate::diagnostics::install_signal_handler();

        // Install watchdog timer (if enabled via SEQ_WATCHDOG_SECS)
        crate::watchdog::install_watchdog();
    });
}

/// Run the scheduler and wait for all coroutines to complete
///
/// # Safety
/// Returns the final stack (always null for now since May handles all scheduling).
/// This function blocks until all spawned strands have completed.
///
/// Uses a condition variable for event-driven shutdown synchronization rather than
/// polling. The mutex is only held during the wait protocol, not during strand
/// execution, so there's no contention on the hot path.
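///
/// # Example
///
/// One plausible call sequence from an embedding program, sketched for illustration
/// (`program_main` is a hypothetical generated entry point, not defined in this crate):
///
/// ```ignore
/// unsafe {
///     patch_seq_scheduler_init();
///     patch_seq_strand_spawn(program_main, std::ptr::null_mut());
///     let _final = patch_seq_scheduler_run(); // blocks until all strands complete
///     patch_seq_scheduler_shutdown();
/// }
/// ```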
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_run() -> Stack {
    let mut guard = SHUTDOWN_MUTEX.lock().expect(
        "scheduler_run: shutdown mutex poisoned - strand panicked during shutdown synchronization",
    );

    // Wait for all strands to complete
    // The condition variable will be notified when the last strand exits
    while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
        guard = SHUTDOWN_CONDVAR
            .wait(guard)
            .expect("scheduler_run: condvar wait failed - strand panicked during shutdown wait");
    }

    // All strands have completed
    std::ptr::null_mut()
}

/// Shutdown the scheduler
///
/// # Safety
/// Safe to call. May doesn't require explicit shutdown, so this is a no-op.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_shutdown() {
    // May doesn't require explicit shutdown
    // This function exists for API symmetry with init
}

/// Spawn a strand (coroutine) with initial stack
///
/// # Safety
/// - `entry` must be a valid function pointer that can safely execute on any thread
/// - `initial_stack` must be either null or a valid pointer to a `StackNode` that:
///   - Was heap-allocated (e.g., via Box)
///   - Has a 'static lifetime or lives longer than the coroutine
///   - Is safe to access from the spawned thread
/// - The caller transfers ownership of `initial_stack` to the coroutine
/// - Returns a unique strand ID (positive integer)
///
/// # Memory Management
/// The spawned coroutine takes ownership of `initial_stack` and will automatically
/// free the final stack returned by `entry` upon completion.
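///
/// # Example
///
/// A minimal, hedged sketch of calling this from Rust (the `noop` entry function here is
/// illustrative; real entry points are typically generated by the Seq compiler):
///
/// ```ignore
/// extern "C" fn noop(stack: Stack) -> Stack { stack }
///
/// let id = unsafe { patch_seq_strand_spawn(noop, std::ptr::null_mut()) };
/// assert!(id > 0);
/// ```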
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_strand_spawn(
    entry: extern "C" fn(Stack) -> Stack,
    initial_stack: Stack,
) -> i64 {
    // Generate unique strand ID
    let strand_id = NEXT_STRAND_ID.fetch_add(1, Ordering::Relaxed);

    // Increment active strand counter and track total spawned
    let new_count = ACTIVE_STRANDS.fetch_add(1, Ordering::Release) + 1;
    TOTAL_SPAWNED.fetch_add(1, Ordering::Relaxed);

    // Update peak strands if this is a new high-water mark
    // Uses a CAS loop to safely update the maximum without locks
    // Uses Acquire/Release ordering for proper synchronization with diagnostics reads
    let mut peak = PEAK_STRANDS.load(Ordering::Acquire);
    while new_count > peak {
        match PEAK_STRANDS.compare_exchange_weak(
            peak,
            new_count,
            Ordering::Release,
            Ordering::Relaxed,
        ) {
            Ok(_) => break,
            Err(current) => peak = current,
        }
    }

    // Register strand in the registry (for diagnostics visibility)
    // If registry is full, strand still runs but isn't tracked
    let _ = strand_registry().register(strand_id);

    // Function pointers are already Send, no wrapper needed
    let entry_fn = entry;

    // Convert pointer to usize (which is Send)
    // This is necessary because *mut T is !Send, but the caller guarantees thread safety
    let stack_addr = initial_stack as usize;

    unsafe {
        coroutine::spawn(move || {
            // Reconstruct pointer from address
            let stack_ptr = stack_addr as *mut StackNode;

            // Debug assertion: validate stack pointer alignment and reasonable address
            debug_assert!(
                stack_ptr.is_null() || stack_addr.is_multiple_of(std::mem::align_of::<StackNode>()),
                "Stack pointer must be null or properly aligned"
            );
            debug_assert!(
                stack_ptr.is_null() || stack_addr > 0x1000,
                "Stack pointer appears to be in invalid memory region (< 0x1000)"
            );

            // Execute the entry function
            let final_stack = entry_fn(stack_ptr);

            // Clean up the final stack to prevent memory leak
            free_stack(final_stack);

            // Unregister strand from registry (uses captured strand_id)
            strand_registry().unregister(strand_id);

            // Decrement the active strand counter first, then track completion.
            // This ordering ensures the diagnostic formula SPAWNED - COMPLETED - ACTIVE
            // never goes negative from an external observer's perspective (it may
            // transiently read as 1 while a strand is between these two updates).
            // Use AcqRel to establish proper synchronization (both acquire and release barriers)
            let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);

            // Track completion after decrementing active count
            TOTAL_COMPLETED.fetch_add(1, Ordering::Release);
            if prev_count == 1 {
                // We were the last strand - acquire mutex and signal shutdown
                // The mutex must be held when calling notify to prevent missed wakeups
                let _guard = SHUTDOWN_MUTEX.lock()
                    .expect("strand_spawn: shutdown mutex poisoned - strand panicked during shutdown notification");
                SHUTDOWN_CONDVAR.notify_all();
            }
        });
    }

    strand_id as i64
}

/// Free a stack allocated by the runtime
///
/// # Safety
/// - `stack` must be either:
///   - A null pointer (safe, will be a no-op)
///   - A valid pointer returned by runtime stack functions (push, etc.)
/// - The pointer must not have been previously freed
/// - After calling this function, the pointer is invalid and must not be used
/// - This function takes ownership and returns nodes to the pool
///
/// # Performance
/// Returns nodes to thread-local pool for reuse instead of freeing to heap
fn free_stack(mut stack: Stack) {
    if !stack.is_null() {
        use crate::value::Value;
        unsafe {
            // Walk the stack and return each node to the pool
            while !stack.is_null() {
                let next = (*stack).next;
                // Drop the value, then return node to pool
                // We need to drop the value to free any heap allocations (String, Variant)
                drop(std::mem::replace(&mut (*stack).value, Value::Int(0)));
                // Return node to pool for reuse
                pool::pool_free(stack);
                stack = next;
            }
        }
    }

    // Reset the thread-local arena to free all arena-allocated strings
    // This is safe because:
    // - Any arena strings in Values have been dropped above
    // - Global strings are unaffected (they have their own allocations)
    // - Channel sends clone to global, so no cross-strand arena pointers
    crate::arena::arena_reset();
}

/// Legacy spawn_strand function (kept for compatibility)
///
/// # Safety
/// `entry` must be a valid function pointer that can safely execute on any thread.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_spawn_strand(entry: extern "C" fn(Stack) -> Stack) {
    unsafe {
        patch_seq_strand_spawn(entry, std::ptr::null_mut());
    }
}

/// Yield execution to allow other coroutines to run
///
/// # Safety
/// Always safe to call from within a May coroutine.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_yield_strand(stack: Stack) -> Stack {
    coroutine::yield_now();
    stack
}

/// Wait for all strands to complete
///
/// # Safety
/// Always safe to call. Blocks until all spawned strands have completed.
///
/// Uses event-driven synchronization via condition variable - no polling overhead.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_wait_all_strands() {
    let mut guard = SHUTDOWN_MUTEX.lock()
        .expect("wait_all_strands: shutdown mutex poisoned - strand panicked during shutdown synchronization");

    // Wait for all strands to complete
    // The condition variable will be notified when the last strand exits
    while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
        guard = SHUTDOWN_CONDVAR
            .wait(guard)
            .expect("wait_all_strands: condvar wait failed - strand panicked during shutdown wait");
    }
}

// Public re-exports with short names for internal use
pub use patch_seq_scheduler_init as scheduler_init;
pub use patch_seq_scheduler_run as scheduler_run;
pub use patch_seq_scheduler_shutdown as scheduler_shutdown;
pub use patch_seq_spawn_strand as spawn_strand;
pub use patch_seq_strand_spawn as strand_spawn;
pub use patch_seq_wait_all_strands as wait_all_strands;
pub use patch_seq_yield_strand as yield_strand;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::stack::push;
    use crate::value::Value;
    use std::sync::atomic::{AtomicU32, Ordering};

    #[test]
    fn test_spawn_strand() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn test_entry(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            for _ in 0..100 {
                spawn_strand(test_entry);
            }

            std::thread::sleep(std::time::Duration::from_millis(200));
            assert_eq!(COUNTER.load(Ordering::SeqCst), 100);
        }
    }

    #[test]
    fn test_scheduler_init_idempotent() {
        unsafe {
            // Should be safe to call multiple times
            scheduler_init();
            scheduler_init();
            scheduler_init();
        }
    }

    #[test]
    fn test_free_stack_null() {
        // Freeing null should be a no-op
        free_stack(std::ptr::null_mut());
    }

    #[test]
    fn test_free_stack_valid() {
        unsafe {
            // Create a stack, then free it
            let stack = push(std::ptr::null_mut(), Value::Int(42));
            free_stack(stack);
            // If we get here without crashing, test passed
        }
    }

    #[test]
    fn test_strand_spawn_with_stack() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn test_entry(stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                // Return the stack as-is (caller will free it)
                stack
            }

            let initial_stack = push(std::ptr::null_mut(), Value::Int(99));
            strand_spawn(test_entry, initial_stack);

            std::thread::sleep(std::time::Duration::from_millis(200));
            assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
        }
    }

    #[test]
    fn test_scheduler_shutdown() {
        unsafe {
            scheduler_init();
            scheduler_shutdown();
            // Should not crash
        }
    }

    #[test]
    fn test_many_strands_stress() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn increment(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            // Reset counter for this test
            COUNTER.store(0, Ordering::SeqCst);

            // Spawn many strands to stress test synchronization
            for _ in 0..1000 {
                strand_spawn(increment, std::ptr::null_mut());
            }

            // Wait for all to complete
            wait_all_strands();

            // Verify all strands executed
            assert_eq!(COUNTER.load(Ordering::SeqCst), 1000);
        }
    }

    #[test]
    fn test_strand_ids_are_unique() {
        unsafe {
            use std::collections::HashSet;

            extern "C" fn noop(_stack: Stack) -> Stack {
                std::ptr::null_mut()
            }

            // Spawn strands and collect their IDs
            let mut ids = Vec::new();
            for _ in 0..100 {
                let id = strand_spawn(noop, std::ptr::null_mut());
                ids.push(id);
            }

            // Wait for completion
            wait_all_strands();

            // Verify all IDs are unique
            let unique_ids: HashSet<_> = ids.iter().collect();
            assert_eq!(unique_ids.len(), 100, "All strand IDs should be unique");

            // Verify all IDs are positive
            assert!(
                ids.iter().all(|&id| id > 0),
                "All strand IDs should be positive"
            );
        }
    }

    #[test]
    fn test_arena_reset_with_strands() {
        unsafe {
            use crate::arena;
            use crate::seqstring::arena_string;

            extern "C" fn create_temp_strings(stack: Stack) -> Stack {
                // Create many temporary arena strings (simulating request parsing)
                for i in 0..100 {
                    let temp = arena_string(&format!("temporary string {}", i));
                    // Use the string temporarily
                    assert!(!temp.as_str().is_empty());
                    // String is dropped, but memory stays in arena
                }

                // Arena should have allocated memory
                let stats = arena::arena_stats();
                assert!(stats.allocated_bytes > 0, "Arena should have allocations");

                stack // Return empty stack
            }

            // Reset arena before test
            arena::arena_reset();

            // Spawn strand that creates many temp strings
            strand_spawn(create_temp_strings, std::ptr::null_mut());

            // Wait for strand to complete (which calls free_stack -> arena_reset)
            wait_all_strands();

            // After strand exits, arena should be reset
            let stats_after = arena::arena_stats();
            assert_eq!(
                stats_after.allocated_bytes, 0,
                "Arena should be reset after strand exits"
            );
        }
    }

    #[test]
    fn test_arena_with_channel_send() {
        unsafe {
            use crate::channel::{close_channel, make_channel};
            use crate::stack::{pop, push};
            use crate::value::Value;
            use std::sync::atomic::{AtomicU32, Ordering};

            static RECEIVED_COUNT: AtomicU32 = AtomicU32::new(0);

            // Create channel
            let stack = std::ptr::null_mut();
            let stack = make_channel(stack);
            let (stack, chan_val) = pop(stack);
            let chan_id = match chan_val {
                Value::Int(id) => id,
                _ => panic!("Expected channel ID"),
            };

            // Sender strand: creates arena string, sends through channel
            extern "C" fn sender(stack: Stack) -> Stack {
                use crate::channel::send;
                use crate::seqstring::arena_string;
                use crate::stack::{pop, push};
                use crate::value::Value;

                unsafe {
                    // Extract channel ID from stack
                    let (stack, chan_val) = pop(stack);
                    let chan_id = match chan_val {
                        Value::Int(id) => id,
                        _ => panic!("Expected channel ID"),
                    };

                    // Create arena string
                    let msg = arena_string("Hello from sender!");

                    // Push string and channel ID for send
                    let stack = push(stack, Value::String(msg));
                    let stack = push(stack, Value::Int(chan_id));

                    // Send (will clone to global)
                    send(stack)
                }
            }

            // Receiver strand: receives string from channel
            extern "C" fn receiver(stack: Stack) -> Stack {
                use crate::channel::receive;
                use crate::stack::{pop, push};
                use crate::value::Value;
                use std::sync::atomic::Ordering;

                unsafe {
                    // Extract channel ID from stack
                    let (stack, chan_val) = pop(stack);
                    let chan_id = match chan_val {
                        Value::Int(id) => id,
                        _ => panic!("Expected channel ID"),
                    };

                    // Push channel ID for receive
                    let stack = push(stack, Value::Int(chan_id));

                    // Receive message
                    let stack = receive(stack);

                    // Pop and verify message
                    let (stack, msg_val) = pop(stack);
                    match msg_val {
                        Value::String(s) => {
                            assert_eq!(s.as_str(), "Hello from sender!");
                            RECEIVED_COUNT.fetch_add(1, Ordering::SeqCst);
                        }
                        _ => panic!("Expected String"),
                    }

                    stack
                }
            }

            // Spawn sender and receiver
            let sender_stack = push(std::ptr::null_mut(), Value::Int(chan_id));
            strand_spawn(sender, sender_stack);

            let receiver_stack = push(std::ptr::null_mut(), Value::Int(chan_id));
            strand_spawn(receiver, receiver_stack);

            // Wait for both strands
            wait_all_strands();

            // Verify message was received
            assert_eq!(
                RECEIVED_COUNT.load(Ordering::SeqCst),
                1,
                "Receiver should have received message"
            );

            // Clean up channel
            let stack = push(stack, Value::Int(chan_id));
            close_channel(stack);
        }
    }

    #[test]
    fn test_no_memory_leak_over_many_iterations() {
        // PR #11 feedback: Verify 10K+ strand iterations don't cause memory growth
        unsafe {
            use crate::arena;
            use crate::seqstring::arena_string;

            extern "C" fn allocate_strings_and_exit(stack: Stack) -> Stack {
                // Simulate request processing: many temp allocations
                for i in 0..50 {
                    let temp = arena_string(&format!("request header {}", i));
                    assert!(!temp.as_str().is_empty());
                    // Strings dropped here but arena memory stays allocated
                }
                stack
            }

            // Run many iterations to detect leaks
            let iterations = 10_000;

            for i in 0..iterations {
                // Reset arena before each iteration to start fresh
                arena::arena_reset();

                // Spawn strand, let it allocate strings, then exit
                strand_spawn(allocate_strings_and_exit, std::ptr::null_mut());

                // Wait for completion (triggers arena reset)
                wait_all_strands();

                // Every 1000 iterations, verify arena is actually reset
                if i % 1000 == 0 {
                    let stats = arena::arena_stats();
                    assert_eq!(
                        stats.allocated_bytes, 0,
                        "Arena not reset after iteration {} (leaked {} bytes)",
                        i, stats.allocated_bytes
                    );
                }
            }

            // Final verification: arena should be empty
            let final_stats = arena::arena_stats();
            assert_eq!(
                final_stats.allocated_bytes, 0,
                "Arena leaked memory after {} iterations ({} bytes)",
                iterations, final_stats.allocated_bytes
            );

            println!(
                "✓ Memory leak test passed: {} iterations with no growth",
                iterations
            );
        }
    }

    #[test]
    fn test_parse_stack_size_valid() {
        assert_eq!(parse_stack_size(Some("2097152".to_string())), 2097152);
        assert_eq!(parse_stack_size(Some("1".to_string())), 1);
        assert_eq!(parse_stack_size(Some("999999999".to_string())), 999999999);
    }

    #[test]
    fn test_parse_stack_size_none() {
        assert_eq!(parse_stack_size(None), DEFAULT_STACK_SIZE);
    }

    #[test]
    fn test_parse_stack_size_zero() {
        // Zero should fall back to default (with warning printed to stderr)
        assert_eq!(parse_stack_size(Some("0".to_string())), DEFAULT_STACK_SIZE);
    }

    #[test]
    fn test_parse_stack_size_invalid() {
        // Non-numeric should fall back to default (with warning printed to stderr)
        assert_eq!(
            parse_stack_size(Some("invalid".to_string())),
            DEFAULT_STACK_SIZE
        );
        assert_eq!(
            parse_stack_size(Some("-100".to_string())),
            DEFAULT_STACK_SIZE
        );
        assert_eq!(parse_stack_size(Some("".to_string())), DEFAULT_STACK_SIZE);
        assert_eq!(
            parse_stack_size(Some("1.5".to_string())),
            DEFAULT_STACK_SIZE
        );
    }

    #[test]
    fn test_strand_registry_basic() {
        let registry = StrandRegistry::new(10);

        // Register some strands
        assert_eq!(registry.register(1), Some(0)); // First slot
        assert_eq!(registry.register(2), Some(1)); // Second slot
        assert_eq!(registry.register(3), Some(2)); // Third slot

        // Verify active strands
        let active: Vec<_> = registry.active_strands().collect();
        assert_eq!(active.len(), 3);

        // Unregister one
        assert!(registry.unregister(2));
        let active: Vec<_> = registry.active_strands().collect();
        assert_eq!(active.len(), 2);

        // Unregister non-existent should return false
        assert!(!registry.unregister(999));
    }

    #[test]
    fn test_strand_registry_overflow() {
        let registry = StrandRegistry::new(3); // Small capacity

        // Fill it up
        assert!(registry.register(1).is_some());
        assert!(registry.register(2).is_some());
        assert!(registry.register(3).is_some());

        // Next should overflow
        assert!(registry.register(4).is_none());
        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 1);

        // Another overflow
        assert!(registry.register(5).is_none());
        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_strand_registry_slot_reuse() {
        let registry = StrandRegistry::new(3);

        // Fill it up
        registry.register(1);
        registry.register(2);
        registry.register(3);

        // Unregister middle one
        registry.unregister(2);

        // New registration should reuse the slot
        assert!(registry.register(4).is_some());
        assert_eq!(registry.active_strands().count(), 3);
    }

    #[test]
    fn test_strand_registry_concurrent_stress() {
        use std::sync::Arc;
        use std::thread;

        let registry = Arc::new(StrandRegistry::new(50)); // Moderate capacity

        let handles: Vec<_> = (0..100)
            .map(|i| {
                let reg = Arc::clone(&registry);
                thread::spawn(move || {
                    let id = (i + 1) as u64;
                    // Register
                    let _ = reg.register(id);
                    // Brief work
                    thread::yield_now();
                    // Unregister
                    reg.unregister(id);
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // All slots should be free after all threads complete
        assert_eq!(registry.active_strands().count(), 0);
    }

    #[test]
    fn test_strand_lifecycle_counters() {
        unsafe {
            // Snapshot counters for isolation (not perfect but helps)
            let initial_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
            let initial_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);

            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn simple_work(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            COUNTER.store(0, Ordering::SeqCst);

            // Spawn some strands
            for _ in 0..10 {
                strand_spawn(simple_work, std::ptr::null_mut());
            }

            wait_all_strands();

            // Verify counters incremented
            let final_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
            let final_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);

            assert!(
                final_spawned >= initial_spawned + 10,
                "TOTAL_SPAWNED should have increased by at least 10"
            );
            assert!(
                final_completed >= initial_completed + 10,
                "TOTAL_COMPLETED should have increased by at least 10"
            );
            assert_eq!(COUNTER.load(Ordering::SeqCst), 10);
        }
    }
}