loom_rs/runtime.rs
//! Loom runtime implementation.
//!
//! The runtime combines a tokio async runtime with a rayon thread pool,
//! both configured with CPU pinning.
//!
//! # Performance
//!
//! This module is designed for zero unnecessary overhead:
//! - `spawn_async()`: ~10ns overhead (TaskTracker token only)
//! - `spawn_compute()`: ~100-500ns (cross-thread signaling, 0 bytes after warmup)
//! - `install()`: ~0ns (zero overhead, direct rayon access)
//!
//! # Thread Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │                         LoomRuntime                         │
//! │   pools: ComputePoolRegistry (per-type lock-free pools)     │
//! │   (One pool per result type, shared across all threads)     │
//! └─────────────────────────────────────────────────────────────┘
//!           │ on_thread_start              │ start_handler
//!           ▼                              ▼
//! ┌─────────────────────┐        ┌─────────────────────┐
//! │    Tokio Workers    │        │    Rayon Workers    │
//! │  thread_local! {    │        │  thread_local! {    │
//! │    RUNTIME: Weak<>  │        │    RUNTIME: Weak<>  │
//! │  }                  │        │  }                  │
//! └─────────────────────┘        └─────────────────────┘
//! ```
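//!
//! Both start handlers store a `Weak` reference to the runtime in a
//! thread-local, so code running on a managed worker thread can reach the
//! runtime again. A minimal sketch (assuming `current_runtime()` is
//! re-exported at the crate root, as the `block_on` docs below suggest):
//!
//! ```ignore
//! runtime.block_on(async {
//!     runtime
//!         .spawn_compute(|| {
//!             // Runs on a rayon worker; the start handler already installed
//!             // the thread-local runtime reference.
//!             assert!(loom_rs::current_runtime().is_some());
//!         })
//!         .await;
//! });
//! ```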

use crate::affinity::{pin_to_cpu, CpuAllocator};
use crate::bridge::{PooledRayonTask, TaskState};
use crate::config::LoomConfig;
use crate::context::{clear_current_runtime, set_current_runtime};
use crate::cpuset::{available_cpus, format_cpuset, parse_and_validate_cpuset};
use crate::error::{LoomError, Result};
use crate::pool::ComputePoolRegistry;

use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use tokio::sync::Notify;
use tokio_util::task::TaskTracker;
use tracing::{debug, info, warn};

/// State for tracking in-flight compute tasks.
///
/// Combines the task counter with a notification mechanism for efficient
/// shutdown waiting (avoids spin loops).
struct ComputeTaskState {
    /// Number of tasks currently executing on rayon
    count: AtomicUsize,
    /// Notified when count reaches 0
    notify: Notify,
}

impl ComputeTaskState {
    fn new() -> Self {
        Self {
            count: AtomicUsize::new(0),
            notify: Notify::new(),
        }
    }
}

/// Guard that decrements the compute task counter on drop.
///
/// Panic-safe: the decrement runs even if the task closure panics.
///
/// SAFETY: The state lives in `LoomRuntimeInner`, which outlives all rayon tasks
/// because `block_until_idle()` waits for the compute task count to reach 0.
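///
/// Intended usage, mirroring `spawn_compute()` below (a sketch, not extra API;
/// `inner` stands for the owning `LoomRuntimeInner`):
///
/// ```ignore
/// let guard = ComputeTaskGuard::new(&inner.compute_state); // increments count
/// inner.rayon_pool.spawn(move || {
///     let _guard = guard; // dropped when the closure exits, even on panic
///     f();
/// });
/// ```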
struct ComputeTaskGuard {
    state: *const ComputeTaskState,
}

unsafe impl Send for ComputeTaskGuard {}

impl ComputeTaskGuard {
    fn new(state: &ComputeTaskState) -> Self {
        state.count.fetch_add(1, Ordering::Relaxed);
        Self {
            state: state as *const ComputeTaskState,
        }
    }
}

impl Drop for ComputeTaskGuard {
    fn drop(&mut self) {
        // SAFETY: state outlives rayon tasks due to shutdown waiting
        unsafe {
            let prev = (*self.state).count.fetch_sub(1, Ordering::Release);
            if prev == 1 {
                // Count just went from 1 to 0, notify waiters
                (*self.state).notify.notify_waiters();
            }
        }
    }
}

/// A bespoke thread pool runtime combining tokio and rayon with CPU pinning.
///
/// The runtime provides:
/// - A tokio async runtime for I/O-bound work
/// - A rayon thread pool for CPU-bound parallel work
/// - Automatic CPU pinning for both runtimes
/// - A task tracker for graceful shutdown
/// - Zero-allocation compute spawning after warmup
///
/// # Performance Guarantees
///
/// | Method | Overhead | Allocations | Tracked |
/// |--------|----------|-------------|---------|
/// | `spawn_async()` | ~10ns | Token only | Yes |
/// | `spawn_compute()` | ~100-500ns | 0 bytes (after warmup) | Yes |
/// | `install()` | ~0ns | None | No |
/// | `rayon_pool()` | 0ns | None | No |
/// | `tokio_handle()` | 0ns | None | No |
///
/// # Examples
///
/// ```ignore
/// use loom_rs::LoomBuilder;
///
/// let runtime = LoomBuilder::new()
///     .prefix("myapp")
///     .tokio_threads(2)
///     .rayon_threads(6)
///     .build()?;
///
/// runtime.block_on(async {
///     // Spawn tracked async I/O task
///     let io_handle = runtime.spawn_async(async {
///         fetch_data().await
///     });
///
///     // Spawn tracked compute task and await result
///     let result = runtime.spawn_compute(|| {
///         expensive_computation()
///     }).await;
///
///     // Zero-overhead parallel iterators (within tracked context)
///     let processed = runtime.install(|| {
///         data.par_iter().map(|x| process(x)).collect()
///     });
/// });
///
/// // Graceful shutdown from main thread
/// runtime.block_until_idle();
/// ```
pub struct LoomRuntime {
    inner: Arc<LoomRuntimeInner>,
}

/// Inner state shared with thread-locals.
///
/// This is Arc-wrapped and shared with tokio/rayon worker threads via thread-local
/// storage, enabling `current_runtime()` to work from any managed thread.
pub struct LoomRuntimeInner {
    config: LoomConfig,
    tokio_runtime: tokio::runtime::Runtime,
    pub(crate) rayon_pool: rayon::ThreadPool,
    task_tracker: TaskTracker,
    /// Track in-flight rayon tasks for graceful shutdown
    compute_state: ComputeTaskState,
    /// Per-type object pools for zero-allocation spawn_compute
    pub(crate) pools: ComputePoolRegistry,
    /// Number of tokio worker threads
    tokio_threads: usize,
    /// Number of rayon worker threads
    rayon_threads: usize,
    /// CPUs allocated to tokio workers
    tokio_cpus: Vec<usize>,
    /// CPUs allocated to rayon workers
    rayon_cpus: Vec<usize>,
}

impl LoomRuntime {
    /// Create a runtime from a configuration.
    ///
    /// This is typically called via `LoomBuilder::build()`.
    pub(crate) fn from_config(config: LoomConfig, pool_size: usize) -> Result<Self> {
        // Determine available CPUs
        let cpus = if let Some(ref cpuset_str) = config.cpuset {
            parse_and_validate_cpuset(cpuset_str)?
        } else {
            #[cfg(feature = "cuda")]
            if let Some(ref selector) = config.cuda_device {
                crate::cuda::cpuset_for_cuda_device(selector)?
            } else {
                available_cpus()
            }
            #[cfg(not(feature = "cuda"))]
            available_cpus()
        };

        if cpus.is_empty() {
            return Err(LoomError::NoCpusAvailable);
        }

        let total_cpus = cpus.len();
        let tokio_threads = config.effective_tokio_threads();
        let rayon_threads = config.effective_rayon_threads(total_cpus);

        // Validate we have enough CPUs
        let total_threads = tokio_threads + rayon_threads;
        if total_threads > total_cpus {
            return Err(LoomError::InsufficientCpus {
                requested: total_threads,
                available: total_cpus,
            });
        }

        // Split CPUs between tokio and rayon
        let (tokio_cpus, rayon_cpus) = cpus.split_at(tokio_threads.min(cpus.len()));
        let tokio_cpus = tokio_cpus.to_vec();
        let rayon_cpus = if rayon_cpus.is_empty() {
            // If we don't have dedicated rayon CPUs, share with tokio
            tokio_cpus.clone()
        } else {
            rayon_cpus.to_vec()
        };

        info!(
            prefix = %config.prefix,
            tokio_threads,
            rayon_threads,
            total_cpus,
            pool_size,
            "building loom runtime"
        );

        // Use Arc<str> for prefix to avoid cloning on each thread start
        let prefix: Arc<str> = config.prefix.as_str().into();

        // Build the inner runtime inside Arc::new_cyclic so the tokio and rayon
        // worker threads can capture a Weak reference back to it at startup.
        let inner = Arc::new_cyclic(|weak: &Weak<LoomRuntimeInner>| {
            let weak_clone = weak.clone();

            // Build tokio runtime with thread-local injection
            let tokio_runtime = Self::build_tokio_runtime(
                &prefix,
                tokio_threads,
                tokio_cpus.clone(),
                weak_clone.clone(),
            )
            .expect("failed to build tokio runtime");

            // Build rayon pool with thread-local injection
            let rayon_pool =
                Self::build_rayon_pool(&prefix, rayon_threads, rayon_cpus.clone(), weak_clone)
                    .expect("failed to build rayon pool");

            LoomRuntimeInner {
                config,
                tokio_runtime,
                rayon_pool,
                task_tracker: TaskTracker::new(),
                compute_state: ComputeTaskState::new(),
                pools: ComputePoolRegistry::new(pool_size),
                tokio_threads,
                rayon_threads,
                tokio_cpus,
                rayon_cpus,
            }
        });

        Ok(Self { inner })
    }

    fn build_tokio_runtime(
        prefix: &Arc<str>,
        num_threads: usize,
        cpus: Vec<usize>,
        runtime_weak: Weak<LoomRuntimeInner>,
    ) -> Result<tokio::runtime::Runtime> {
        let allocator = Arc::new(CpuAllocator::new(cpus));
        let prefix_clone = Arc::clone(prefix);

        // Thread name counter
        let thread_counter = Arc::new(AtomicUsize::new(0));
        let name_prefix = Arc::clone(prefix);

        let start_weak = runtime_weak.clone();
        let start_allocator = allocator.clone();
        let start_prefix = prefix_clone.clone();

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(num_threads)
            .thread_name_fn(move || {
                let id = thread_counter.fetch_add(1, Ordering::SeqCst);
                format!("{}-tokio-{:04}", name_prefix, id)
            })
            .on_thread_start(move || {
                // Pin CPU
                let cpu_id = start_allocator.allocate();
                if let Err(e) = pin_to_cpu(cpu_id) {
                    warn!(%e, %start_prefix, cpu_id, "failed to pin tokio thread");
                } else {
                    debug!(cpu_id, %start_prefix, "pinned tokio thread to CPU");
                }

                // Inject runtime reference into thread-local
                set_current_runtime(start_weak.clone());
            })
            .on_thread_stop(|| {
                clear_current_runtime();
            })
            .enable_all()
            .build()?;

        Ok(runtime)
    }

    fn build_rayon_pool(
        prefix: &Arc<str>,
        num_threads: usize,
        cpus: Vec<usize>,
        runtime_weak: Weak<LoomRuntimeInner>,
    ) -> Result<rayon::ThreadPool> {
        let allocator = Arc::new(CpuAllocator::new(cpus));
        let name_prefix = Arc::clone(prefix);

        let start_weak = runtime_weak.clone();
        let start_allocator = allocator.clone();
        let start_prefix = Arc::clone(prefix);

        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .thread_name(move |i| format!("{}-rayon-{:04}", name_prefix, i))
            .start_handler(move |thread_index| {
                // Pin CPU
                let cpu_id = start_allocator.allocate();
                debug!(thread_index, cpu_id, %start_prefix, "rayon thread starting");
                if let Err(e) = pin_to_cpu(cpu_id) {
                    warn!(%e, %start_prefix, cpu_id, thread_index, "failed to pin rayon thread");
                }

                // Inject runtime reference into thread-local
                set_current_runtime(start_weak.clone());
            })
            .exit_handler(|_thread_index| {
                clear_current_runtime();
            })
            .build()?;

        Ok(pool)
    }

    /// Get the resolved configuration.
    pub fn config(&self) -> &LoomConfig {
        &self.inner.config
    }

    /// Get the tokio runtime handle.
    ///
    /// This can be used to spawn untracked tasks or enter the runtime context.
    /// For tracked async tasks, prefer `spawn_async()`.
    ///
    /// # Performance
    ///
    /// Zero overhead - returns a reference.
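    ///
    /// # Examples
    ///
    /// A sketch of spawning an untracked, fire-and-forget task directly on the
    /// handle (`background_logging()` is a placeholder; such tasks are not
    /// waited on by `block_until_idle()`):
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     runtime.tokio_handle().spawn(async {
    ///         background_logging().await;
    ///     });
    /// });
    /// ```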
    pub fn tokio_handle(&self) -> &tokio::runtime::Handle {
        self.inner.tokio_runtime.handle()
    }

    /// Get the rayon thread pool.
    ///
    /// This can be used to execute parallel iterators or spawn untracked work directly.
    /// For tracked compute tasks, prefer `spawn_compute()`.
    /// For zero-overhead parallel iterators, prefer `install()`.
    ///
    /// # Performance
    ///
    /// Zero overhead - returns a reference.
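    ///
    /// # Examples
    ///
    /// A sketch of untracked parallel work on the pool (`data` and `process`
    /// are placeholders):
    ///
    /// ```ignore
    /// use rayon::prelude::*;
    ///
    /// let processed: Vec<_> = runtime
    ///     .rayon_pool()
    ///     .install(|| data.par_iter().map(|x| process(x)).collect());
    /// ```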
    pub fn rayon_pool(&self) -> &rayon::ThreadPool {
        &self.inner.rayon_pool
    }

    /// Get the task tracker for graceful shutdown.
    ///
    /// Use this to track spawned tasks and wait for them to complete.
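    ///
    /// # Examples
    ///
    /// A sketch of manually tracking a task with a tracker token, mirroring
    /// what `spawn_async()` does internally (`do_work()` is a placeholder):
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     let token = runtime.task_tracker().token();
    ///     tokio::spawn(async move {
    ///         let _token = token; // released when the task finishes
    ///         do_work().await;
    ///     });
    /// });
    /// ```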
    pub fn task_tracker(&self) -> &TaskTracker {
        &self.inner.task_tracker
    }

    /// Block on a future using the tokio runtime.
    ///
    /// This is the main entry point for running async code from the main thread.
    /// The current runtime is available via `loom_rs::current_runtime()` within
    /// the block_on scope.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     // Async code here
    ///     // loom_rs::current_runtime() works here
    /// });
    /// ```
    pub fn block_on<F: Future>(&self, f: F) -> F::Output {
        // Set current runtime for the main thread during block_on
        set_current_runtime(Arc::downgrade(&self.inner));
        let result = self.inner.tokio_runtime.block_on(f);
        clear_current_runtime();
        result
    }

    /// Spawn a tracked async task on tokio.
    ///
    /// The task is tracked for graceful shutdown via `block_until_idle()`.
    ///
    /// # Performance
    ///
    /// Overhead: ~10ns (TaskTracker token only).
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     let handle = runtime.spawn_async(async {
    ///         // I/O-bound async work
    ///         fetch_data().await
    ///     });
    ///
    ///     let result = handle.await.unwrap();
    /// });
    /// ```
    #[inline]
    pub fn spawn_async<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let token = self.inner.task_tracker.token();
        self.inner.tokio_runtime.spawn(async move {
            let _guard = token;
            future.await
        })
    }

    /// Spawn CPU-bound work on rayon and await the result.
    ///
    /// The task is tracked for graceful shutdown via `block_until_idle()`.
    /// Automatically uses per-type object pools for zero allocation after warmup.
    ///
    /// # Performance
    ///
    /// | State | Allocations | Overhead |
    /// |-------|-------------|----------|
    /// | Pool hit | 0 bytes | ~100-500ns |
    /// | Pool miss | ~32 bytes | ~100-500ns |
    /// | First call per type | Pool + state | ~1µs |
    ///
    /// For zero-overhead parallel iterators (within an already-tracked context),
    /// use `install()` instead.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     let result = runtime.spawn_compute(|| {
    ///         // CPU-intensive work
    ///         expensive_computation()
    ///     }).await;
    /// });
    /// ```
    #[inline]
    pub async fn spawn_compute<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        self.inner.spawn_compute(f).await
    }

    /// Execute work on rayon with zero overhead (sync, blocking).
    ///
    /// This installs the rayon pool for the current scope, allowing direct use
    /// of rayon's parallel iterators.
    ///
    /// **NOT tracked** - use within an already-tracked task (e.g., inside
    /// `spawn_async` or `spawn_compute`) for proper shutdown tracking.
    ///
    /// # Performance
    ///
    /// Zero overhead - direct rayon access.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     // This is a tracked context (we're in block_on)
    ///     let processed = runtime.install(|| {
    ///         use rayon::prelude::*;
    ///         data.par_iter().map(|x| process(x)).collect::<Vec<_>>()
    ///     });
    /// });
    /// ```
    #[inline]
    pub fn install<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send,
        R: Send,
    {
        self.inner.rayon_pool.install(f)
    }

    /// Stop accepting new tasks.
    ///
    /// After calling this, `spawn_async()` and `spawn_compute()` will still
    /// work, but the shutdown process has begun. Use `is_idle()` or
    /// `wait_for_shutdown()` to check/wait for completion.
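    ///
    /// # Examples
    ///
    /// A sketch of the two-step shutdown (close the tracker, then wait;
    /// `do_work()` is a placeholder):
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     runtime.spawn_async(async { do_work().await });
    ///     runtime.shutdown();                // close the task tracker
    ///     runtime.wait_for_shutdown().await; // wait for tracked tasks
    /// });
    /// ```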
    pub fn shutdown(&self) {
        self.inner.task_tracker.close();
    }

    /// Check if all tracked tasks have completed.
    ///
    /// Returns `true` if `shutdown()` has been called and all tracked async
    /// tasks and compute tasks have finished.
    ///
    /// # Performance
    ///
    /// Cheap - a few atomic loads (tracker closed, tracker empty, compute count).
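    ///
    /// # Examples
    ///
    /// A sketch of polling for idleness after closing the tracker; prefer
    /// `block_until_idle()` when a blocking wait is acceptable:
    ///
    /// ```ignore
    /// runtime.shutdown();
    /// while !runtime.is_idle() {
    ///     std::thread::sleep(std::time::Duration::from_millis(10));
    /// }
    /// ```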
    #[inline]
    pub fn is_idle(&self) -> bool {
        self.inner.task_tracker.is_closed()
            && self.inner.task_tracker.is_empty()
            && self.inner.compute_state.count.load(Ordering::Acquire) == 0
    }

    /// Get the number of compute tasks currently in flight.
    ///
    /// Useful for debugging shutdown issues or monitoring workload.
    ///
    /// # Example
    ///
    /// ```ignore
    /// if runtime.compute_tasks_in_flight() > 0 {
    ///     tracing::warn!("Still waiting for {} compute tasks",
    ///         runtime.compute_tasks_in_flight());
    /// }
    /// ```
    #[inline]
    pub fn compute_tasks_in_flight(&self) -> usize {
        self.inner.compute_state.count.load(Ordering::Relaxed)
    }

    /// Wait for all tracked tasks to complete (async).
    ///
    /// Call from within `block_on()`. Requires `shutdown()` to be called first,
    /// otherwise this will wait forever.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     runtime.spawn_async(work());
    ///     runtime.shutdown();
    ///     runtime.wait_for_shutdown().await;
    /// });
    /// ```
    pub async fn wait_for_shutdown(&self) {
        self.inner.task_tracker.wait().await;

        // Wait for compute tasks efficiently (no spin loop)
        let mut logged = false;
        loop {
            let count = self.inner.compute_state.count.load(Ordering::Acquire);
            if count == 0 {
                break;
            }
            if !logged {
                debug!(count, "waiting for compute tasks to complete");
                logged = true;
            }
            self.inner.compute_state.notify.notified().await;
        }
    }

    /// Block until all tracked tasks complete (from main thread).
    ///
    /// This is the primary shutdown method. It:
    /// 1. Calls `shutdown()` to close the task tracker
    /// 2. Waits for all tracked async and compute tasks to finish
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     runtime.spawn_async(background_work());
    ///     runtime.spawn_compute(|| cpu_work()).await;
594 /// });
595 ///
596 /// // Graceful shutdown from main thread
597 /// runtime.block_until_idle();
598 /// ```
599 pub fn block_until_idle(&self) {
600 self.shutdown();
601 self.block_on(self.wait_for_shutdown());
602 }
603}

impl LoomRuntimeInner {
    /// Spawn CPU-bound work on rayon and await the result.
    ///
    /// Uses per-type object pools for zero allocation after warmup.
    #[inline]
    pub async fn spawn_compute<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let pool = self.pools.get_or_create::<R>();

        // Try to get state from pool, or allocate new
        let state = pool.pop().unwrap_or_else(|| Arc::new(TaskState::new()));

        // Create the pooled task
        let (task, completion, state_for_return) = PooledRayonTask::new(state);

        // Create guard BEFORE spawning - it increments counter in constructor
        let guard = ComputeTaskGuard::new(&self.compute_state);

        self.rayon_pool.spawn(move || {
            // Execute work inside guard scope so counter decrements BEFORE completing.
            // This ensures the async future sees count=0 when it wakes up.
            let result = {
                let _guard = guard;
                f()
            };
            completion.complete(result);
        });

        let result = task.await;

        // Return state to pool for reuse
        state_for_return.reset();
        pool.push(state_for_return);

        result
    }
}

impl std::fmt::Debug for LoomRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LoomRuntime")
            .field("config", &self.inner.config)
            .field(
                "compute_tasks_in_flight",
                &self.inner.compute_state.count.load(Ordering::Relaxed),
            )
            .finish_non_exhaustive()
    }
}

impl std::fmt::Display for LoomRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "LoomRuntime[{}]: tokio({}, cpus={}) rayon({}, cpus={})",
            self.inner.config.prefix,
            self.inner.tokio_threads,
            format_cpuset(&self.inner.tokio_cpus),
            self.inner.rayon_threads,
            format_cpuset(&self.inner.rayon_cpus),
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::pool::DEFAULT_POOL_SIZE;

    fn test_config() -> LoomConfig {
        LoomConfig {
            prefix: "test".to_string(),
            cpuset: None,
            tokio_threads: Some(1),
            rayon_threads: Some(1),
            compute_pool_size: DEFAULT_POOL_SIZE,
            #[cfg(feature = "cuda")]
            cuda_device: None,
        }
    }

    #[test]
    fn test_runtime_creation() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();
        assert_eq!(runtime.config().prefix, "test");
    }

    #[test]
    fn test_block_on() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        let result = runtime.block_on(async { 42 });
        assert_eq!(result, 42);
    }

    #[test]
    fn test_spawn_compute() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        let result =
            runtime.block_on(async { runtime.spawn_compute(|| (0..100).sum::<i32>()).await });
        assert_eq!(result, 4950);
    }

    #[test]
    fn test_spawn_async() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        let result = runtime.block_on(async {
            let handle = runtime.spawn_async(async { 42 });
            handle.await.unwrap()
        });
        assert_eq!(result, 42);
    }

    #[test]
    fn test_install() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        let result = runtime.install(|| {
            use rayon::prelude::*;
            (0..100).into_par_iter().sum::<i32>()
        });
        assert_eq!(result, 4950);
    }

    #[test]
    fn test_shutdown_and_idle() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        // Initially not idle (tracker not closed)
        assert!(!runtime.is_idle());

        // After shutdown with no tasks, should be idle
        runtime.shutdown();
        assert!(runtime.is_idle());
    }

    #[test]
    fn test_block_until_idle() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        runtime.block_on(async {
            runtime.spawn_async(async { 42 });
            runtime.spawn_compute(|| 100).await;
        });

        runtime.block_until_idle();
        assert!(runtime.is_idle());
    }

    #[test]
    fn test_insufficient_cpus_error() {
        let mut config = test_config();
        config.cpuset = Some("0".to_string()); // Only 1 CPU
        config.tokio_threads = Some(2);
        config.rayon_threads = Some(2);

        let result = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE);
        assert!(matches!(result, Err(LoomError::InsufficientCpus { .. })));
    }

    #[test]
    fn test_current_runtime_in_block_on() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        runtime.block_on(async {
            // current_runtime should work inside block_on
            let current = crate::context::current_runtime();
            assert!(current.is_some());
        });

        // Outside block_on, should be None
        assert!(crate::context::current_runtime().is_none());
    }

    #[test]
    fn test_spawn_compute_pooling() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        // Warmup - first call allocates
        runtime.block_on(async {
            runtime.spawn_compute(|| 1i32).await;
        });

        // Subsequent calls should reuse pooled state (we can't easily verify this
        // without internal access, but we can verify it works)
        runtime.block_on(async {
            for i in 0..100 {
                let result = runtime.spawn_compute(move || i).await;
                assert_eq!(result, i);
            }
        });
    }

    #[test]
    fn test_spawn_compute_guard_drops_on_scope_exit() {
        // This test verifies the guard's Drop implementation works correctly.
        // We can't easily test panic behavior in rayon (panics abort by default),
        // but we can verify the guard decrements the counter when it goes out of scope.
        use std::sync::atomic::Ordering;

        let state = super::ComputeTaskState::new();

        // Create a guard (increments counter)
        {
            let _guard = super::ComputeTaskGuard::new(&state);
            assert_eq!(state.count.load(Ordering::Relaxed), 1);
        }
        // Guard dropped, counter should be 0
        assert_eq!(state.count.load(Ordering::Relaxed), 0);

        // Test multiple guards
        let state = super::ComputeTaskState::new();

        let guard1 = super::ComputeTaskGuard::new(&state);
        assert_eq!(state.count.load(Ordering::Relaxed), 1);

        let guard2 = super::ComputeTaskGuard::new(&state);
        assert_eq!(state.count.load(Ordering::Relaxed), 2);

        drop(guard1);
        assert_eq!(state.count.load(Ordering::Relaxed), 1);

        drop(guard2);
        assert_eq!(state.count.load(Ordering::Relaxed), 0);

        // The notification mechanism is verified by the fact that wait_for_shutdown
        // doesn't spin-loop forever when compute tasks complete
    }

    #[test]
    fn test_compute_tasks_in_flight() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        // Initially no tasks
        assert_eq!(runtime.compute_tasks_in_flight(), 0);

        // After spawning and completing, should be back to 0
        runtime.block_on(async {
            runtime.spawn_compute(|| 42).await;
        });
        assert_eq!(runtime.compute_tasks_in_flight(), 0);
    }

    #[test]
    fn test_display() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        let display = format!("{}", runtime);
        assert!(display.starts_with("LoomRuntime[test]:"));
        assert!(display.contains("tokio(1, cpus="));
        assert!(display.contains("rayon(1, cpus="));
    }
}
873}