seq_runtime/scheduler.rs
1//! Scheduler - Green Thread Management with May
2//!
3//! CSP-style concurrency for Seq using May coroutines.
4//! Each strand is a lightweight green thread that can communicate via channels.
5//!
6//! ## Non-Blocking Guarantee
7//!
8//! Channel operations (`send`, `receive`) use May's cooperative blocking and NEVER
9//! block OS threads. However, I/O operations (`write_line`, `read_line` in io.rs)
10//! currently use blocking syscalls. Future work will make all I/O non-blocking.
11//!
12//! ## Panic Behavior
13//!
14//! Functions panic on invalid input (null stacks, negative IDs, closed channels).
15//! In a production system, consider implementing error channels or Result-based
16//! error handling instead of panicking.
17
18use crate::pool;
19use crate::stack::{Stack, StackNode};
20use may::coroutine;
21use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22use std::sync::{Condvar, Mutex, Once};
23
/// Guards the one-time configuration performed by `patch_seq_scheduler_init`.
static SCHEDULER_INIT: Once = Once::new();

// Strand lifecycle tracking
//
// Design rationale:
// - ACTIVE_STRANDS: Lock-free atomic counter for the hot path (spawn/complete)
//   Every strand increments on spawn, decrements on complete. This is extremely
//   fast (lock-free atomic ops) and suitable for high-frequency operations.
//
// - SHUTDOWN_CONDVAR/MUTEX: Event-driven synchronization for the cold path (shutdown wait)
//   Used only when waiting for all strands to complete (program shutdown).
//   Condvar provides event-driven wakeup instead of polling, which is critical
//   for a systems language - no CPU waste, proper OS-level blocking.
//
// Why not track JoinHandles?
//   Strands are like Erlang processes - potentially hundreds of thousands of concurrent
//   entities with independent lifecycles. Storing handles would require global mutable
//   state with synchronization overhead on the hot path. The counter + condvar approach
//   keeps the hot path lock-free while providing proper shutdown synchronization.

/// Number of strands that have been spawned and have not yet finished.
pub static ACTIVE_STRANDS: AtomicUsize = AtomicUsize::new(0);
/// Notified by the strand that drops `ACTIVE_STRANDS` to zero, waking shutdown waiters.
static SHUTDOWN_CONDVAR: Condvar = Condvar::new();
/// Mutex paired with `SHUTDOWN_CONDVAR`; it protects no data of its own.
static SHUTDOWN_MUTEX: Mutex<()> = Mutex::new(());

// Strand lifecycle statistics (for diagnostics)
//
// These counters provide observability into strand lifecycle without any locking.
// All operations are lock-free atomic increments/loads.
//
// - TOTAL_SPAWNED: Monotonically increasing count of all strands ever spawned
// - TOTAL_COMPLETED: Monotonically increasing count of all strands that completed
// - PEAK_STRANDS: High-water mark of concurrent strands (helps detect strand leaks)
//
// Useful diagnostics:
// - Currently running: ACTIVE_STRANDS
// - Completed successfully: TOTAL_COMPLETED
// - Potential leaks: TOTAL_SPAWNED - TOTAL_COMPLETED - ACTIVE_STRANDS > 0 (strands lost)
// - Peak concurrency: PEAK_STRANDS

/// Total number of strands ever spawned (only ever incremented).
pub static TOTAL_SPAWNED: AtomicU64 = AtomicU64::new(0);
/// Total number of strands that ran to completion (only ever incremented).
pub static TOTAL_COMPLETED: AtomicU64 = AtomicU64::new(0);
/// Highest value `ACTIVE_STRANDS` has reached so far.
pub static PEAK_STRANDS: AtomicUsize = AtomicUsize::new(0);

// Unique strand ID generation
/// Next strand ID to hand out; starts at 1 so that 0 can mean "free slot" in the registry.
static NEXT_STRAND_ID: AtomicU64 = AtomicU64::new(1);
67
68// =============================================================================
69// Lock-Free Strand Registry
70// =============================================================================
71//
72// A fixed-size array of slots for tracking active strands without locks.
73// Each slot stores a strand ID (0 = free) and spawn timestamp.
74//
75// Design principles:
76// - Fixed size: No dynamic allocation, predictable memory footprint
77// - Lock-free: All operations use atomic CAS, no mutex contention
78// - Bounded: If registry is full, strands still run but aren't tracked
79// - Zero cost when not querying: Only diagnostics reads the registry
80//
81// Slot encoding:
82// - strand_id == 0: slot is free
83// - strand_id > 0: slot contains an active strand
84//
85// The registry size can be configured via SEQ_STRAND_REGISTRY_SIZE env var.
86// Default is 1024 slots, which is sufficient for most applications.
87
/// Default strand registry size (number of trackable concurrent strands).
///
/// Used by `strand_registry()` when `SEQ_STRAND_REGISTRY_SIZE` is unset or
/// does not parse as a `usize`.
const DEFAULT_REGISTRY_SIZE: usize = 1024;
90
/// A slot in the strand registry
///
/// Uses two atomics to store strand info without locks.
///
/// `strand_id` encoding:
/// - `0`: the slot is free
/// - `u64::MAX`: the slot is reserved by an in-progress `register()` call whose
///   `spawn_time` has not been published yet (readers treat it as inactive)
/// - any other value: the slot holds an active strand
pub struct StrandSlot {
    /// Strand ID (0 = free, `u64::MAX` = being registered, otherwise an active strand)
    pub strand_id: AtomicU64,
    /// Spawn timestamp (seconds since UNIX epoch, for detecting stuck strands).
    /// Only meaningful while `strand_id` holds an active strand ID.
    pub spawn_time: AtomicU64,
}

