// seq_runtime/scheduler.rs

//! Scheduler - Green Thread Management with May
//!
//! CSP-style concurrency for Seq using May coroutines.
//! Each strand is a lightweight green thread that can communicate via channels.
//!
//! ## Non-Blocking Guarantee
//!
//! Channel operations (`send`, `receive`) use May's cooperative blocking and NEVER
//! block OS threads. However, I/O operations (`write_line`, `read_line` in io.rs)
//! currently use blocking syscalls. Future work will make all I/O non-blocking.
//!
//! ## Panic Behavior
//!
//! Functions panic on invalid input (null stacks, negative IDs, closed channels).
//! In a production system, consider implementing error channels or Result-based
//! error handling instead of panicking.
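//!
//! ## Typical Lifecycle
//!
//! A minimal sketch of the intended call sequence (the `entry` function is a
//! stand-in for compiler-generated code, so this is illustrative only):
//!
//! ```ignore
//! unsafe {
//!     patch_seq_scheduler_init();                          // configure May once
//!     patch_seq_strand_spawn(entry, std::ptr::null_mut()); // fire off a strand
//!     patch_seq_scheduler_run();                           // block until all strands finish
//!     patch_seq_scheduler_shutdown();                      // no-op, kept for API symmetry
//! }
//! ```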

use crate::pool;
use crate::stack::{Stack, StackNode};
use may::coroutine;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex, Once};

static SCHEDULER_INIT: Once = Once::new();

// Strand lifecycle tracking
//
// Design rationale:
// - ACTIVE_STRANDS: Lock-free atomic counter for the hot path (spawn/complete).
//   Every strand increments on spawn and decrements on complete. This is extremely
//   fast (lock-free atomic ops) and suitable for high-frequency operations.
//
// - SHUTDOWN_CONDVAR/MUTEX: Event-driven synchronization for the cold path (shutdown wait).
//   Used only when waiting for all strands to complete (program shutdown).
//   Condvar provides event-driven wakeup instead of polling, which is critical
//   for a systems language - no CPU waste, proper OS-level blocking.
//
// Why not track JoinHandles?
// Strands are like Erlang processes - potentially hundreds of thousands of concurrent
// entities with independent lifecycles. Storing handles would require global mutable
// state with synchronization overhead on the hot path. The counter + condvar approach
// keeps the hot path lock-free while providing proper shutdown synchronization
// (see the sketch below).
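//
// Shutdown protocol sketch (mirrors `strand_spawn` and `scheduler_run` below):
//
//   completing strand                         waiting thread
//   -----------------                         --------------
//   prev = ACTIVE_STRANDS.fetch_sub(1);       lock(SHUTDOWN_MUTEX);
//   if prev == 1 {                            while ACTIVE_STRANDS > 0 {
//       lock(SHUTDOWN_MUTEX);                     guard = CONDVAR.wait(guard);
//       CONDVAR.notify_all();                 }
//   }
//
// Taking the mutex before notify_all() closes the race where the waiter reads a
// non-zero count, the last strand decrements and notifies, and only then does the
// waiter park; without the mutex-protected notify, that wakeup would be missed.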
static ACTIVE_STRANDS: AtomicUsize = AtomicUsize::new(0);
static SHUTDOWN_CONDVAR: Condvar = Condvar::new();
static SHUTDOWN_MUTEX: Mutex<()> = Mutex::new(());

// Unique strand ID generation
static NEXT_STRAND_ID: AtomicU64 = AtomicU64::new(1);

/// Initialize the scheduler
///
/// # Safety
/// Safe to call multiple times (idempotent via Once).
/// Configures May coroutines with an appropriate stack size for LLVM-generated code.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_init() {
    SCHEDULER_INIT.call_once(|| {
        // Configure the coroutine stack size.
        // May's default is 0x1000 words (32 KB on 64-bit), which is too small for
        // LLVM-generated code. 0x100000 words (8 MB on 64-bit) balances headroom
        // against May's internal maximum (requesting 64 MB panics with
        // ExceedsMaximumSize).
        may::config().set_stack_size(0x100000);
    });
}

/// Run the scheduler and wait for all coroutines to complete
///
/// Returns the final stack (always null for now, since May handles all scheduling).
///
/// # Safety
/// Always safe to call. Blocks until all spawned strands have completed.
///
/// Uses a condition variable for event-driven shutdown synchronization rather than
/// polling. The mutex is only held during the wait protocol, not during strand
/// execution, so there is no contention on the hot path.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_run() -> Stack {
    let mut guard = SHUTDOWN_MUTEX.lock().expect(
        "scheduler_run: shutdown mutex poisoned - strand panicked during shutdown synchronization",
    );

    // Wait for all strands to complete.
    // The condition variable is notified when the last strand exits.
    while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
        guard = SHUTDOWN_CONDVAR
            .wait(guard)
            .expect("scheduler_run: condvar wait failed - strand panicked during shutdown wait");
    }

    // All strands have completed
    std::ptr::null_mut()
}

/// Shutdown the scheduler
///
/// # Safety
/// Always safe to call. May doesn't require explicit shutdown, so this is a no-op.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_shutdown() {
    // May doesn't require explicit shutdown;
    // this function exists for API symmetry with init.
}

/// Spawn a strand (coroutine) with initial stack
///
/// # Safety
/// - `entry` must be a valid function pointer that can safely execute on any thread
/// - `initial_stack` must be either null or a valid pointer to a `StackNode` that:
///   - Was heap-allocated (e.g., via Box)
///   - Has a 'static lifetime or lives longer than the coroutine
///   - Is safe to access from the spawned thread
/// - The caller transfers ownership of `initial_stack` to the coroutine
///
/// Returns a unique strand ID (a positive integer).
///
/// # Memory Management
/// The spawned coroutine takes ownership of `initial_stack` and will automatically
/// free the final stack returned by `entry` upon completion.
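///
/// # Example
///
/// A minimal sketch using the runtime's own `push` helper from `crate::stack`
/// (illustrative only; the tests below exercise the same pattern):
///
/// ```ignore
/// use crate::stack::push;
/// use crate::value::Value;
///
/// extern "C" fn entry(stack: Stack) -> Stack {
///     stack // the returned stack is freed by the runtime
/// }
///
/// let initial = push(std::ptr::null_mut(), Value::Int(42)); // ownership moves to the strand
/// let id = unsafe { patch_seq_strand_spawn(entry, initial) };
/// assert!(id > 0);
/// ```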
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_strand_spawn(
    entry: extern "C" fn(Stack) -> Stack,
    initial_stack: Stack,
) -> i64 {
    // Generate unique strand ID
    let strand_id = NEXT_STRAND_ID.fetch_add(1, Ordering::Relaxed);

    // Increment active strand counter
    ACTIVE_STRANDS.fetch_add(1, Ordering::Release);

    // Function pointers are already Send, no wrapper needed
    let entry_fn = entry;

    // Convert the pointer to usize (which is Send).
    // This is necessary because *mut T is !Send, but the caller guarantees thread safety.
    let stack_addr = initial_stack as usize;

    unsafe {
        coroutine::spawn(move || {
            // Reconstruct pointer from address
            let stack_ptr = stack_addr as *mut StackNode;

            // Debug assertions: validate stack pointer alignment and a plausible address
            debug_assert!(
                stack_ptr.is_null() || stack_addr.is_multiple_of(std::mem::align_of::<StackNode>()),
                "Stack pointer must be null or properly aligned"
            );
            debug_assert!(
                stack_ptr.is_null() || stack_addr >= 0x1000,
                "Stack pointer appears to be in invalid memory region (below 0x1000)"
            );

            // Execute the entry function
            let final_stack = entry_fn(stack_ptr);

            // Clean up the final stack to prevent a memory leak
            free_stack(final_stack);

            // Decrement the active strand counter.
            // If this was the last strand, notify anyone waiting for shutdown.
            // AcqRel establishes proper synchronization (both acquire and release barriers).
            let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);
            if prev_count == 1 {
                // We were the last strand - acquire the mutex and signal shutdown.
                // The mutex must be held when calling notify to prevent missed wakeups.
                let _guard = SHUTDOWN_MUTEX.lock()
                    .expect("strand_spawn: shutdown mutex poisoned - strand panicked during shutdown notification");
                SHUTDOWN_CONDVAR.notify_all();
            }
        });
    }

    strand_id as i64
}

/// Free a stack allocated by the runtime
///
/// # Safety
/// - `stack` must be either:
///   - A null pointer (safe, will be a no-op)
///   - A valid pointer returned by runtime stack functions (push, etc.)
/// - The pointer must not have been previously freed
/// - After calling this function, the pointer is invalid and must not be used
/// - This function takes ownership and returns nodes to the pool
///
/// # Performance
/// Returns nodes to the thread-local pool for reuse instead of freeing to the heap.
fn free_stack(mut stack: Stack) {
    if !stack.is_null() {
        use crate::value::Value;
        unsafe {
            // Walk the stack and return each node to the pool
            while !stack.is_null() {
                let next = (*stack).next;
                // Drop the value, then return the node to the pool.
                // Dropping the value frees any heap allocations (String, Variant).
                drop(std::mem::replace(&mut (*stack).value, Value::Int(0)));
                // Return node to pool for reuse
                pool::pool_free(stack);
                stack = next;
            }
        }
    }

    // Reset the thread-local arena to free all arena-allocated strings.
    // This is safe because:
    // - Any arena strings in Values have been dropped above
    // - Global strings are unaffected (they have their own allocations)
    // - Channel sends clone to global, so no cross-strand arena pointers
    crate::arena::arena_reset();
}

/// Legacy spawn_strand function (kept for compatibility)
///
/// # Safety
/// `entry` must be a valid function pointer that can safely execute on any thread.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_spawn_strand(entry: extern "C" fn(Stack) -> Stack) {
    unsafe {
        patch_seq_strand_spawn(entry, std::ptr::null_mut());
    }
}

/// Yield execution to allow other coroutines to run
///
/// # Safety
/// Always safe to call from within a May coroutine.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_yield_strand() {
    coroutine::yield_now();
}

/// Wait for all strands to complete
///
/// # Safety
/// Always safe to call. Blocks until all spawned strands have completed.
///
/// Uses event-driven synchronization via a condition variable - no polling overhead.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_wait_all_strands() {
    let mut guard = SHUTDOWN_MUTEX.lock()
        .expect("wait_all_strands: shutdown mutex poisoned - strand panicked during shutdown synchronization");

    // Wait for all strands to complete.
    // The condition variable is notified when the last strand exits.
    while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
        guard = SHUTDOWN_CONDVAR
            .wait(guard)
            .expect("wait_all_strands: condvar wait failed - strand panicked during shutdown wait");
    }
}

// Public re-exports with short names for internal use
pub use patch_seq_scheduler_init as scheduler_init;
pub use patch_seq_scheduler_run as scheduler_run;
pub use patch_seq_scheduler_shutdown as scheduler_shutdown;
pub use patch_seq_spawn_strand as spawn_strand;
pub use patch_seq_strand_spawn as strand_spawn;
pub use patch_seq_wait_all_strands as wait_all_strands;
pub use patch_seq_yield_strand as yield_strand;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::stack::push;
    use crate::value::Value;
    use std::sync::atomic::{AtomicU32, Ordering};

    #[test]
    fn test_spawn_strand() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn test_entry(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            for _ in 0..100 {
                spawn_strand(test_entry);
            }

            std::thread::sleep(std::time::Duration::from_millis(200));
            assert_eq!(COUNTER.load(Ordering::SeqCst), 100);
        }
    }

    #[test]
    fn test_scheduler_init_idempotent() {
        unsafe {
            // Should be safe to call multiple times
            scheduler_init();
            scheduler_init();
            scheduler_init();
        }
    }

    #[test]
    fn test_free_stack_null() {
        // Freeing null should be a no-op
        free_stack(std::ptr::null_mut());
    }

    #[test]
    fn test_free_stack_valid() {
        unsafe {
            // Create a stack, then free it
            let stack = push(std::ptr::null_mut(), Value::Int(42));
            free_stack(stack);
            // If we get here without crashing, test passed
        }
    }

    #[test]
    fn test_strand_spawn_with_stack() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn test_entry(stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                // Return the stack as-is (caller will free it)
                stack
            }

            let initial_stack = push(std::ptr::null_mut(), Value::Int(99));
            strand_spawn(test_entry, initial_stack);

            std::thread::sleep(std::time::Duration::from_millis(200));
            assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
        }
    }

    #[test]
    fn test_scheduler_shutdown() {
        unsafe {
            scheduler_init();
            scheduler_shutdown();
            // Should not crash
        }
    }

    #[test]
    fn test_many_strands_stress() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn increment(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            // Reset counter for this test
            COUNTER.store(0, Ordering::SeqCst);

            // Spawn many strands to stress test synchronization
            for _ in 0..1000 {
                strand_spawn(increment, std::ptr::null_mut());
            }

            // Wait for all to complete
            wait_all_strands();

            // Verify all strands executed
            assert_eq!(COUNTER.load(Ordering::SeqCst), 1000);
        }
    }

    #[test]
    fn test_strand_ids_are_unique() {
        unsafe {
            use std::collections::HashSet;

            extern "C" fn noop(_stack: Stack) -> Stack {
                std::ptr::null_mut()
            }

            // Spawn strands and collect their IDs
            let mut ids = Vec::new();
            for _ in 0..100 {
                let id = strand_spawn(noop, std::ptr::null_mut());
                ids.push(id);
            }

            // Wait for completion
            wait_all_strands();

            // Verify all IDs are unique
            let unique_ids: HashSet<_> = ids.iter().collect();
            assert_eq!(unique_ids.len(), 100, "All strand IDs should be unique");

            // Verify all IDs are positive
            assert!(
                ids.iter().all(|&id| id > 0),
                "All strand IDs should be positive"
            );
        }
    }

    #[test]
    fn test_arena_reset_with_strands() {
        unsafe {
            use crate::arena;
            use crate::seqstring::arena_string;

            extern "C" fn create_temp_strings(stack: Stack) -> Stack {
                // Create many temporary arena strings (simulating request parsing)
                for i in 0..100 {
                    let temp = arena_string(&format!("temporary string {}", i));
                    // Use the string temporarily
                    assert!(!temp.as_str().is_empty());
                    // String is dropped, but memory stays in arena
                }

                // Arena should have allocated memory
                let stats = arena::arena_stats();
                assert!(stats.allocated_bytes > 0, "Arena should have allocations");

                stack // Return empty stack
            }

            // Reset arena before test
            arena::arena_reset();

            // Spawn strand that creates many temp strings
            strand_spawn(create_temp_strings, std::ptr::null_mut());

            // Wait for strand to complete (which calls free_stack -> arena_reset)
            wait_all_strands();

            // After strand exits, arena should be reset
            let stats_after = arena::arena_stats();
            assert_eq!(
                stats_after.allocated_bytes, 0,
                "Arena should be reset after strand exits"
            );
        }
    }

    #[test]
    fn test_arena_with_channel_send() {
        unsafe {
            use crate::channel::{close_channel, make_channel};
            use crate::stack::{pop, push};
            use crate::value::Value;
            use std::sync::atomic::{AtomicU32, Ordering};

            static RECEIVED_COUNT: AtomicU32 = AtomicU32::new(0);

            // Create channel
            let stack = std::ptr::null_mut();
            let stack = make_channel(stack);
            let (stack, chan_val) = pop(stack);
            let chan_id = match chan_val {
                Value::Int(id) => id,
                _ => panic!("Expected channel ID"),
            };

            // Sender strand: creates arena string, sends through channel
            extern "C" fn sender(stack: Stack) -> Stack {
                use crate::channel::send;
                use crate::seqstring::arena_string;
                use crate::stack::{pop, push};
                use crate::value::Value;

                unsafe {
                    // Extract channel ID from stack
                    let (stack, chan_val) = pop(stack);
                    let chan_id = match chan_val {
                        Value::Int(id) => id,
                        _ => panic!("Expected channel ID"),
                    };

                    // Create arena string
                    let msg = arena_string("Hello from sender!");

                    // Push string and channel ID for send
                    let stack = push(stack, Value::String(msg));
                    let stack = push(stack, Value::Int(chan_id));

                    // Send (will clone to global)
                    send(stack)
                }
            }

            // Receiver strand: receives string from channel
            extern "C" fn receiver(stack: Stack) -> Stack {
                use crate::channel::receive;
                use crate::stack::{pop, push};
                use crate::value::Value;
                use std::sync::atomic::Ordering;

                unsafe {
                    // Extract channel ID from stack
                    let (stack, chan_val) = pop(stack);
                    let chan_id = match chan_val {
                        Value::Int(id) => id,
                        _ => panic!("Expected channel ID"),
                    };

                    // Push channel ID for receive
                    let stack = push(stack, Value::Int(chan_id));

                    // Receive message
                    let stack = receive(stack);

                    // Pop and verify message
                    let (stack, msg_val) = pop(stack);
                    match msg_val {
                        Value::String(s) => {
                            assert_eq!(s.as_str(), "Hello from sender!");
                            RECEIVED_COUNT.fetch_add(1, Ordering::SeqCst);
                        }
                        _ => panic!("Expected String"),
                    }

                    stack
                }
            }

            // Spawn sender and receiver
            let sender_stack = push(std::ptr::null_mut(), Value::Int(chan_id));
            strand_spawn(sender, sender_stack);

            let receiver_stack = push(std::ptr::null_mut(), Value::Int(chan_id));
            strand_spawn(receiver, receiver_stack);

            // Wait for both strands
            wait_all_strands();

            // Verify message was received
            assert_eq!(
                RECEIVED_COUNT.load(Ordering::SeqCst),
                1,
                "Receiver should have received message"
            );

            // Clean up channel
            let stack = push(stack, Value::Int(chan_id));
            close_channel(stack);
        }
    }

    #[test]
    fn test_no_memory_leak_over_many_iterations() {
        // PR #11 feedback: Verify 10K+ strand iterations don't cause memory growth
        unsafe {
            use crate::arena;
            use crate::seqstring::arena_string;

            extern "C" fn allocate_strings_and_exit(stack: Stack) -> Stack {
                // Simulate request processing: many temp allocations
                for i in 0..50 {
                    let temp = arena_string(&format!("request header {}", i));
                    assert!(!temp.as_str().is_empty());
                    // Strings dropped here but arena memory stays allocated
                }
                stack
            }

            // Run many iterations to detect leaks
            let iterations = 10_000;

            for i in 0..iterations {
                // Reset arena before each iteration to start fresh
                arena::arena_reset();

                // Spawn strand, let it allocate strings, then exit
                strand_spawn(allocate_strings_and_exit, std::ptr::null_mut());

                // Wait for completion (triggers arena reset)
                wait_all_strands();

                // Every 1000 iterations, verify arena is actually reset
                if i % 1000 == 0 {
                    let stats = arena::arena_stats();
                    assert_eq!(
                        stats.allocated_bytes, 0,
                        "Arena not reset after iteration {} (leaked {} bytes)",
                        i, stats.allocated_bytes
                    );
                }
            }

            // Final verification: arena should be empty
            let final_stats = arena::arena_stats();
            assert_eq!(
                final_stats.allocated_bytes, 0,
                "Arena leaked memory after {} iterations ({} bytes)",
                iterations, final_stats.allocated_bytes
            );

            println!(
                "✓ Memory leak test passed: {} iterations with no growth",
                iterations
            );
        }
    }
}