seq_runtime/scheduler.rs
//! Scheduler - Green Thread Management with May
//!
//! CSP-style concurrency for Seq using May coroutines.
//! Each strand is a lightweight green thread that can communicate via channels.
//!
//! ## Non-Blocking Guarantee
//!
//! Channel operations (`send`, `receive`) use May's cooperative blocking and NEVER
//! block OS threads. However, I/O operations (`write_line`, `read_line` in io.rs)
//! currently use blocking syscalls. Future work will make all I/O non-blocking.
//!
//! ## Panic Behavior
//!
//! Functions panic on invalid input (null stacks, negative IDs, closed channels).
//! In a production system, consider implementing error channels or Result-based
//! error handling instead of panicking.
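//!
//! ## Example
//!
//! A minimal lifecycle sketch (marked `ignore`: these are FFI entry points normally
//! driven by LLVM-generated code, and `hello` is a hypothetical entry function):
//! initialize the scheduler, spawn a strand, then block until every strand finishes.
//!
//! ```ignore
//! // Hypothetical entry function executed on the spawned strand.
//! extern "C" fn hello(stack: Stack) -> Stack {
//!     // ... strand body ...
//!     stack
//! }
//!
//! unsafe {
//!     patch_seq_scheduler_init();
//!     patch_seq_strand_spawn(hello, std::ptr::null_mut());
//!     // Blocks until ACTIVE_STRANDS drops to zero, then returns the final stack (null).
//!     let _final_stack = patch_seq_scheduler_run();
//! }
//! ```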

use crate::pool;
use crate::stack::{Stack, StackNode};
use may::coroutine;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex, Once};

static SCHEDULER_INIT: Once = Once::new();

// Strand lifecycle tracking
//
// Design rationale:
// - ACTIVE_STRANDS: Lock-free atomic counter for the hot path (spawn/complete)
//   Every strand increments on spawn, decrements on complete. This is extremely
//   fast (lock-free atomic ops) and suitable for high-frequency operations.
//
// - SHUTDOWN_CONDVAR/MUTEX: Event-driven synchronization for the cold path (shutdown wait)
//   Used only when waiting for all strands to complete (program shutdown).
//   Condvar provides event-driven wakeup instead of polling, which is critical
//   for a systems language - no CPU waste, proper OS-level blocking.
//
// Why not track JoinHandles?
// Strands are like Erlang processes - potentially hundreds of thousands of concurrent
// entities with independent lifecycles. Storing handles would require global mutable
// state with synchronization overhead on the hot path. The counter + condvar approach
// keeps the hot path lock-free while providing proper shutdown synchronization.
static ACTIVE_STRANDS: AtomicUsize = AtomicUsize::new(0);
static SHUTDOWN_CONDVAR: Condvar = Condvar::new();
static SHUTDOWN_MUTEX: Mutex<()> = Mutex::new(());

// Unique strand ID generation
static NEXT_STRAND_ID: AtomicU64 = AtomicU64::new(1);

/// Initialize the scheduler
///
/// # Safety
/// Safe to call multiple times (idempotent via Once).
/// Configures May coroutines with appropriate stack size for LLVM-generated code.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_init() {
    SCHEDULER_INIT.call_once(|| {
        // Configure stack size for coroutines
        // Default is 0x1000 words (32KB on 64-bit), which is too small for LLVM-generated code
        // Using 8MB (0x100000 words) - balanced between safety and May's maximum limit
        // May has internal maximum (attempting 64MB causes ExceedsMaximumSize panic)
        may::config().set_stack_size(0x100000);
    });
}

/// Run the scheduler and wait for all coroutines to complete
///
/// # Safety
/// Always safe to call. Blocks the calling thread until all spawned strands have
/// completed, then returns the final stack (always null for now, since May handles
/// all scheduling).
///
/// Uses a condition variable for event-driven shutdown synchronization rather than
/// polling. The mutex is only held during the wait protocol, not during strand
/// execution, so there is no contention on the hot path.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_run() -> Stack {
    let mut guard = SHUTDOWN_MUTEX.lock().expect(
        "scheduler_run: shutdown mutex poisoned - strand panicked during shutdown synchronization",
    );

    // Wait for all strands to complete
    // The condition variable will be notified when the last strand exits
    while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
        guard = SHUTDOWN_CONDVAR
            .wait(guard)
            .expect("scheduler_run: condvar wait failed - strand panicked during shutdown wait");
    }

    // All strands have completed
    std::ptr::null_mut()
}

/// Shutdown the scheduler
///
/// # Safety
/// Safe to call. May doesn't require explicit shutdown, so this is a no-op.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_scheduler_shutdown() {
    // May doesn't require explicit shutdown
    // This function exists for API symmetry with init
}
/// Spawn a strand (coroutine) with initial stack
///
/// # Safety
/// - `entry` must be a valid function pointer that can safely execute on any thread
/// - `initial_stack` must be either null or a valid pointer to a `StackNode` that:
///   - Was heap-allocated (e.g., via Box)
///   - Has a 'static lifetime or lives longer than the coroutine
///   - Is safe to access from the spawned thread
/// - The caller transfers ownership of `initial_stack` to the coroutine
///
/// # Returns
/// A unique strand ID (positive integer).
///
/// # Memory Management
/// The spawned coroutine takes ownership of `initial_stack` and will automatically
/// free the final stack returned by `entry` upon completion.
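///
/// # Example
///
/// A usage sketch mirroring the tests below (marked `ignore` because this is an FFI
/// entry point meant to be called from generated code; `entry` is a hypothetical
/// entry function, and `push`/`Value` are the runtime's stack helpers):
///
/// ```ignore
/// extern "C" fn entry(stack: Stack) -> Stack {
///     // ... consume the initial stack, build a result ...
///     stack // whatever stack is returned here is freed by the runtime
/// }
///
/// unsafe {
///     let initial = push(std::ptr::null_mut(), Value::Int(42));
///     let strand_id = patch_seq_strand_spawn(entry, initial);
///     assert!(strand_id > 0);
///     patch_seq_wait_all_strands();
/// }
/// ```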
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_strand_spawn(
    entry: extern "C" fn(Stack) -> Stack,
    initial_stack: Stack,
) -> i64 {
    // Generate unique strand ID
    let strand_id = NEXT_STRAND_ID.fetch_add(1, Ordering::Relaxed);

    // Increment active strand counter
    ACTIVE_STRANDS.fetch_add(1, Ordering::Release);

    // Function pointers are already Send, no wrapper needed
    let entry_fn = entry;

    // Convert pointer to usize (which is Send)
    // This is necessary because *mut T is !Send, but the caller guarantees thread safety
    let stack_addr = initial_stack as usize;

    unsafe {
        coroutine::spawn(move || {
            // Reconstruct pointer from address
            let stack_ptr = stack_addr as *mut StackNode;

            // Debug assertion: validate stack pointer alignment and reasonable address
            debug_assert!(
                stack_ptr.is_null() || stack_addr.is_multiple_of(std::mem::align_of::<StackNode>()),
                "Stack pointer must be null or properly aligned"
            );
            debug_assert!(
                stack_ptr.is_null() || stack_addr > 0x1000,
                "Stack pointer appears to be in invalid memory region (< 0x1000)"
            );

            // Execute the entry function
            let final_stack = entry_fn(stack_ptr);

            // Clean up the final stack to prevent memory leak
            free_stack(final_stack);

            // Decrement active strand counter
            // If this was the last strand, notify anyone waiting for shutdown
            // Use AcqRel to establish proper synchronization (both acquire and release barriers)
            let prev_count = ACTIVE_STRANDS.fetch_sub(1, Ordering::AcqRel);
            if prev_count == 1 {
                // We were the last strand - acquire mutex and signal shutdown
                // The mutex must be held when calling notify to prevent missed wakeups
                let _guard = SHUTDOWN_MUTEX.lock()
                    .expect("strand_spawn: shutdown mutex poisoned - strand panicked during shutdown notification");
                SHUTDOWN_CONDVAR.notify_all();
            }
        });
    }

    strand_id as i64
}

/// Free a stack allocated by the runtime
///
/// # Safety
/// - `stack` must be either:
///   - A null pointer (safe, will be a no-op)
///   - A valid pointer returned by runtime stack functions (push, etc.)
/// - The pointer must not have been previously freed
/// - After calling this function, the pointer is invalid and must not be used
/// - This function takes ownership and returns nodes to the pool
///
/// # Performance
/// Returns nodes to thread-local pool for reuse instead of freeing to heap
fn free_stack(mut stack: Stack) {
    if !stack.is_null() {
        use crate::value::Value;
        unsafe {
            // Walk the stack and return each node to the pool
            while !stack.is_null() {
                let next = (*stack).next;
                // Drop the value, then return node to pool
                // We need to drop the value to free any heap allocations (String, Variant)
                drop(std::mem::replace(&mut (*stack).value, Value::Int(0)));
                // Return node to pool for reuse
                pool::pool_free(stack);
                stack = next;
            }
        }
    }

    // Reset the thread-local arena to free all arena-allocated strings
    // This is safe because:
    // - Any arena strings in Values have been dropped above
    // - Global strings are unaffected (they have their own allocations)
    // - Channel sends clone to global, so no cross-strand arena pointers
    crate::arena::arena_reset();
}

/// Legacy spawn_strand function (kept for compatibility)
///
/// # Safety
/// `entry` must be a valid function pointer that can safely execute on any thread.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_spawn_strand(entry: extern "C" fn(Stack) -> Stack) {
    unsafe {
        patch_seq_strand_spawn(entry, std::ptr::null_mut());
    }
}

/// Yield execution to allow other coroutines to run
///
/// # Safety
/// Always safe to call from within a May coroutine.
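///
/// # Example
///
/// A sketch of cooperative yielding inside a long-running strand (marked `ignore`
/// because entry functions are normally invoked from generated code; `busy` is a
/// hypothetical entry function):
///
/// ```ignore
/// extern "C" fn busy(stack: Stack) -> Stack {
///     for _ in 0..1_000_000 {
///         // ... do a slice of work ...
///         unsafe { patch_seq_yield_strand() }; // let other strands run
///     }
///     stack
/// }
/// ```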
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_yield_strand() {
    coroutine::yield_now();
}

/// Wait for all strands to complete
///
/// # Safety
/// Always safe to call. Blocks until all spawned strands have completed.
///
/// Uses event-driven synchronization via condition variable - no polling overhead.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_wait_all_strands() {
    let mut guard = SHUTDOWN_MUTEX.lock()
        .expect("wait_all_strands: shutdown mutex poisoned - strand panicked during shutdown synchronization");

    // Wait for all strands to complete
    // The condition variable will be notified when the last strand exits
    while ACTIVE_STRANDS.load(Ordering::Acquire) > 0 {
        guard = SHUTDOWN_CONDVAR
            .wait(guard)
            .expect("wait_all_strands: condvar wait failed - strand panicked during shutdown wait");
    }
}

// Public re-exports with short names for internal use
pub use patch_seq_scheduler_init as scheduler_init;
pub use patch_seq_scheduler_run as scheduler_run;
pub use patch_seq_scheduler_shutdown as scheduler_shutdown;
pub use patch_seq_spawn_strand as spawn_strand;
pub use patch_seq_strand_spawn as strand_spawn;
pub use patch_seq_wait_all_strands as wait_all_strands;
pub use patch_seq_yield_strand as yield_strand;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::stack::push;
    use crate::value::Value;
    use std::sync::atomic::{AtomicU32, Ordering};

    #[test]
    fn test_spawn_strand() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn test_entry(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            for _ in 0..100 {
                spawn_strand(test_entry);
            }

            std::thread::sleep(std::time::Duration::from_millis(200));
            assert_eq!(COUNTER.load(Ordering::SeqCst), 100);
        }
    }

    #[test]
    fn test_scheduler_init_idempotent() {
        unsafe {
            // Should be safe to call multiple times
            scheduler_init();
            scheduler_init();
            scheduler_init();
        }
    }

    #[test]
    fn test_free_stack_null() {
        // Freeing null should be a no-op
        free_stack(std::ptr::null_mut());
    }

    #[test]
    fn test_free_stack_valid() {
        unsafe {
            // Create a stack, then free it
            let stack = push(std::ptr::null_mut(), Value::Int(42));
            free_stack(stack);
            // If we get here without crashing, test passed
        }
    }

    #[test]
    fn test_strand_spawn_with_stack() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn test_entry(stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                // Return the stack as-is (caller will free it)
                stack
            }

            let initial_stack = push(std::ptr::null_mut(), Value::Int(99));
            strand_spawn(test_entry, initial_stack);

            std::thread::sleep(std::time::Duration::from_millis(200));
            assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
        }
    }

    #[test]
    fn test_scheduler_shutdown() {
        unsafe {
            scheduler_init();
            scheduler_shutdown();
            // Should not crash
        }
    }

    #[test]
    fn test_many_strands_stress() {
        unsafe {
            static COUNTER: AtomicU32 = AtomicU32::new(0);

            extern "C" fn increment(_stack: Stack) -> Stack {
                COUNTER.fetch_add(1, Ordering::SeqCst);
                std::ptr::null_mut()
            }

            // Reset counter for this test
            COUNTER.store(0, Ordering::SeqCst);

            // Spawn many strands to stress test synchronization
            for _ in 0..1000 {
                strand_spawn(increment, std::ptr::null_mut());
            }

            // Wait for all to complete
            wait_all_strands();

            // Verify all strands executed
            assert_eq!(COUNTER.load(Ordering::SeqCst), 1000);
        }
    }

    #[test]
    fn test_strand_ids_are_unique() {
        unsafe {
            use std::collections::HashSet;

            extern "C" fn noop(_stack: Stack) -> Stack {
                std::ptr::null_mut()
            }

            // Spawn strands and collect their IDs
            let mut ids = Vec::new();
            for _ in 0..100 {
                let id = strand_spawn(noop, std::ptr::null_mut());
                ids.push(id);
            }

            // Wait for completion
            wait_all_strands();

            // Verify all IDs are unique
            let unique_ids: HashSet<_> = ids.iter().collect();
            assert_eq!(unique_ids.len(), 100, "All strand IDs should be unique");

            // Verify all IDs are positive
            assert!(
                ids.iter().all(|&id| id > 0),
                "All strand IDs should be positive"
            );
        }
    }

    #[test]
    fn test_arena_reset_with_strands() {
        unsafe {
            use crate::arena;
            use crate::seqstring::arena_string;

            extern "C" fn create_temp_strings(stack: Stack) -> Stack {
                // Create many temporary arena strings (simulating request parsing)
                for i in 0..100 {
                    let temp = arena_string(&format!("temporary string {}", i));
                    // Use the string temporarily
                    assert!(!temp.as_str().is_empty());
                    // String is dropped, but memory stays in arena
                }

                // Arena should have allocated memory
                let stats = arena::arena_stats();
                assert!(stats.allocated_bytes > 0, "Arena should have allocations");

                stack // Return empty stack
            }

            // Reset arena before test
            arena::arena_reset();

            // Spawn strand that creates many temp strings
            strand_spawn(create_temp_strings, std::ptr::null_mut());

            // Wait for strand to complete (which calls free_stack -> arena_reset)
            wait_all_strands();

            // After strand exits, arena should be reset
            let stats_after = arena::arena_stats();
            assert_eq!(
                stats_after.allocated_bytes, 0,
                "Arena should be reset after strand exits"
            );
        }
    }

    #[test]
    fn test_arena_with_channel_send() {
        unsafe {
            use crate::channel::{close_channel, make_channel};
            use crate::stack::{pop, push};
            use crate::value::Value;
            use std::sync::atomic::{AtomicU32, Ordering};

            static RECEIVED_COUNT: AtomicU32 = AtomicU32::new(0);

            // Create channel
            let stack = std::ptr::null_mut();
            let stack = make_channel(stack);
            let (stack, chan_val) = pop(stack);
            let chan_id = match chan_val {
                Value::Int(id) => id,
                _ => panic!("Expected channel ID"),
            };

            // Sender strand: creates arena string, sends through channel
            extern "C" fn sender(stack: Stack) -> Stack {
                use crate::channel::send;
                use crate::seqstring::arena_string;
                use crate::stack::{pop, push};
                use crate::value::Value;

                unsafe {
                    // Extract channel ID from stack
                    let (stack, chan_val) = pop(stack);
                    let chan_id = match chan_val {
                        Value::Int(id) => id,
                        _ => panic!("Expected channel ID"),
                    };

                    // Create arena string
                    let msg = arena_string("Hello from sender!");

                    // Push string and channel ID for send
                    let stack = push(stack, Value::String(msg));
                    let stack = push(stack, Value::Int(chan_id));

                    // Send (will clone to global)
                    send(stack)
                }
            }

            // Receiver strand: receives string from channel
            extern "C" fn receiver(stack: Stack) -> Stack {
                use crate::channel::receive;
                use crate::stack::{pop, push};
                use crate::value::Value;
                use std::sync::atomic::Ordering;

                unsafe {
                    // Extract channel ID from stack
                    let (stack, chan_val) = pop(stack);
                    let chan_id = match chan_val {
                        Value::Int(id) => id,
                        _ => panic!("Expected channel ID"),
                    };

                    // Push channel ID for receive
                    let stack = push(stack, Value::Int(chan_id));

                    // Receive message
                    let stack = receive(stack);

                    // Pop and verify message
                    let (stack, msg_val) = pop(stack);
                    match msg_val {
                        Value::String(s) => {
                            assert_eq!(s.as_str(), "Hello from sender!");
                            RECEIVED_COUNT.fetch_add(1, Ordering::SeqCst);
                        }
                        _ => panic!("Expected String"),
                    }

                    stack
                }
            }

            // Spawn sender and receiver
            let sender_stack = push(std::ptr::null_mut(), Value::Int(chan_id));
            strand_spawn(sender, sender_stack);

            let receiver_stack = push(std::ptr::null_mut(), Value::Int(chan_id));
            strand_spawn(receiver, receiver_stack);

            // Wait for both strands
            wait_all_strands();

            // Verify message was received
            assert_eq!(
                RECEIVED_COUNT.load(Ordering::SeqCst),
                1,
                "Receiver should have received message"
            );

            // Clean up channel
            let stack = push(stack, Value::Int(chan_id));
            close_channel(stack);
        }
    }

    #[test]
    fn test_no_memory_leak_over_many_iterations() {
        // PR #11 feedback: Verify 10K+ strand iterations don't cause memory growth
        unsafe {
            use crate::arena;
            use crate::seqstring::arena_string;

            extern "C" fn allocate_strings_and_exit(stack: Stack) -> Stack {
                // Simulate request processing: many temp allocations
                for i in 0..50 {
                    let temp = arena_string(&format!("request header {}", i));
                    assert!(!temp.as_str().is_empty());
                    // Strings dropped here but arena memory stays allocated
                }
                stack
            }

            // Run many iterations to detect leaks
            let iterations = 10_000;

            for i in 0..iterations {
                // Reset arena before each iteration to start fresh
                arena::arena_reset();

                // Spawn strand, let it allocate strings, then exit
                strand_spawn(allocate_strings_and_exit, std::ptr::null_mut());

                // Wait for completion (triggers arena reset)
                wait_all_strands();

                // Every 1000 iterations, verify arena is actually reset
                if i % 1000 == 0 {
                    let stats = arena::arena_stats();
                    assert_eq!(
                        stats.allocated_bytes, 0,
                        "Arena not reset after iteration {} (leaked {} bytes)",
                        i, stats.allocated_bytes
                    );
                }
            }

            // Final verification: arena should be empty
            let final_stats = arena::arena_stats();
            assert_eq!(
                final_stats.allocated_bytes, 0,
                "Arena leaked memory after {} iterations ({} bytes)",
                iterations, final_stats.allocated_bytes
            );

            println!(
                "✓ Memory leak test passed: {} iterations with no growth",
                iterations
            );
        }
    }
}