impl StrandSlot {
    /// Transient marker stored in `strand_id` while a registration is in flight.
    const RESERVED: u64 = u64::MAX;

    /// Create a free slot.
    const fn new() -> Self {
        Self {
            strand_id: AtomicU64::new(0),
            spawn_time: AtomicU64::new(0),
        }
    }
}

/// Lock-free strand registry
///
/// Provides O(n) registration (scan for free slot) and O(n) unregistration.
/// This is acceptable because:
/// 1. N is bounded (default 1024)
/// 2. Registration/unregistration are infrequent compared to strand work
/// 3. No locks means no contention, just atomic ops
pub struct StrandRegistry {
    slots: Box<[StrandSlot]>,
    /// Number of strands that couldn't be registered (registry full)
    pub overflow_count: AtomicU64,
}

impl StrandRegistry {
    /// Create a new registry with `capacity` free slots.
    fn new(capacity: usize) -> Self {
        Self {
            slots: (0..capacity).map(|_| StrandSlot::new()).collect(),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Register a strand, returning the slot index if successful
    ///
    /// Returns `None` if the registry is full (the strand still runs, it just
    /// isn't tracked); `overflow_count` is incremented in that case.
    ///
    /// The slot is claimed in two steps so that occupied slots are never written:
    /// 1. CAS `0 -> RESERVED` takes exclusive ownership of a free slot.
    /// 2. `spawn_time` is stored, then `strand_id` is published with Release.
    ///
    /// A reader that observes the published ID (with Acquire) is therefore
    /// guaranteed to observe the matching `spawn_time`.
    ///
    /// `strand_id` must not be `u64::MAX` (reserved as the in-flight marker).
    pub fn register(&self, strand_id: u64) -> Option<usize> {
        debug_assert_ne!(
            strand_id,
            StrandSlot::RESERVED,
            "u64::MAX is reserved as the in-flight registration marker"
        );

        let spawn_time = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);

        for (idx, slot) in self.slots.iter().enumerate() {
            // Reserve the slot first; a failed CAS means someone else owns it
            // and its spawn_time must be left untouched.
            if slot
                .strand_id
                .compare_exchange(0, StrandSlot::RESERVED, Ordering::Acquire, Ordering::Relaxed)
                .is_err()
            {
                continue;
            }

            // We own the slot exclusively now: write the timestamp, then publish.
            slot.spawn_time.store(spawn_time, Ordering::Relaxed);
            slot.strand_id.store(strand_id, Ordering::Release);
            return Some(idx);
        }

        // Registry full - track overflow but strand still runs
        self.overflow_count.fetch_add(1, Ordering::Relaxed);
        None
    }

    /// Unregister a strand by ID
    ///
    /// Scans for the slot containing this strand ID and frees it.
    /// Returns true if found and cleared, false if not found.
    ///
    /// `spawn_time` is deliberately NOT reset here: once the slot is free another
    /// thread may claim it immediately, and a late reset would wipe the new
    /// owner's timestamp. A stale value is never observed because `register()`
    /// rewrites `spawn_time` before publishing the next strand ID.
    ///
    /// Note: ABA problem is not a concern here because strand IDs are monotonically
    /// increasing u64 values. ID reuse would require 2^64 spawns, which is practically
    /// impossible (at 1 billion spawns/sec, it would take ~584 years).
    pub fn unregister(&self, strand_id: u64) -> bool {
        // Never let a caller release a slot that is mid-registration.
        if strand_id == StrandSlot::RESERVED {
            return false;
        }

        self.slots.iter().any(|slot| {
            slot.strand_id
                .compare_exchange(strand_id, 0, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
        })
    }

    /// Iterate over active strands (for diagnostics)
    ///
    /// Returns an iterator of (strand_id, spawn_time) for occupied slots; free and
    /// in-flight (reserved) slots are skipped.
    /// Note: This is a snapshot and may be slightly inconsistent due to concurrent updates
    /// (a slot can be recycled between the two loads).
    pub fn active_strands(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.slots.iter().filter_map(|slot| {
            // Acquire pairs with the Release store that publishes the ID in register()
            let id = slot.strand_id.load(Ordering::Acquire);
            if id == 0 || id == StrandSlot::RESERVED {
                return None;
            }
            // Relaxed is sufficient: spawn_time was written before the ID was published
            let time = slot.spawn_time.load(Ordering::Relaxed);
            Some((id, time))
        })
    }

    /// Get the registry capacity (total number of slots)
    pub fn capacity(&self) -> usize {
        self.slots.len()
    }
}
218
219// Global strand registry (lazy initialized)
220static STRAND_REGISTRY: std::sync::OnceLock<StrandRegistry> = std::sync::OnceLock::new();
221
222/// Get or initialize the global strand registry
223pub fn strand_registry() -> &'static StrandRegistry {
224 STRAND_REGISTRY.get_or_init(|| {
225 let size = std::env::var("SEQ_STRAND_REGISTRY_SIZE")
226 .ok()
227 .and_then(|s| s.parse().ok())
228 .unwrap_or(DEFAULT_REGISTRY_SIZE);
229 StrandRegistry::new(size)
230 })
231}
232
/// Default coroutine stack size: 128KB (0x20000 bytes)
/// Reduced from 1MB for better spawn performance (~16% faster in benchmarks).
/// Can be overridden via SEQ_STACK_SIZE environment variable.
const DEFAULT_STACK_SIZE: usize = 0x20000;

/// Turn the raw `SEQ_STACK_SIZE` value into a usable stack size.
///
/// - `None` (variable not set): `DEFAULT_STACK_SIZE`, silently.
/// - A positive integer: that value.
/// - Zero or anything that is not a valid `usize`: `DEFAULT_STACK_SIZE`,
///   after printing a warning to stderr.
fn parse_stack_size(env_value: Option<String>) -> usize {
    // Nothing configured: use the default without complaint
    let Some(raw) = env_value else {
        return DEFAULT_STACK_SIZE;
    };

    match raw.parse::<usize>() {
        Ok(requested) if requested != 0 => requested,
        Ok(_) => {
            eprintln!(
                "Warning: SEQ_STACK_SIZE=0 is invalid, using default {}",
                DEFAULT_STACK_SIZE
            );
            DEFAULT_STACK_SIZE
        }
        Err(_) => {
            eprintln!(
                "Warning: SEQ_STACK_SIZE='{}' is not a valid number, using default {}",
                raw, DEFAULT_STACK_SIZE
            );
            DEFAULT_STACK_SIZE
        }
    }
}
263
264/// Initialize the scheduler
265///
266/// # Safety
267/// Safe to call multiple times (idempotent via Once).
268/// Configures May coroutines with appropriate stack size for LLVM-generated code.
269#[unsafe(no_mangle)]
270pub unsafe extern "C" fn patch_seq_scheduler_init() {
271 SCHEDULER_INIT.call_once(|| {
272 // Configure stack size for coroutines
273 // Default is 1MB, which is balanced between safety and May's maximum limit
274 // May has internal maximum (attempting 64MB causes ExceedsMaximumSize panic)
275 //
276 // Can be overridden via SEQ_STACK_SIZE environment variable (in bytes)
277 // Example: SEQ_STACK_SIZE=2097152 for 2MB
278 // Invalid values (non-numeric, zero) are warned and ignored.
279 let stack_size = parse_stack_size(std::env::var("SEQ_STACK_SIZE").ok());
280 may::config().set_stack_size(stack_size);
281
282 // Install SIGQUIT handler for runtime diagnostics (kill -3)
283 crate::diagnostics::install_signal_handler();
284
285 // Install watchdog timer (if enabled via SEQ_WATCHDOG_SECS)
286 crate::watchdog::install_watchdog();
287 });
288}
289
290/// Run the scheduler and wait for all coroutines to complete
291///
292/// # Safety
293/// Returns the final stack (always null for now since May handles all scheduling).
294/// This function blocks until all spawned strands have completed.
295///
296/// Uses a condition variable for event-driven shutdown synchronization rather than
297/// polling. The mutex is only held during the wait protocol, not during strand
298/// execution, so there's no contention on the hot path.
299#[unsafe(no_mangle)]
300pub unsafe extern "C" fn patch_seq_scheduler_run() -> Stack {
301 let mut guard = SHUTDOWN_MUTEX.lock().expect(
302 "scheduler_run: shutdown mutex poisoned - strand panicked during shutdown synchronization",
303 );
304
305 // Wait for all strands to complete
306 // The condition variable will be notified when the last strand exits
307 while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
308 guard = SHUTDOWN_CONDVAR
309 .wait(guard)
310 .expect("scheduler_run: condvar wait failed - strand panicked during shutdown wait");
311 }
312
313 // All strands have completed
314 std::ptr::null_mut()
315}
316
/// Shutdown the scheduler
///
/// Intentionally does nothing: it exists so the C ABI offers a counterpart to
/// `patch_seq_scheduler_init`. It does not wait for strands; use
/// `patch_seq_scheduler_run` or `patch_seq_wait_all_strands` for that.
///
/// # Safety
/// Safe to call. May doesn't require explicit shutdown, so this is a no-op.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_shutdown() {
    // May doesn't require explicit shutdown
    // This function exists for API symmetry with init
}
326
327/// Spawn a strand (coroutine) with initial stack
328///
329/// # Safety
330/// - `entry` must be a valid function pointer that can safely execute on any thread
331/// - `initial_stack` must be either null or a valid pointer to a `StackNode` that:
332/// - Was heap-allocated (e.g., via Box)
333/// - Has a 'static lifetime or lives longer than the coroutine
334/// - Is safe to access from the spawned thread
335/// - The caller transfers ownership of `initial_stack` to the coroutine
336/// - Returns a unique strand ID (positive integer)
337///
338/// # Memory Management
339/// The spawned coroutine takes ownership of `initial_stack` and will automatically
340/// free the final stack returned by `entry` upon completion.
341#[unsafe(no_mangle)]
342pub unsafe extern "C" fn patch_seq_strand_spawn(
343 entry: extern "C" fn(Stack) -> Stack,
344 initial_stack: Stack,
345) -> i64 {
346 // Generate unique strand ID
347 let strand_id = NEXT_STRAND_ID.fetch_add(1, Ordering::Relaxed);
348
349 // Increment active strand counter and track total spawned
350 let new_count = ACTIVE_STRANDS.fetch_add(1, Ordering::Release) + 1;
351 TOTAL_SPAWNED.fetch_add(1, Ordering::Relaxed);
352
353 // Update peak strands if this is a new high-water mark
354 // Uses a CAS loop to safely update the maximum without locks
355 // Uses Acquire/Release ordering for proper synchronization with diagnostics reads
356 let mut peak = PEAK_STRANDS.load(Ordering::Acquire);
357 while new_count > peak {
358 match PEAK_STRANDS.compare_exchange_weak(
359 peak,
360 new_count,
361 Ordering::Release,
362 Ordering::Relaxed,
363 ) {
364 Ok(_) => break,
365 Err(current) => peak = current,
366 }
367 }
368
369 // Register strand in the registry (for diagnostics visibility)
370 // If registry is full, strand still runs but isn't tracked
371 let _ = strand_registry().register(strand_id);
372
373 // Function pointers are already Send, no wrapper needed
374 let entry_fn = entry;
375
376 // Convert pointer to usize (which is Send)
377 // This is necessary because *mut T is !Send, but the caller guarantees thread safety
378 let stack_addr = initial_stack as usize;
379
380 unsafe {
381 coroutine::spawn(move || {
382 // Reconstruct pointer from address
383 let stack_ptr = stack_addr as *mut StackNode;
384
385 // Debug assertion: validate stack pointer alignment and reasonable address
386 debug_assert!(
387 stack_ptr.is_null() || stack_addr.is_multiple_of(std::mem::align_of::<StackNode>()),
388 "Stack pointer must be null or properly aligned"
389 );
390 debug_assert!(
391 stack_ptr.is_null() || stack_addr > 0x1000,
392 "Stack pointer appears to be in invalid memory region (< 0x1000)"
393 );
394
395 // Execute the entry function
396 let final_stack = entry_fn(stack_ptr);
397
398 // Clean up the final stack to prevent memory leak
399 free_stack(final_stack);
400
401 // Unregister strand from registry (uses captured strand_id)
402 strand_registry().unregister(strand_id);
403
404 // Decrement active strand counter first, then track completion
405 // This ordering ensures the invariant SPAWNED = COMPLETED + ACTIVE + lost
406 // is never violated from an external observer's perspective
407 // Use AcqRel to establish proper synchronization (both acquire and release barriers)
408 let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);
409
410 // Track completion after decrementing active count
411 TOTAL_COMPLETED.fetch_add(1, Ordering::Release);
412 if prev_count == 1 {
413 // We were the last strand - acquire mutex and signal shutdown
414 // The mutex must be held when calling notify to prevent missed wakeups
415 let _guard = SHUTDOWN_MUTEX.lock()
416 .expect("strand_spawn: shutdown mutex poisoned - strand panicked during shutdown notification");
417 SHUTDOWN_CONDVAR.notify_all();
418 }
419 });
420 }
421
422 strand_id as i64
423}
424
425/// Free a stack allocated by the runtime
426///
427/// # Safety
428/// - `stack` must be either:
429/// - A null pointer (safe, will be a no-op)
430/// - A valid pointer returned by runtime stack functions (push, etc.)
431/// - The pointer must not have been previously freed
432/// - After calling this function, the pointer is invalid and must not be used
433/// - This function takes ownership and returns nodes to the pool
434///
435/// # Performance
436/// Returns nodes to thread-local pool for reuse instead of freeing to heap
437fn free_stack(mut stack: Stack) {
438 if !stack.is_null() {
439 use crate::value::Value;
440 unsafe {
441 // Walk the stack and return each node to the pool
442 while !stack.is_null() {
443 let next = (*stack).next;
444 // Drop the value, then return node to pool
445 // We need to drop the value to free any heap allocations (String, Variant)
446 drop(std::mem::replace(&mut (*stack).value, Value::Int(0)));
447 // Return node to pool for reuse
448 pool::pool_free(stack);
449 stack = next;
450 }
451 }
452 }
453
454 // Reset the thread-local arena to free all arena-allocated strings
455 // This is safe because:
456 // - Any arena strings in Values have been dropped above
457 // - Global strings are unaffected (they have their own allocations)
458 // - Channel sends clone to global, so no cross-strand arena pointers
459 crate::arena::arena_reset();
460}
461
462/// Legacy spawn_strand function (kept for compatibility)
463///
464/// # Safety
465/// `entry` must be a valid function pointer that can safely execute on any thread.
466#[unsafe(no_mangle)]
467pub unsafe extern "C" fn patch_seq_spawn_strand(entry: extern "C" fn(Stack) -> Stack) {
468 unsafe {
469 patch_seq_strand_spawn(entry, std::ptr::null_mut());
470 }
471}
472
/// Yield execution to allow other coroutines to run
///
/// Calls May's `coroutine::yield_now()` and then returns `stack` unchanged;
/// the stack is only threaded through, never read or modified here.
///
/// # Safety
/// Always safe to call from within a May coroutine.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_yield_strand(stack: Stack) -> Stack {
    coroutine::yield_now();
    stack
}
482
483/// Wait for all strands to complete
484///
485/// # Safety
486/// Always safe to call. Blocks until all spawned strands have completed.
487///
488/// Uses event-driven synchronization via condition variable - no polling overhead.
489#[unsafe(no_mangle)]
490pub unsafe extern "C" fn patch_seq_wait_all_strands() {
491 let mut guard = SHUTDOWN_MUTEX.lock()
492 .expect("wait_all_strands: shutdown mutex poisoned - strand panicked during shutdown synchronization");
493
494 // Wait for all strands to complete
495 // The condition variable will be notified when the last strand exits
496 while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
497 guard = SHUTDOWN_CONDVAR
498 .wait(guard)
499 .expect("wait_all_strands: condvar wait failed - strand panicked during shutdown wait");
500 }
501}
502
503// Public re-exports with short names for internal use
504pub use patch_seq_scheduler_init as scheduler_init;
505pub use patch_seq_scheduler_run as scheduler_run;
506pub use patch_seq_scheduler_shutdown as scheduler_shutdown;
507pub use patch_seq_spawn_strand as spawn_strand;
508pub use patch_seq_strand_spawn as strand_spawn;
509pub use patch_seq_wait_all_strands as wait_all_strands;
510pub use patch_seq_yield_strand as yield_strand;
511
512#[cfg(test)]
513mod tests {
514 use super::*;
515 use crate::stack::push;
516 use crate::value::Value;
517 use std::sync::atomic::{AtomicU32, Ordering};
518
519 #[test]
520 fn test_spawn_strand() {
521 unsafe {
522 static COUNTER: AtomicU32 = AtomicU32::new(0);
523
524 extern "C" fn test_entry(_stack: Stack) -> Stack {
525 COUNTER.fetch_add(1, Ordering::SeqCst);
526 std::ptr::null_mut()
527 }
528
529 for _ in 0..100 {
530 spawn_strand(test_entry);
531 }
532
533 std::thread::sleep(std::time::Duration::from_millis(200));
534 assert_eq!(COUNTER.load(Ordering::SeqCst), 100);
535 }
536 }
537
538 #[test]
539 fn test_scheduler_init_idempotent() {
540 unsafe {
541 // Should be safe to call multiple times
542 scheduler_init();
543 scheduler_init();
544 scheduler_init();
545 }
546 }
547
548 #[test]
549 fn test_free_stack_null() {
550 // Freeing null should be a no-op
551 free_stack(std::ptr::null_mut());
552 }
553
554 #[test]
555 fn test_free_stack_valid() {
556 unsafe {
557 // Create a stack, then free it
558 let stack = push(std::ptr::null_mut(), Value::Int(42));
559 free_stack(stack);
560 // If we get here without crashing, test passed
561 }
562 }
563
564 #[test]
565 fn test_strand_spawn_with_stack() {
566 unsafe {
567 static COUNTER: AtomicU32 = AtomicU32::new(0);
568
569 extern "C" fn test_entry(stack: Stack) -> Stack {
570 COUNTER.fetch_add(1, Ordering::SeqCst);
571 // Return the stack as-is (caller will free it)
572 stack
573 }
574
575 let initial_stack = push(std::ptr::null_mut(), Value::Int(99));
576 strand_spawn(test_entry, initial_stack);
577
578 std::thread::sleep(std::time::Duration::from_millis(200));
579 assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
580 }
581 }
582
583 #[test]
584 fn test_scheduler_shutdown() {
585 unsafe {
586 scheduler_init();
587 scheduler_shutdown();
588 // Should not crash
589 }
590 }
591
592 #[test]
593 fn test_many_strands_stress() {
594 unsafe {
595 static COUNTER: AtomicU32 = AtomicU32::new(0);
596
597 extern "C" fn increment(_stack: Stack) -> Stack {
598 COUNTER.fetch_add(1, Ordering::SeqCst);
599 std::ptr::null_mut()
600 }
601
602 // Reset counter for this test
603 COUNTER.store(0, Ordering::SeqCst);
604
605 // Spawn many strands to stress test synchronization
606 for _ in 0..1000 {
607 strand_spawn(increment, std::ptr::null_mut());
608 }
609
610 // Wait for all to complete
611 wait_all_strands();
612
613 // Verify all strands executed
614 assert_eq!(COUNTER.load(Ordering::SeqCst), 1000);
615 }
616 }
617
618 #[test]
619 fn test_strand_ids_are_unique() {
620 unsafe {
621 use std::collections::HashSet;
622
623 extern "C" fn noop(_stack: Stack) -> Stack {
624 std::ptr::null_mut()
625 }
626
627 // Spawn strands and collect their IDs
628 let mut ids = Vec::new();
629 for _ in 0..100 {
630 let id = strand_spawn(noop, std::ptr::null_mut());
631 ids.push(id);
632 }
633
634 // Wait for completion
635 wait_all_strands();
636
637 // Verify all IDs are unique
638 let unique_ids: HashSet<_> = ids.iter().collect();
639 assert_eq!(unique_ids.len(), 100, "All strand IDs should be unique");
640
641 // Verify all IDs are positive
642 assert!(
643 ids.iter().all(|&id| id > 0),
644 "All strand IDs should be positive"
645 );
646 }
647 }
648
649 #[test]
650 fn test_arena_reset_with_strands() {
651 unsafe {
652 use crate::arena;
653 use crate::seqstring::arena_string;
654
655 extern "C" fn create_temp_strings(stack: Stack) -> Stack {
656 // Create many temporary arena strings (simulating request parsing)
657 for i in 0..100 {
658 let temp = arena_string(&format!("temporary string {}", i));
659 // Use the string temporarily
660 assert!(!temp.as_str().is_empty());
661 // String is dropped, but memory stays in arena
662 }
663
664 // Arena should have allocated memory
665 let stats = arena::arena_stats();
666 assert!(stats.allocated_bytes > 0, "Arena should have allocations");
667
668 stack // Return empty stack
669 }
670
671 // Reset arena before test
672 arena::arena_reset();
673
674 // Spawn strand that creates many temp strings
675 strand_spawn(create_temp_strings, std::ptr::null_mut());
676
677 // Wait for strand to complete (which calls free_stack -> arena_reset)
678 wait_all_strands();
679
680 // After strand exits, arena should be reset
681 let stats_after = arena::arena_stats();
682 assert_eq!(
683 stats_after.allocated_bytes, 0,
684 "Arena should be reset after strand exits"
685 );
686 }
687 }
688
689 #[test]
690 fn test_arena_with_channel_send() {
691 unsafe {
692 use crate::channel::{close_channel, make_channel};
693 use crate::stack::{pop, push};
694 use crate::value::Value;
695 use std::sync::atomic::{AtomicU32, Ordering};
696
697 static RECEIVED_COUNT: AtomicU32 = AtomicU32::new(0);
698
699 // Create channel
700 let stack = std::ptr::null_mut();
701 let stack = make_channel(stack);
702 let (stack, chan_val) = pop(stack);
703 let chan_id = match chan_val {
704 Value::Int(id) => id,
705 _ => panic!("Expected channel ID"),
706 };
707
708 // Sender strand: creates arena string, sends through channel
709 extern "C" fn sender(stack: Stack) -> Stack {
710 use crate::channel::send;
711 use crate::seqstring::arena_string;
712 use crate::stack::{pop, push};
713 use crate::value::Value;
714
715 unsafe {
716 // Extract channel ID from stack
717 let (stack, chan_val) = pop(stack);
718 let chan_id = match chan_val {
719 Value::Int(id) => id,
720 _ => panic!("Expected channel ID"),
721 };
722
723 // Create arena string
724 let msg = arena_string("Hello from sender!");
725
726 // Push string and channel ID for send
727 let stack = push(stack, Value::String(msg));
728 let stack = push(stack, Value::Int(chan_id));
729
730 // Send (will clone to global)
731 send(stack)
732 }
733 }
734
735 // Receiver strand: receives string from channel
736 extern "C" fn receiver(stack: Stack) -> Stack {
737 use crate::channel::receive;
738 use crate::stack::{pop, push};
739 use crate::value::Value;
740 use std::sync::atomic::Ordering;
741
742 unsafe {
743 // Extract channel ID from stack
744 let (stack, chan_val) = pop(stack);
745 let chan_id = match chan_val {
746 Value::Int(id) => id,
747 _ => panic!("Expected channel ID"),
748 };
749
750 // Push channel ID for receive
751 let stack = push(stack, Value::Int(chan_id));
752
753 // Receive message
754 let stack = receive(stack);
755
756 // Pop and verify message
757 let (stack, msg_val) = pop(stack);
758 match msg_val {
759 Value::String(s) => {
760 assert_eq!(s.as_str(), "Hello from sender!");
761 RECEIVED_COUNT.fetch_add(1, Ordering::SeqCst);
762 }
763 _ => panic!("Expected String"),
764 }
765
766 stack
767 }
768 }
769
770 // Spawn sender and receiver
771 let sender_stack = push(std::ptr::null_mut(), Value::Int(chan_id));
772 strand_spawn(sender, sender_stack);
773
774 let receiver_stack = push(std::ptr::null_mut(), Value::Int(chan_id));
775 strand_spawn(receiver, receiver_stack);
776
777 // Wait for both strands
778 wait_all_strands();
779
780 // Verify message was received
781 assert_eq!(
782 RECEIVED_COUNT.load(Ordering::SeqCst),
783 1,
784 "Receiver should have received message"
785 );
786
787 // Clean up channel
788 let stack = push(stack, Value::Int(chan_id));
789 close_channel(stack);
790 }
791 }
792
793 #[test]
794 fn test_no_memory_leak_over_many_iterations() {
795 // PR #11 feedback: Verify 10K+ strand iterations don't cause memory growth
796 unsafe {
797 use crate::arena;
798 use crate::seqstring::arena_string;
799
800 extern "C" fn allocate_strings_and_exit(stack: Stack) -> Stack {
801 // Simulate request processing: many temp allocations
802 for i in 0..50 {
803 let temp = arena_string(&format!("request header {}", i));
804 assert!(!temp.as_str().is_empty());
805 // Strings dropped here but arena memory stays allocated
806 }
807 stack
808 }
809
810 // Run many iterations to detect leaks
811 let iterations = 10_000;
812
813 for i in 0..iterations {
814 // Reset arena before each iteration to start fresh
815 arena::arena_reset();
816
817 // Spawn strand, let it allocate strings, then exit
818 strand_spawn(allocate_strings_and_exit, std::ptr::null_mut());
819
820 // Wait for completion (triggers arena reset)
821 wait_all_strands();
822
823 // Every 1000 iterations, verify arena is actually reset
824 if i % 1000 == 0 {
825 let stats = arena::arena_stats();
826 assert_eq!(
827 stats.allocated_bytes, 0,
828 "Arena not reset after iteration {} (leaked {} bytes)",
829 i, stats.allocated_bytes
830 );
831 }
832 }
833
834 // Final verification: arena should be empty
835 let final_stats = arena::arena_stats();
836 assert_eq!(
837 final_stats.allocated_bytes, 0,
838 "Arena leaked memory after {} iterations ({} bytes)",
839 iterations, final_stats.allocated_bytes
840 );
841
842 println!(
843 "✓ Memory leak test passed: {} iterations with no growth",
844 iterations
845 );
846 }
847 }
848
849 #[test]
850 fn test_parse_stack_size_valid() {
851 assert_eq!(parse_stack_size(Some("2097152".to_string())), 2097152);
852 assert_eq!(parse_stack_size(Some("1".to_string())), 1);
853 assert_eq!(parse_stack_size(Some("999999999".to_string())), 999999999);
854 }
855
856 #[test]
857 fn test_parse_stack_size_none() {
858 assert_eq!(parse_stack_size(None), DEFAULT_STACK_SIZE);
859 }
860
861 #[test]
862 fn test_parse_stack_size_zero() {
863 // Zero should fall back to default (with warning printed to stderr)
864 assert_eq!(parse_stack_size(Some("0".to_string())), DEFAULT_STACK_SIZE);
865 }
866
867 #[test]
868 fn test_parse_stack_size_invalid() {
869 // Non-numeric should fall back to default (with warning printed to stderr)
870 assert_eq!(
871 parse_stack_size(Some("invalid".to_string())),
872 DEFAULT_STACK_SIZE
873 );
874 assert_eq!(
875 parse_stack_size(Some("-100".to_string())),
876 DEFAULT_STACK_SIZE
877 );
878 assert_eq!(parse_stack_size(Some("".to_string())), DEFAULT_STACK_SIZE);
879 assert_eq!(
880 parse_stack_size(Some("1.5".to_string())),
881 DEFAULT_STACK_SIZE
882 );
883 }
884
885 #[test]
886 fn test_strand_registry_basic() {
887 let registry = StrandRegistry::new(10);
888
889 // Register some strands
890 assert_eq!(registry.register(1), Some(0)); // First slot
891 assert_eq!(registry.register(2), Some(1)); // Second slot
892 assert_eq!(registry.register(3), Some(2)); // Third slot
893
894 // Verify active strands
895 let active: Vec<_> = registry.active_strands().collect();
896 assert_eq!(active.len(), 3);
897
898 // Unregister one
899 assert!(registry.unregister(2));
900 let active: Vec<_> = registry.active_strands().collect();
901 assert_eq!(active.len(), 2);
902
903 // Unregister non-existent should return false
904 assert!(!registry.unregister(999));
905 }
906
907 #[test]
908 fn test_strand_registry_overflow() {
909 let registry = StrandRegistry::new(3); // Small capacity
910
911 // Fill it up
912 assert!(registry.register(1).is_some());
913 assert!(registry.register(2).is_some());
914 assert!(registry.register(3).is_some());
915
916 // Next should overflow
917 assert!(registry.register(4).is_none());
918 assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 1);
919
920 // Another overflow
921 assert!(registry.register(5).is_none());
922 assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 2);
923 }
924
925 #[test]
926 fn test_strand_registry_slot_reuse() {
927 let registry = StrandRegistry::new(3);
928
929 // Fill it up
930 registry.register(1);
931 registry.register(2);
932 registry.register(3);
933
934 // Unregister middle one
935 registry.unregister(2);
936
937 // New registration should reuse the slot
938 assert!(registry.register(4).is_some());
939 assert_eq!(registry.active_strands().count(), 3);
940 }
941
942 #[test]
943 fn test_strand_registry_concurrent_stress() {
944 use std::sync::Arc;
945 use std::thread;
946
947 let registry = Arc::new(StrandRegistry::new(50)); // Moderate capacity
948
949 let handles: Vec<_> = (0..100)
950 .map(|i| {
951 let reg = Arc::clone(®istry);
952 thread::spawn(move || {
953 let id = (i + 1) as u64;
954 // Register
955 let _ = reg.register(id);
956 // Brief work
957 thread::yield_now();
958 // Unregister
959 reg.unregister(id);
960 })
961 })
962 .collect();
963
964 for h in handles {
965 h.join().unwrap();
966 }
967
968 // All slots should be free after all threads complete
969 assert_eq!(registry.active_strands().count(), 0);
970 }
971
972 #[test]
973 fn test_strand_lifecycle_counters() {
974 unsafe {
975 // Reset counters for isolation (not perfect but helps)
976 let initial_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
977 let initial_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);
978
979 static COUNTER: AtomicU32 = AtomicU32::new(0);
980
981 extern "C" fn simple_work(_stack: Stack) -> Stack {
982 COUNTER.fetch_add(1, Ordering::SeqCst);
983 std::ptr::null_mut()
984 }
985
986 COUNTER.store(0, Ordering::SeqCst);
987
988 // Spawn some strands
989 for _ in 0..10 {
990 strand_spawn(simple_work, std::ptr::null_mut());
991 }
992
993 wait_all_strands();
994
995 // Verify counters incremented
996 let final_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
997 let final_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);
998
999 assert!(
1000 final_spawned >= initial_spawned + 10,
1001 "TOTAL_SPAWNED should have increased by at least 10"
1002 );
1003 assert!(
1004 final_completed >= initial_completed + 10,
1005 "TOTAL_COMPLETED should have increased by at least 10"
1006 );
1007 assert_eq!(COUNTER.load(Ordering::SeqCst), 10);
1008 }
1009 }
1010}