// seq_runtime/scheduler.rs

//! Scheduler - Green Thread Management with May
//!
//! CSP-style concurrency for Seq using May coroutines.
//! Each strand is a lightweight green thread that can communicate via channels.
//!
//! ## Non-Blocking Guarantee
//!
//! Channel operations (`send`, `receive`) use May's cooperative blocking and NEVER
//! block OS threads. However, I/O operations (`write_line`, `read_line` in io.rs)
//! currently use blocking syscalls. Future work will make all I/O non-blocking.
//!
//! ## Panic Behavior
//!
//! Functions panic on invalid input (null stacks, negative IDs, closed channels).
//! In a production system, consider implementing error channels or Result-based
//! error handling instead of panicking.
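//!
//! ## Typical Lifecycle (illustrative sketch)
//!
//! A minimal sketch of how a host program or the Seq code generator might drive
//! this module through the exported C ABI. `entry` stands for any
//! `extern "C" fn(Stack) -> Stack` strand body; the real call sites live in
//! generated code, not in this file.
//!
//! ```ignore
//! unsafe {
//!     // Configure May (stack size, diagnostics, watchdog). Idempotent.
//!     patch_seq_scheduler_init();
//!     // Spawn a strand; ownership of the initial stack moves to the coroutine.
//!     let _strand_id = patch_seq_strand_spawn(entry, std::ptr::null_mut());
//!     // Block until every spawned strand has completed.
//!     patch_seq_scheduler_run();
//!     // No-op today; kept for API symmetry with init.
//!     patch_seq_scheduler_shutdown();
//! }
//! ```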

use crate::pool;
use crate::stack::{Stack, StackNode};
use may::coroutine;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex, Once};

static SCHEDULER_INIT: Once = Once::new();

// Strand lifecycle tracking
//
// Design rationale:
// - ACTIVE_STRANDS: Lock-free atomic counter for the hot path (spawn/complete)
//   Every strand increments on spawn, decrements on complete. This is extremely
//   fast (lock-free atomic ops) and suitable for high-frequency operations.
//
// - SHUTDOWN_CONDVAR/MUTEX: Event-driven synchronization for the cold path (shutdown wait)
//   Used only when waiting for all strands to complete (program shutdown).
//   Condvar provides event-driven wakeup instead of polling, which is critical
//   for a systems language - no CPU waste, proper OS-level blocking.
//
// Why not track JoinHandles?
// Strands are like Erlang processes - potentially hundreds of thousands of concurrent
// entities with independent lifecycles. Storing handles would require global mutable
// state with synchronization overhead on the hot path. The counter + condvar approach
// keeps the hot path lock-free while providing proper shutdown synchronization.
pub static ACTIVE_STRANDS: AtomicUsize = AtomicUsize::new(0);
static SHUTDOWN_CONDVAR: Condvar = Condvar::new();
static SHUTDOWN_MUTEX: Mutex<()> = Mutex::new(());

// Strand lifecycle statistics (for diagnostics)
//
// These counters provide observability into strand lifecycle without any locking.
// All operations are lock-free atomic increments/loads.
//
// - TOTAL_SPAWNED: Monotonically increasing count of all strands ever spawned
// - TOTAL_COMPLETED: Monotonically increasing count of all strands that completed
// - PEAK_STRANDS: High-water mark of concurrent strands (helps detect strand leaks)
//
// Useful diagnostics:
// - Currently running: ACTIVE_STRANDS
// - Completed successfully: TOTAL_COMPLETED
// - Potential leaks: TOTAL_SPAWNED - TOTAL_COMPLETED - ACTIVE_STRANDS > 0 (strands lost)
// - Peak concurrency: PEAK_STRANDS
pub static TOTAL_SPAWNED: AtomicU64 = AtomicU64::new(0);
pub static TOTAL_COMPLETED: AtomicU64 = AtomicU64::new(0);
pub static PEAK_STRANDS: AtomicUsize = AtomicUsize::new(0);
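
// Illustrative sketch (not used by the runtime): how a diagnostics report might
// combine these counters. saturating_sub guards against the transient skew that
// can occur while a strand is between its ACTIVE_STRANDS decrement and its
// TOTAL_COMPLETED increment (see strand_spawn below).
//
//     let spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
//     let completed = TOTAL_COMPLETED.load(Ordering::Relaxed);
//     let active = ACTIVE_STRANDS.load(Ordering::Relaxed) as u64;
//     let possibly_lost = spawned.saturating_sub(completed + active);
//     let peak = PEAK_STRANDS.load(Ordering::Relaxed);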

// Unique strand ID generation
static NEXT_STRAND_ID: AtomicU64 = AtomicU64::new(1);

// =============================================================================
// Lock-Free Strand Registry
// =============================================================================
//
// A fixed-size array of slots for tracking active strands without locks.
// Each slot stores a strand ID (0 = free) and spawn timestamp.
//
// Design principles:
// - Fixed size: No dynamic allocation, predictable memory footprint
// - Lock-free: All operations use atomic CAS, no mutex contention
// - Bounded: If registry is full, strands still run but aren't tracked
// - Zero cost when not querying: Only diagnostics reads the registry
//
// Slot encoding:
// - strand_id == 0: slot is free
// - strand_id > 0: slot contains an active strand
//
// The registry size can be configured via SEQ_STRAND_REGISTRY_SIZE env var.
// Default is 1024 slots, which is sufficient for most applications.

/// Default strand registry size (number of trackable concurrent strands)
const DEFAULT_REGISTRY_SIZE: usize = 1024;

/// A slot in the strand registry
///
/// Uses two atomics to store strand info without locks.
/// A slot is free when strand_id == 0.
pub struct StrandSlot {
    /// Strand ID (0 = free, >0 = active strand)
    pub strand_id: AtomicU64,
    /// Spawn timestamp (seconds since UNIX epoch, for detecting stuck strands)
    pub spawn_time: AtomicU64,
}

impl StrandSlot {
    const fn new() -> Self {
        Self {
            strand_id: AtomicU64::new(0),
            spawn_time: AtomicU64::new(0),
        }
    }
}

/// Lock-free strand registry
///
/// Provides O(n) registration (scan for free slot) and O(n) unregistration.
/// This is acceptable because:
/// 1. N is bounded (default 1024)
/// 2. Registration/unregistration are infrequent compared to strand work
/// 3. No locks means no contention, just atomic ops
pub struct StrandRegistry {
    slots: Box<[StrandSlot]>,
    /// Number of slots that couldn't be registered (registry full)
    pub overflow_count: AtomicU64,
}

impl StrandRegistry {
    /// Create a new registry with the given capacity
    fn new(capacity: usize) -> Self {
        let mut slots = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            slots.push(StrandSlot::new());
        }
        Self {
            slots: slots.into_boxed_slice(),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Register a strand, returning the slot index if successful
    ///
    /// Uses CAS to atomically claim a free slot.
    /// Returns None if the registry is full (strand still runs, just not tracked).
    pub fn register(&self, strand_id: u64) -> Option<usize> {
        let spawn_time = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);

        // Scan for a free slot
        for (idx, slot) in self.slots.iter().enumerate() {
            // Skip slots that already hold a strand so we never clobber the
            // spawn_time of an active strand while scanning past it.
            if slot.strand_id.load(Ordering::Relaxed) != 0 {
                continue;
            }

            // Set spawn time first, before claiming the slot.
            // This prevents a race where a reader sees strand_id != 0 but spawn_time == 0.
            // If another registrant claims this slot between our check and this store,
            // the timestamp it wrote comes from a concurrent register() call and is
            // effectively the same value, so the overwrite is benign.
            slot.spawn_time.store(spawn_time, Ordering::Relaxed);

            // Try to claim this slot (CAS from 0 to strand_id)
            // AcqRel ensures the spawn_time write above is visible before strand_id becomes non-zero
            if slot
                .strand_id
                .compare_exchange(0, strand_id, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                return Some(idx);
            }
        }

        // Registry full - track overflow but strand still runs
        self.overflow_count.fetch_add(1, Ordering::Relaxed);
        None
    }

    /// Unregister a strand by ID
    ///
    /// Scans for the slot containing this strand ID and clears it.
    /// Returns true if found and cleared, false if not found.
    ///
    /// Note: ABA problem is not a concern here because strand IDs are monotonically
    /// increasing u64 values. ID reuse would require 2^64 spawns, which is practically
    /// impossible (at 1 billion spawns/sec, it would take ~584 years).
    pub fn unregister(&self, strand_id: u64) -> bool {
        for slot in self.slots.iter() {
            // Check if this slot contains our strand
            if slot
                .strand_id
                .compare_exchange(strand_id, 0, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                // Successfully cleared the slot
                slot.spawn_time.store(0, Ordering::Release);
                return true;
            }
        }
        false
    }

    /// Iterate over active strands (for diagnostics)
    ///
    /// Returns an iterator of (strand_id, spawn_time) for non-empty slots.
    /// Note: This is a snapshot and may be slightly inconsistent due to concurrent updates.
    pub fn active_strands(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.slots.iter().filter_map(|slot| {
            // Acquire on strand_id synchronizes with the Release in register()
            let id = slot.strand_id.load(Ordering::Acquire);
            if id > 0 {
                // Relaxed is sufficient here - we've already synchronized via strand_id Acquire
                // and spawn_time is written before strand_id in register()
                let time = slot.spawn_time.load(Ordering::Relaxed);
                Some((id, time))
            } else {
                None
            }
        })
    }

    /// Get the registry capacity
    pub fn capacity(&self) -> usize {
        self.slots.len()
    }
}

// Global strand registry (lazy initialized)
static STRAND_REGISTRY: std::sync::OnceLock<StrandRegistry> = std::sync::OnceLock::new();

/// Get or initialize the global strand registry
pub fn strand_registry() -> &'static StrandRegistry {
    STRAND_REGISTRY.get_or_init(|| {
        let size = std::env::var("SEQ_STRAND_REGISTRY_SIZE")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(DEFAULT_REGISTRY_SIZE);
        StrandRegistry::new(size)
    })
}
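
// Illustrative sketch (hypothetical helper, not part of this module's API): how
// diagnostics code might enumerate tracked strands via the registry.
//
//     fn report_strands() {
//         let reg = strand_registry();
//         for (id, spawn_time) in reg.active_strands() {
//             eprintln!("strand {} spawned at unix time {}", id, spawn_time);
//         }
//         eprintln!("capacity: {}", reg.capacity());
//         eprintln!("untracked (overflow): {}", reg.overflow_count.load(Ordering::Relaxed));
//     }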

/// Default coroutine stack size: 1MB (0x100000 bytes)
/// Can be overridden via SEQ_STACK_SIZE environment variable
const DEFAULT_STACK_SIZE: usize = 0x100000;

/// Parse stack size from an optional string value.
/// Returns the parsed size, or DEFAULT_STACK_SIZE if the value is missing, zero, or invalid.
/// Prints a warning to stderr for invalid values.
fn parse_stack_size(env_value: Option<String>) -> usize {
    match env_value {
        Some(val) => match val.parse::<usize>() {
            Ok(0) => {
                eprintln!(
                    "Warning: SEQ_STACK_SIZE=0 is invalid, using default {}",
                    DEFAULT_STACK_SIZE
                );
                DEFAULT_STACK_SIZE
            }
            Ok(size) => size,
            Err(_) => {
                eprintln!(
                    "Warning: SEQ_STACK_SIZE='{}' is not a valid number, using default {}",
                    val, DEFAULT_STACK_SIZE
                );
                DEFAULT_STACK_SIZE
            }
        },
        None => DEFAULT_STACK_SIZE,
    }
}
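
// For example (mirroring the unit tests below): parse_stack_size(Some("2097152".to_string()))
// yields 2_097_152, while None, "0", or a non-numeric value falls back to
// DEFAULT_STACK_SIZE (1 MiB).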

/// Initialize the scheduler
///
/// # Safety
/// Safe to call multiple times (idempotent via Once).
/// Configures May coroutines with appropriate stack size for LLVM-generated code.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_init() {
    SCHEDULER_INIT.call_once(|| {
        // Configure stack size for coroutines.
        // The 1MB default balances stack-overflow safety against May's maximum limit
        // (May has an internal maximum; attempting 64MB causes an ExceedsMaximumSize panic).
        //
        // Can be overridden via the SEQ_STACK_SIZE environment variable (in bytes).
        // Example: SEQ_STACK_SIZE=2097152 for 2MB
        // Invalid values (non-numeric, zero) trigger a warning and are ignored.
        let stack_size = parse_stack_size(std::env::var("SEQ_STACK_SIZE").ok());
        may::config().set_stack_size(stack_size);

        // Install SIGQUIT handler for runtime diagnostics (kill -3)
        crate::diagnostics::install_signal_handler();

        // Install watchdog timer (if enabled via SEQ_WATCHDOG_SECS)
        crate::watchdog::install_watchdog();
    });
}

/// Run the scheduler and wait for all coroutines to complete
///
/// # Safety
/// Returns the final stack (always null for now since May handles all scheduling).
/// This function blocks until all spawned strands have completed.
///
/// Uses a condition variable for event-driven shutdown synchronization rather than
/// polling. The mutex is only held during the wait protocol, not during strand
/// execution, so there's no contention on the hot path.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_run() -> Stack {
    let mut guard = SHUTDOWN_MUTEX.lock().expect(
        "scheduler_run: shutdown mutex poisoned - strand panicked during shutdown synchronization",
    );

    // Wait for all strands to complete
    // The condition variable will be notified when the last strand exits
    while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
        guard = SHUTDOWN_CONDVAR
            .wait(guard)
            .expect("scheduler_run: condvar wait failed - strand panicked during shutdown wait");
    }

    // All strands have completed
    std::ptr::null_mut()
}

/// Shutdown the scheduler
///
/// # Safety
/// Safe to call. May doesn't require explicit shutdown, so this is a no-op.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_shutdown() {
    // May doesn't require explicit shutdown
    // This function exists for API symmetry with init
}

/// Spawn a strand (coroutine) with initial stack
///
/// # Safety
/// - `entry` must be a valid function pointer that can safely execute on any thread
/// - `initial_stack` must be either null or a valid pointer to a `StackNode` that:
///   - Was heap-allocated (e.g., via Box)
///   - Has a 'static lifetime or lives longer than the coroutine
///   - Is safe to access from the spawned thread
/// - The caller transfers ownership of `initial_stack` to the coroutine
/// - Returns a unique strand ID (positive integer)
///
/// # Memory Management
/// The spawned coroutine takes ownership of `initial_stack` and will automatically
/// free the final stack returned by `entry` upon completion.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_strand_spawn(
    entry: extern "C" fn(Stack) -> Stack,
    initial_stack: Stack,
) -> i64 {
    // Generate unique strand ID
    let strand_id = NEXT_STRAND_ID.fetch_add(1, Ordering::Relaxed);

    // Increment active strand counter and track total spawned
    let new_count = ACTIVE_STRANDS.fetch_add(1, Ordering::Release) + 1;
    TOTAL_SPAWNED.fetch_add(1, Ordering::Relaxed);

    // Update peak strands if this is a new high-water mark
    // Uses a CAS loop to safely update the maximum without locks
    // Uses Acquire/Release ordering for proper synchronization with diagnostics reads
    let mut peak = PEAK_STRANDS.load(Ordering::Acquire);
    while new_count > peak {
        match PEAK_STRANDS.compare_exchange_weak(
            peak,
            new_count,
            Ordering::Release,
            Ordering::Relaxed,
        ) {
            Ok(_) => break,
            Err(current) => peak = current,
        }
    }

    // Register strand in the registry (for diagnostics visibility)
    // If registry is full, strand still runs but isn't tracked
    let _ = strand_registry().register(strand_id);

    // Function pointers are already Send, no wrapper needed
    let entry_fn = entry;

    // Convert pointer to usize (which is Send)
    // This is necessary because *mut T is !Send, but the caller guarantees thread safety
    let stack_addr = initial_stack as usize;

    unsafe {
        coroutine::spawn(move || {
            // Reconstruct pointer from address
            let stack_ptr = stack_addr as *mut StackNode;

            // Debug assertion: validate stack pointer alignment and reasonable address
            debug_assert!(
                stack_ptr.is_null() || stack_addr.is_multiple_of(std::mem::align_of::<StackNode>()),
                "Stack pointer must be null or properly aligned"
            );
            debug_assert!(
                stack_ptr.is_null() || stack_addr > 0x1000,
                "Stack pointer appears to be in invalid memory region (< 0x1000)"
            );

            // Execute the entry function
            let final_stack = entry_fn(stack_ptr);

            // Clean up the final stack to prevent memory leak
            free_stack(final_stack);

            // Unregister strand from registry (uses captured strand_id)
            strand_registry().unregister(strand_id);

            // Decrement active strand counter first, then track completion.
            // The counters are independent atomics, so an external observer may see a
            // transient skew (a strand briefly counted as neither active nor completed);
            // small discrepancies in the leak diagnostic should be treated as noise.
            // Use AcqRel to establish proper synchronization (both acquire and release barriers)
            let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);

            // Track completion after decrementing active count
            TOTAL_COMPLETED.fetch_add(1, Ordering::Release);
            if prev_count == 1 {
                // We were the last strand - acquire mutex and signal shutdown
                // The mutex must be held when calling notify to prevent missed wakeups
                let _guard = SHUTDOWN_MUTEX.lock()
                    .expect("strand_spawn: shutdown mutex poisoned - strand panicked during shutdown notification");
                SHUTDOWN_CONDVAR.notify_all();
            }
        });
    }

    strand_id as i64
}

/// Free a stack allocated by the runtime
///
/// # Safety
/// - `stack` must be either:
///   - A null pointer (safe, will be a no-op)
///   - A valid pointer returned by runtime stack functions (push, etc.)
/// - The pointer must not have been previously freed
/// - After calling this function, the pointer is invalid and must not be used
/// - This function takes ownership and returns nodes to the pool
///
/// # Performance
/// Returns nodes to thread-local pool for reuse instead of freeing to heap
fn free_stack(mut stack: Stack) {
    if !stack.is_null() {
        use crate::value::Value;
        unsafe {
            // Walk the stack and return each node to the pool
            while !stack.is_null() {
                let next = (*stack).next;
                // Drop the value, then return node to pool
                // We need to drop the value to free any heap allocations (String, Variant)
                drop(std::mem::replace(&mut (*stack).value, Value::Int(0)));
                // Return node to pool for reuse
                pool::pool_free(stack);
                stack = next;
            }
        }
    }

    // Reset the thread-local arena to free all arena-allocated strings
    // This is safe because:
    // - Any arena strings in Values have been dropped above
    // - Global strings are unaffected (they have their own allocations)
    // - Channel sends clone to global, so no cross-strand arena pointers
    crate::arena::arena_reset();
}

/// Legacy spawn_strand function (kept for compatibility)
///
/// # Safety
/// `entry` must be a valid function pointer that can safely execute on any thread.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_spawn_strand(entry: extern "C" fn(Stack) -> Stack) {
    unsafe {
        patch_seq_strand_spawn(entry, std::ptr::null_mut());
    }
}

/// Yield execution to allow other coroutines to run
///
/// # Safety
/// Always safe to call from within a May coroutine.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_yield_strand() {
    coroutine::yield_now();
}

/// Wait for all strands to complete
///
/// # Safety
/// Always safe to call. Blocks until all spawned strands have completed.
///
/// Uses event-driven synchronization via condition variable - no polling overhead.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_wait_all_strands() {
    let mut guard = SHUTDOWN_MUTEX.lock()
        .expect("wait_all_strands: shutdown mutex poisoned - strand panicked during shutdown synchronization");

    // Wait for all strands to complete
    // The condition variable will be notified when the last strand exits
    while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
        guard = SHUTDOWN_CONDVAR
            .wait(guard)
            .expect("wait_all_strands: condvar wait failed - strand panicked during shutdown wait");
    }
}

// Public re-exports with short names for internal use
pub use patch_seq_scheduler_init as scheduler_init;
pub use patch_seq_scheduler_run as scheduler_run;
pub use patch_seq_scheduler_shutdown as scheduler_shutdown;
pub use patch_seq_spawn_strand as spawn_strand;
pub use patch_seq_strand_spawn as strand_spawn;
pub use patch_seq_wait_all_strands as wait_all_strands;
pub use patch_seq_yield_strand as yield_strand;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::stack::push;
    use crate::value::Value;
    use std::sync::atomic::{AtomicU32, Ordering};

    #[test]
    fn test_spawn_strand() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn test_entry(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            for _ in 0..100 {
                spawn_strand(test_entry);
            }

            std::thread::sleep(std::time::Duration::from_millis(200));
            assert_eq!(COUNTER.load(Ordering::SeqCst), 100);
        }
    }

    #[test]
    fn test_scheduler_init_idempotent() {
        unsafe {
            // Should be safe to call multiple times
            scheduler_init();
            scheduler_init();
            scheduler_init();
        }
    }

    #[test]
    fn test_free_stack_null() {
        // Freeing null should be a no-op
        free_stack(std::ptr::null_mut());
    }

    #[test]
    fn test_free_stack_valid() {
        unsafe {
            // Create a stack, then free it
            let stack = push(std::ptr::null_mut(), Value::Int(42));
            free_stack(stack);
            // If we get here without crashing, test passed
        }
    }

    #[test]
    fn test_strand_spawn_with_stack() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn test_entry(stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                // Return the stack as-is (caller will free it)
                stack
            }

            let initial_stack = push(std::ptr::null_mut(), Value::Int(99));
            strand_spawn(test_entry, initial_stack);

            std::thread::sleep(std::time::Duration::from_millis(200));
            assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
        }
    }

    #[test]
    fn test_scheduler_shutdown() {
        unsafe {
            scheduler_init();
            scheduler_shutdown();
            // Should not crash
        }
    }

    #[test]
    fn test_many_strands_stress() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn increment(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            // Reset counter for this test
            COUNTER.store(0, Ordering::SeqCst);

            // Spawn many strands to stress test synchronization
            for _ in 0..1000 {
                strand_spawn(increment, std::ptr::null_mut());
            }

            // Wait for all to complete
            wait_all_strands();

            // Verify all strands executed
            assert_eq!(COUNTER.load(Ordering::SeqCst), 1000);
        }
    }

    #[test]
    fn test_strand_ids_are_unique() {
        unsafe {
            use std::collections::HashSet;

            extern "C" fn noop(_stack: Stack) -> Stack {
                std::ptr::null_mut()
            }

            // Spawn strands and collect their IDs
            let mut ids = Vec::new();
            for _ in 0..100 {
                let id = strand_spawn(noop, std::ptr::null_mut());
                ids.push(id);
            }

            // Wait for completion
            wait_all_strands();

            // Verify all IDs are unique
            let unique_ids: HashSet<_> = ids.iter().collect();
            assert_eq!(unique_ids.len(), 100, "All strand IDs should be unique");

            // Verify all IDs are positive
            assert!(
                ids.iter().all(|&id| id > 0),
                "All strand IDs should be positive"
            );
        }
    }

    #[test]
    fn test_arena_reset_with_strands() {
        unsafe {
            use crate::arena;
            use crate::seqstring::arena_string;

            extern "C" fn create_temp_strings(stack: Stack) -> Stack {
                // Create many temporary arena strings (simulating request parsing)
                for i in 0..100 {
                    let temp = arena_string(&format!("temporary string {}", i));
                    // Use the string temporarily
                    assert!(!temp.as_str().is_empty());
                    // String is dropped, but memory stays in arena
                }

                // Arena should have allocated memory
                let stats = arena::arena_stats();
                assert!(stats.allocated_bytes > 0, "Arena should have allocations");

                stack // Return empty stack
            }

            // Reset arena before test
            arena::arena_reset();

            // Spawn strand that creates many temp strings
            strand_spawn(create_temp_strings, std::ptr::null_mut());

            // Wait for strand to complete (which calls free_stack -> arena_reset)
            wait_all_strands();

            // After strand exits, arena should be reset
            let stats_after = arena::arena_stats();
            assert_eq!(
                stats_after.allocated_bytes, 0,
                "Arena should be reset after strand exits"
            );
        }
    }

    #[test]
    fn test_arena_with_channel_send() {
        unsafe {
            use crate::channel::{close_channel, make_channel};
            use crate::stack::{pop, push};
            use crate::value::Value;
            use std::sync::atomic::{AtomicU32, Ordering};

            static RECEIVED_COUNT: AtomicU32 = AtomicU32::new(0);

            // Create channel
            let stack = std::ptr::null_mut();
            let stack = make_channel(stack);
            let (stack, chan_val) = pop(stack);
            let chan_id = match chan_val {
                Value::Int(id) => id,
                _ => panic!("Expected channel ID"),
            };

            // Sender strand: creates arena string, sends through channel
            extern "C" fn sender(stack: Stack) -> Stack {
                use crate::channel::send;
                use crate::seqstring::arena_string;
                use crate::stack::{pop, push};
                use crate::value::Value;

                unsafe {
                    // Extract channel ID from stack
                    let (stack, chan_val) = pop(stack);
                    let chan_id = match chan_val {
                        Value::Int(id) => id,
                        _ => panic!("Expected channel ID"),
                    };

                    // Create arena string
                    let msg = arena_string("Hello from sender!");

                    // Push string and channel ID for send
                    let stack = push(stack, Value::String(msg));
                    let stack = push(stack, Value::Int(chan_id));

                    // Send (will clone to global)
                    send(stack)
                }
            }

            // Receiver strand: receives string from channel
            extern "C" fn receiver(stack: Stack) -> Stack {
                use crate::channel::receive;
                use crate::stack::{pop, push};
                use crate::value::Value;
                use std::sync::atomic::Ordering;

                unsafe {
                    // Extract channel ID from stack
                    let (stack, chan_val) = pop(stack);
                    let chan_id = match chan_val {
                        Value::Int(id) => id,
                        _ => panic!("Expected channel ID"),
                    };

                    // Push channel ID for receive
                    let stack = push(stack, Value::Int(chan_id));

                    // Receive message
                    let stack = receive(stack);

                    // Pop and verify message
                    let (stack, msg_val) = pop(stack);
                    match msg_val {
                        Value::String(s) => {
                            assert_eq!(s.as_str(), "Hello from sender!");
                            RECEIVED_COUNT.fetch_add(1, Ordering::SeqCst);
                        }
                        _ => panic!("Expected String"),
                    }

                    stack
                }
            }

            // Spawn sender and receiver
            let sender_stack = push(std::ptr::null_mut(), Value::Int(chan_id));
            strand_spawn(sender, sender_stack);

            let receiver_stack = push(std::ptr::null_mut(), Value::Int(chan_id));
            strand_spawn(receiver, receiver_stack);

            // Wait for both strands
            wait_all_strands();

            // Verify message was received
            assert_eq!(
                RECEIVED_COUNT.load(Ordering::SeqCst),
                1,
                "Receiver should have received message"
            );

            // Clean up channel
            let stack = push(stack, Value::Int(chan_id));
            close_channel(stack);
        }
    }

    #[test]
    fn test_no_memory_leak_over_many_iterations() {
        // PR #11 feedback: Verify 10K+ strand iterations don't cause memory growth
        unsafe {
            use crate::arena;
            use crate::seqstring::arena_string;

            extern "C" fn allocate_strings_and_exit(stack: Stack) -> Stack {
                // Simulate request processing: many temp allocations
                for i in 0..50 {
                    let temp = arena_string(&format!("request header {}", i));
                    assert!(!temp.as_str().is_empty());
                    // Strings dropped here but arena memory stays allocated
                }
                stack
            }

            // Run many iterations to detect leaks
            let iterations = 10_000;

            for i in 0..iterations {
                // Reset arena before each iteration to start fresh
                arena::arena_reset();

                // Spawn strand, let it allocate strings, then exit
                strand_spawn(allocate_strings_and_exit, std::ptr::null_mut());

                // Wait for completion (triggers arena reset)
                wait_all_strands();

                // Every 1000 iterations, verify arena is actually reset
                if i % 1000 == 0 {
                    let stats = arena::arena_stats();
                    assert_eq!(
                        stats.allocated_bytes, 0,
                        "Arena not reset after iteration {} (leaked {} bytes)",
                        i, stats.allocated_bytes
                    );
                }
            }

            // Final verification: arena should be empty
            let final_stats = arena::arena_stats();
            assert_eq!(
                final_stats.allocated_bytes, 0,
                "Arena leaked memory after {} iterations ({} bytes)",
                iterations, final_stats.allocated_bytes
            );

            println!(
                "✓ Memory leak test passed: {} iterations with no growth",
                iterations
            );
        }
    }

    #[test]
    fn test_parse_stack_size_valid() {
        assert_eq!(parse_stack_size(Some("2097152".to_string())), 2097152);
        assert_eq!(parse_stack_size(Some("1".to_string())), 1);
        assert_eq!(parse_stack_size(Some("999999999".to_string())), 999999999);
    }

    #[test]
    fn test_parse_stack_size_none() {
        assert_eq!(parse_stack_size(None), DEFAULT_STACK_SIZE);
    }

    #[test]
    fn test_parse_stack_size_zero() {
        // Zero should fall back to default (with warning printed to stderr)
        assert_eq!(parse_stack_size(Some("0".to_string())), DEFAULT_STACK_SIZE);
    }

    #[test]
    fn test_parse_stack_size_invalid() {
        // Non-numeric should fall back to default (with warning printed to stderr)
        assert_eq!(
            parse_stack_size(Some("invalid".to_string())),
            DEFAULT_STACK_SIZE
        );
        assert_eq!(
            parse_stack_size(Some("-100".to_string())),
            DEFAULT_STACK_SIZE
        );
        assert_eq!(parse_stack_size(Some("".to_string())), DEFAULT_STACK_SIZE);
        assert_eq!(
            parse_stack_size(Some("1.5".to_string())),
            DEFAULT_STACK_SIZE
        );
    }

    #[test]
    fn test_strand_registry_basic() {
        let registry = StrandRegistry::new(10);

        // Register some strands
        assert_eq!(registry.register(1), Some(0)); // First slot
        assert_eq!(registry.register(2), Some(1)); // Second slot
        assert_eq!(registry.register(3), Some(2)); // Third slot

        // Verify active strands
        let active: Vec<_> = registry.active_strands().collect();
        assert_eq!(active.len(), 3);

        // Unregister one
        assert!(registry.unregister(2));
        let active: Vec<_> = registry.active_strands().collect();
        assert_eq!(active.len(), 2);

        // Unregister non-existent should return false
        assert!(!registry.unregister(999));
    }

    #[test]
    fn test_strand_registry_overflow() {
        let registry = StrandRegistry::new(3); // Small capacity

        // Fill it up
        assert!(registry.register(1).is_some());
        assert!(registry.register(2).is_some());
        assert!(registry.register(3).is_some());

        // Next should overflow
        assert!(registry.register(4).is_none());
        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 1);

        // Another overflow
        assert!(registry.register(5).is_none());
        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_strand_registry_slot_reuse() {
        let registry = StrandRegistry::new(3);

        // Fill it up
        registry.register(1);
        registry.register(2);
        registry.register(3);

        // Unregister middle one
        registry.unregister(2);

        // New registration should reuse the slot
        assert!(registry.register(4).is_some());
        assert_eq!(registry.active_strands().count(), 3);
    }

    #[test]
    fn test_strand_registry_concurrent_stress() {
        use std::sync::Arc;
        use std::thread;

        let registry = Arc::new(StrandRegistry::new(50)); // Moderate capacity

        let handles: Vec<_> = (0..100)
            .map(|i| {
                let reg = Arc::clone(&registry);
                thread::spawn(move || {
                    let id = (i + 1) as u64;
                    // Register
                    let _ = reg.register(id);
                    // Brief work
                    thread::yield_now();
                    // Unregister
                    reg.unregister(id);
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // All slots should be free after all threads complete
        assert_eq!(registry.active_strands().count(), 0);
    }

    #[test]
    fn test_strand_lifecycle_counters() {
        unsafe {
            // Snapshot counters for isolation (not perfect, but avoids interference
            // from strands spawned by other tests)
            let initial_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
            let initial_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);

            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn simple_work(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            COUNTER.store(0, Ordering::SeqCst);

            // Spawn some strands
            for _ in 0..10 {
                strand_spawn(simple_work, std::ptr::null_mut());
            }

            wait_all_strands();

            // Verify counters incremented
            let final_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
            let final_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);

            assert!(
                final_spawned >= initial_spawned + 10,
                "TOTAL_SPAWNED should have increased by at least 10"
            );
            assert!(
                final_completed >= initial_completed + 10,
                "TOTAL_COMPLETED should have increased by at least 10"
            );
            assert_eq!(COUNTER.load(Ordering::SeqCst), 10);
        }
    }
}