//! Loom runtime implementation.
//!
//! The runtime combines a tokio async runtime with a rayon thread pool,
//! both configured with CPU pinning.
//!
//! # Performance
//!
//! This module is designed for zero unnecessary overhead:
//! - `spawn_async()`: ~10ns overhead (TaskTracker token only)
//! - `spawn_compute()`: ~100-500ns (cross-thread signaling, 0 bytes after warmup)
//! - `install()`: ~0ns (zero overhead, direct rayon access)
//!
//! # Thread Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │                     LoomRuntime                              │
//! │  pools: ComputePoolRegistry (per-type lock-free pools)      │
//! │  (One pool per result type, shared across all threads)      │
//! └─────────────────────────────────────────────────────────────┘
//!          │ on_thread_start           │ start_handler
//!          ▼                           ▼
//! ┌─────────────────────┐     ┌─────────────────────┐
//! │   Tokio Workers     │     │   Rayon Workers     │
//! │  thread_local! {    │     │  thread_local! {    │
//! │    RUNTIME: Weak<>  │     │    RUNTIME: Weak<>  │
//! │  }                  │     │  }                  │
//! └─────────────────────┘     └─────────────────────┘
//! ```
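//!
//! # Quick Start
//!
//! A minimal usage sketch (builder methods as in the [`LoomRuntime`] example
//! below; the `"app"` prefix is a placeholder):
//!
//! ```ignore
//! let runtime = loom_rs::LoomBuilder::new().prefix("app").build()?;
//! runtime.block_on(async {
//!     let n = runtime.spawn_compute(|| 2 + 2).await;
//!     assert_eq!(n, 4);
//! });
//! runtime.block_until_idle();
//! ```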

use crate::affinity::{pin_to_cpu, CpuAllocator};
use crate::bridge::{PooledRayonTask, TaskState};
use crate::config::LoomConfig;
use crate::context::{clear_current_runtime, set_current_runtime};
use crate::cpuset::{available_cpus, format_cpuset, parse_and_validate_cpuset};
use crate::error::{LoomError, Result};
use crate::pool::ComputePoolRegistry;

use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use tokio::sync::Notify;
use tokio_util::task::TaskTracker;
use tracing::{debug, info, warn};

/// State for tracking in-flight compute tasks.
///
/// Combines the task counter with a notification mechanism for efficient
/// shutdown waiting (avoids spin loops).
struct ComputeTaskState {
    /// Number of tasks currently executing on rayon
    count: AtomicUsize,
    /// Notified when count reaches 0
    notify: Notify,
}

impl ComputeTaskState {
    fn new() -> Self {
        Self {
            count: AtomicUsize::new(0),
            notify: Notify::new(),
        }
    }
}

/// Guard that decrements the compute task counter on drop.
///
/// Panic-safe: the decrement runs even if the task closure panics.
///
/// SAFETY: The state lives in LoomRuntimeInner, which outlives all rayon tasks
/// because block_until_idle waits for compute_tasks to reach 0.
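///
/// A usage sketch mirroring `spawn_compute` (names illustrative):
///
/// ```ignore
/// let guard = ComputeTaskGuard::new(&self.compute_state); // count += 1
/// rayon_pool.spawn(move || {
///     let _guard = guard; // count -= 1 on drop, even if f() panics
///     f();
/// });
/// ```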
struct ComputeTaskGuard {
    state: *const ComputeTaskState,
}

unsafe impl Send for ComputeTaskGuard {}

impl ComputeTaskGuard {
    fn new(state: &ComputeTaskState) -> Self {
        state.count.fetch_add(1, Ordering::Relaxed);
        Self {
            state: state as *const ComputeTaskState,
        }
    }
}

impl Drop for ComputeTaskGuard {
    fn drop(&mut self) {
        // SAFETY: state outlives rayon tasks due to shutdown waiting
        unsafe {
            let prev = (*self.state).count.fetch_sub(1, Ordering::Release);
            if prev == 1 {
                // Count just went from 1 to 0, notify waiters
                (*self.state).notify.notify_waiters();
            }
        }
    }
}

/// A bespoke thread pool runtime combining tokio and rayon with CPU pinning.
///
/// The runtime provides:
/// - A tokio async runtime for I/O-bound work
/// - A rayon thread pool for CPU-bound parallel work
/// - Automatic CPU pinning for both runtimes
/// - A task tracker for graceful shutdown
/// - Zero-allocation compute spawning after warmup
///
/// # Performance Guarantees
///
/// | Method | Overhead | Allocations | Tracked |
/// |--------|----------|-------------|---------|
/// | `spawn_async()` | ~10ns | Token only | Yes |
/// | `spawn_compute()` | ~100-500ns | 0 bytes (after warmup) | Yes |
/// | `install()` | ~0ns | None | No |
/// | `rayon_pool()` | 0ns | None | No |
/// | `tokio_handle()` | 0ns | None | No |
///
/// # Examples
///
/// ```ignore
/// use loom_rs::LoomBuilder;
///
/// let runtime = LoomBuilder::new()
///     .prefix("myapp")
///     .tokio_threads(2)
///     .rayon_threads(6)
///     .build()?;
///
/// runtime.block_on(async {
///     // Spawn tracked async I/O task
///     let io_handle = runtime.spawn_async(async {
///         fetch_data().await
///     });
///
///     // Spawn tracked compute task and await result
///     let result = runtime.spawn_compute(|| {
///         expensive_computation()
///     }).await;
///
///     // Zero-overhead parallel iterators (within tracked context)
///     let processed = runtime.install(|| {
///         data.par_iter().map(|x| process(x)).collect()
///     });
/// });
///
/// // Graceful shutdown from main thread
/// runtime.block_until_idle();
/// ```
pub struct LoomRuntime {
    inner: Arc<LoomRuntimeInner>,
}

/// Inner state shared with thread-locals.
///
/// This is Arc-wrapped and shared with tokio/rayon worker threads via thread-local
/// storage, enabling `current_runtime()` to work from any managed thread.
pub struct LoomRuntimeInner {
    config: LoomConfig,
    tokio_runtime: tokio::runtime::Runtime,
    pub(crate) rayon_pool: rayon::ThreadPool,
    task_tracker: TaskTracker,
    /// Track in-flight rayon tasks for graceful shutdown
    compute_state: ComputeTaskState,
    /// Per-type object pools for zero-allocation spawn_compute
    pub(crate) pools: ComputePoolRegistry,
    /// Number of tokio worker threads
    tokio_threads: usize,
    /// Number of rayon worker threads
    rayon_threads: usize,
    /// CPUs allocated to tokio workers
    tokio_cpus: Vec<usize>,
    /// CPUs allocated to rayon workers
    rayon_cpus: Vec<usize>,
}

impl LoomRuntime {
    /// Create a runtime from a configuration.
    ///
    /// This is typically called via `LoomBuilder::build()`.
    pub(crate) fn from_config(config: LoomConfig, pool_size: usize) -> Result<Self> {
        // Determine available CPUs
        let cpus = if let Some(ref cpuset_str) = config.cpuset {
            parse_and_validate_cpuset(cpuset_str)?
        } else {
            #[cfg(feature = "cuda")]
            if let Some(ref selector) = config.cuda_device {
                crate::cuda::cpuset_for_cuda_device(selector)?
            } else {
                available_cpus()
            }
            #[cfg(not(feature = "cuda"))]
            available_cpus()
        };

        if cpus.is_empty() {
            return Err(LoomError::NoCpusAvailable);
        }

        let total_cpus = cpus.len();
        let tokio_threads = config.effective_tokio_threads();
        let rayon_threads = config.effective_rayon_threads(total_cpus);

        // Validate we have enough CPUs
        let total_threads = tokio_threads + rayon_threads;
        if total_threads > total_cpus {
            return Err(LoomError::InsufficientCpus {
                requested: total_threads,
                available: total_cpus,
            });
        }

        // Split CPUs between tokio and rayon
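        // e.g. cpus = [0,1,2,3,4,5,6,7] with tokio_threads = 2 yields
        // tokio_cpus = [0,1] and rayon_cpus = [2,3,4,5,6,7]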
        let (tokio_cpus, rayon_cpus) = cpus.split_at(tokio_threads.min(cpus.len()));
        let tokio_cpus = tokio_cpus.to_vec();
        let rayon_cpus = if rayon_cpus.is_empty() {
            // If we don't have dedicated rayon CPUs, share with tokio
            tokio_cpus.clone()
        } else {
            rayon_cpus.to_vec()
        };

        info!(
            prefix = %config.prefix,
            tokio_threads,
            rayon_threads,
            total_cpus,
            pool_size,
            "building loom runtime"
        );

        // Use Arc<str> for prefix to avoid cloning on each thread start
        let prefix: Arc<str> = config.prefix.as_str().into();

        // Build the inner runtime via Arc::new_cyclic so worker threads can
        // hold a Weak reference back to it. The closure cannot return errors,
        // so tokio/rayon build failures panic via the expect() calls below.
        let inner = Arc::new_cyclic(|weak: &Weak<LoomRuntimeInner>| {
            let weak_clone = weak.clone();

            // Build tokio runtime with thread-local injection
            let tokio_runtime = Self::build_tokio_runtime(
                &prefix,
                tokio_threads,
                tokio_cpus.clone(),
                weak_clone.clone(),
            )
            .expect("failed to build tokio runtime");

            // Build rayon pool with thread-local injection
            let rayon_pool =
                Self::build_rayon_pool(&prefix, rayon_threads, rayon_cpus.clone(), weak_clone)
                    .expect("failed to build rayon pool");

            LoomRuntimeInner {
                config,
                tokio_runtime,
                rayon_pool,
                task_tracker: TaskTracker::new(),
                compute_state: ComputeTaskState::new(),
                pools: ComputePoolRegistry::new(pool_size),
                tokio_threads,
                rayon_threads,
                tokio_cpus,
                rayon_cpus,
            }
        });

        Ok(Self { inner })
    }

    fn build_tokio_runtime(
        prefix: &Arc<str>,
        num_threads: usize,
        cpus: Vec<usize>,
        runtime_weak: Weak<LoomRuntimeInner>,
    ) -> Result<tokio::runtime::Runtime> {
        let allocator = Arc::new(CpuAllocator::new(cpus));

        // Thread name counter
        let thread_counter = Arc::new(AtomicUsize::new(0));
        let name_prefix = Arc::clone(prefix);

        let start_weak = runtime_weak;
        let start_allocator = allocator;
        let start_prefix = Arc::clone(prefix);

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(num_threads)
            .thread_name_fn(move || {
                let id = thread_counter.fetch_add(1, Ordering::SeqCst);
                format!("{}-tokio-{:04}", name_prefix, id)
            })
            .on_thread_start(move || {
                // Pin CPU
                let cpu_id = start_allocator.allocate();
                if let Err(e) = pin_to_cpu(cpu_id) {
                    warn!(%e, %start_prefix, cpu_id, "failed to pin tokio thread");
                } else {
                    debug!(cpu_id, %start_prefix, "pinned tokio thread to CPU");
                }

                // Inject runtime reference into thread-local
                set_current_runtime(start_weak.clone());
            })
            .on_thread_stop(|| {
                clear_current_runtime();
            })
            .enable_all()
            .build()?;

        Ok(runtime)
    }

    fn build_rayon_pool(
        prefix: &Arc<str>,
        num_threads: usize,
        cpus: Vec<usize>,
        runtime_weak: Weak<LoomRuntimeInner>,
    ) -> Result<rayon::ThreadPool> {
        let allocator = Arc::new(CpuAllocator::new(cpus));
        let name_prefix = Arc::clone(prefix);

        let start_weak = runtime_weak;
        let start_allocator = allocator;
        let start_prefix = Arc::clone(prefix);

        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .thread_name(move |i| format!("{}-rayon-{:04}", name_prefix, i))
            .start_handler(move |thread_index| {
                // Pin CPU
                let cpu_id = start_allocator.allocate();
                debug!(thread_index, cpu_id, %start_prefix, "rayon thread starting");
                if let Err(e) = pin_to_cpu(cpu_id) {
                    warn!(%e, %start_prefix, cpu_id, thread_index, "failed to pin rayon thread");
                }

                // Inject runtime reference into thread-local
                set_current_runtime(start_weak.clone());
            })
            .exit_handler(|_thread_index| {
                clear_current_runtime();
            })
            .build()?;

        Ok(pool)
    }

    /// Get the resolved configuration.
    pub fn config(&self) -> &LoomConfig {
        &self.inner.config
    }

    /// Get the tokio runtime handle.
    ///
    /// This can be used to spawn untracked tasks or enter the runtime context.
    /// For tracked async tasks, prefer `spawn_async()`.
    ///
    /// # Performance
    ///
    /// Zero overhead - returns a reference.
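    ///
    /// # Examples
    ///
    /// A sketch of spawning an untracked task directly on the handle:
    ///
    /// ```ignore
    /// runtime.tokio_handle().spawn(async {
    ///     // Untracked: block_until_idle() will not wait for this
    /// });
    /// ```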
    pub fn tokio_handle(&self) -> &tokio::runtime::Handle {
        self.inner.tokio_runtime.handle()
    }

    /// Get the rayon thread pool.
    ///
    /// This can be used to execute parallel iterators or spawn untracked work directly.
    /// For tracked compute tasks, prefer `spawn_compute()`.
    /// For zero-overhead parallel iterators, prefer `install()`.
    ///
    /// # Performance
    ///
    /// Zero overhead - returns a reference.
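    ///
    /// # Examples
    ///
    /// A sketch of untracked direct pool use:
    ///
    /// ```ignore
    /// let sum = runtime.rayon_pool().install(|| {
    ///     use rayon::prelude::*;
    ///     (0..1_000).into_par_iter().sum::<i64>()
    /// });
    /// ```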
    pub fn rayon_pool(&self) -> &rayon::ThreadPool {
        &self.inner.rayon_pool
    }

    /// Get the task tracker for graceful shutdown.
    ///
    /// Use this to track spawned tasks and wait for them to complete.
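    ///
    /// # Examples
    ///
    /// A sketch of manual tracking with a token (equivalent to what
    /// `spawn_async()` does internally):
    ///
    /// ```ignore
    /// let token = runtime.task_tracker().token();
    /// runtime.tokio_handle().spawn(async move {
    ///     let _guard = token;
    ///     // tracked work
    /// });
    /// ```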
    pub fn task_tracker(&self) -> &TaskTracker {
        &self.inner.task_tracker
    }

    /// Block on a future using the tokio runtime.
    ///
    /// This is the main entry point for running async code from the main thread.
    /// The current runtime is available via `loom_rs::current_runtime()` within
    /// the block_on scope.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     // Async code here
    ///     // loom_rs::current_runtime() works here
    /// });
    /// ```
    pub fn block_on<F: Future>(&self, f: F) -> F::Output {
        // Set current runtime for the main thread during block_on
        set_current_runtime(Arc::downgrade(&self.inner));
        let result = self.inner.tokio_runtime.block_on(f);
        clear_current_runtime();
        result
    }

    /// Spawn a tracked async task on tokio.
    ///
    /// The task is tracked for graceful shutdown via `block_until_idle()`.
    ///
    /// # Performance
    ///
    /// Overhead: ~10ns (TaskTracker token only).
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     let handle = runtime.spawn_async(async {
    ///         // I/O-bound async work
    ///         fetch_data().await
    ///     });
    ///
    ///     let result = handle.await.unwrap();
    /// });
    /// ```
    #[inline]
    pub fn spawn_async<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let token = self.inner.task_tracker.token();
        self.inner.tokio_runtime.spawn(async move {
            let _guard = token;
            future.await
        })
    }

    /// Spawn CPU-bound work on rayon and await the result.
    ///
    /// The task is tracked for graceful shutdown via `block_until_idle()`.
    /// Automatically uses per-type object pools for zero allocation after warmup.
    ///
    /// # Performance
    ///
    /// | State | Allocations | Overhead |
    /// |-------|-------------|----------|
    /// | Pool hit | 0 bytes | ~100-500ns |
    /// | Pool miss | ~32 bytes | ~100-500ns |
    /// | First call per type | Pool + state | ~1µs |
    ///
    /// For zero-overhead parallel iterators (within an already-tracked context),
    /// use `install()` instead.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     let result = runtime.spawn_compute(|| {
    ///         // CPU-intensive work
    ///         expensive_computation()
    ///     }).await;
    /// });
    /// ```
    #[inline]
    pub async fn spawn_compute<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        self.inner.spawn_compute(f).await
    }

    /// Execute work on rayon with zero overhead (sync, blocking).
    ///
    /// This installs the rayon pool for the current scope, allowing direct use
    /// of rayon's parallel iterators.
    ///
    /// **NOT tracked** - use within an already-tracked task (e.g., inside
    /// `spawn_async` or `spawn_compute`) for proper shutdown tracking.
    ///
    /// # Performance
    ///
    /// Zero overhead - direct rayon access.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     // This is a tracked context (we're in block_on)
    ///     let processed = runtime.install(|| {
    ///         use rayon::prelude::*;
    ///         data.par_iter().map(|x| process(x)).collect::<Vec<_>>()
    ///     });
    /// });
    /// ```
    #[inline]
    pub fn install<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send,
        R: Send,
    {
        self.inner.rayon_pool.install(f)
    }

    /// Begin graceful shutdown by closing the task tracker.
    ///
    /// `spawn_async()` and `spawn_compute()` still work after this call, but
    /// the shutdown process has begun: `wait_for_shutdown()` and
    /// `block_until_idle()` can complete once in-flight tasks finish. Use
    /// `is_idle()` to check for completion.
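    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.shutdown();
    /// runtime.block_on(async {
    ///     runtime.wait_for_shutdown().await;
    /// });
    /// ```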
    pub fn shutdown(&self) {
        self.inner.task_tracker.close();
    }

    /// Check if all tracked tasks have completed.
    ///
    /// Returns `true` if `shutdown()` has been called and all tracked async
    /// tasks and compute tasks have finished.
    ///
    /// # Performance
    ///
    /// Cheap - a few atomic loads (tracker closed, tracker empty, compute count).
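    ///
    /// # Examples
    ///
    /// A polling sketch for non-async contexts (prefer `block_until_idle()`):
    ///
    /// ```ignore
    /// runtime.shutdown();
    /// while !runtime.is_idle() {
    ///     std::thread::sleep(std::time::Duration::from_millis(10));
    /// }
    /// ```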
    #[inline]
    pub fn is_idle(&self) -> bool {
        self.inner.task_tracker.is_closed()
            && self.inner.task_tracker.is_empty()
            && self.inner.compute_state.count.load(Ordering::Acquire) == 0
    }

    /// Get the number of compute tasks currently in flight.
    ///
    /// Useful for debugging shutdown issues or monitoring workload.
    ///
    /// # Example
    ///
    /// ```ignore
    /// if runtime.compute_tasks_in_flight() > 0 {
    ///     tracing::warn!("Still waiting for {} compute tasks",
    ///         runtime.compute_tasks_in_flight());
    /// }
    /// ```
    #[inline]
    pub fn compute_tasks_in_flight(&self) -> usize {
        self.inner.compute_state.count.load(Ordering::Relaxed)
    }

    /// Wait for all tracked tasks to complete (async).
    ///
    /// Call from within `block_on()`. Requires `shutdown()` to be called first,
    /// otherwise this will wait forever.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     runtime.spawn_async(work());
    ///     runtime.shutdown();
    ///     runtime.wait_for_shutdown().await;
    /// });
    /// ```
    pub async fn wait_for_shutdown(&self) {
        self.inner.task_tracker.wait().await;

        // Wait for compute tasks efficiently (no spin loop)
        let mut logged = false;
        loop {
            // Register the waiter BEFORE checking the count: notify_waiters()
            // only wakes already-registered waiters, so checking first could
            // miss the final decrement and wait forever.
            let notified = self.inner.compute_state.notify.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();

            let count = self.inner.compute_state.count.load(Ordering::Acquire);
            if count == 0 {
                break;
            }
            if !logged {
                debug!(count, "waiting for compute tasks to complete");
                logged = true;
            }
            notified.await;
        }
    }

    /// Block until all tracked tasks complete (from main thread).
    ///
    /// This is the primary shutdown method. It:
    /// 1. Calls `shutdown()` to close the task tracker
    /// 2. Waits for all tracked async and compute tasks to finish
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     runtime.spawn_async(background_work());
    ///     runtime.spawn_compute(|| cpu_work()).await;
    /// });
    ///
    /// // Graceful shutdown from main thread
    /// runtime.block_until_idle();
    /// ```
    pub fn block_until_idle(&self) {
        self.shutdown();
        self.block_on(self.wait_for_shutdown());
    }
}

impl LoomRuntimeInner {
    /// Spawn CPU-bound work on rayon and await the result.
    ///
    /// Uses per-type object pools for zero allocation after warmup.
    #[inline]
    pub async fn spawn_compute<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let pool = self.pools.get_or_create::<R>();

        // Try to get state from pool, or allocate new
        let state = pool.pop().unwrap_or_else(|| Arc::new(TaskState::new()));

        // Create the pooled task
        let (task, completion, state_for_return) = PooledRayonTask::new(state);

        // Create guard BEFORE spawning - it increments counter in constructor
        let guard = ComputeTaskGuard::new(&self.compute_state);

        self.rayon_pool.spawn(move || {
            // Execute work inside guard scope so counter decrements BEFORE completing.
            // This ensures the async future sees count=0 when it wakes up.
            let result = {
                let _guard = guard;
                f()
            };
            completion.complete(result);
        });

        let result = task.await;

        // Return state to pool for reuse
        state_for_return.reset();
        pool.push(state_for_return);

        result
    }
}

impl std::fmt::Debug for LoomRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LoomRuntime")
            .field("config", &self.inner.config)
            .field(
                "compute_tasks_in_flight",
                &self.inner.compute_state.count.load(Ordering::Relaxed),
            )
            .finish_non_exhaustive()
    }
}

impl std::fmt::Display for LoomRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "LoomRuntime[{}]: tokio({}, cpus={}) rayon({}, cpus={})",
            self.inner.config.prefix,
            self.inner.tokio_threads,
            format_cpuset(&self.inner.tokio_cpus),
            self.inner.rayon_threads,
            format_cpuset(&self.inner.rayon_cpus),
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::pool::DEFAULT_POOL_SIZE;

    fn test_config() -> LoomConfig {
        LoomConfig {
            prefix: "test".to_string(),
            cpuset: None,
            tokio_threads: Some(1),
            rayon_threads: Some(1),
            compute_pool_size: DEFAULT_POOL_SIZE,
            #[cfg(feature = "cuda")]
            cuda_device: None,
        }
    }

    #[test]
    fn test_runtime_creation() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();
        assert_eq!(runtime.config().prefix, "test");
    }

    #[test]
    fn test_block_on() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        let result = runtime.block_on(async { 42 });
        assert_eq!(result, 42);
    }

    #[test]
    fn test_spawn_compute() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        let result =
            runtime.block_on(async { runtime.spawn_compute(|| (0..100).sum::<i32>()).await });
        assert_eq!(result, 4950);
    }

    #[test]
    fn test_spawn_async() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        let result = runtime.block_on(async {
            let handle = runtime.spawn_async(async { 42 });
            handle.await.unwrap()
        });
        assert_eq!(result, 42);
    }

    #[test]
    fn test_install() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        let result = runtime.install(|| {
            use rayon::prelude::*;
            (0..100).into_par_iter().sum::<i32>()
        });
        assert_eq!(result, 4950);
    }

    #[test]
    fn test_shutdown_and_idle() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        // Initially not idle (tracker not closed)
        assert!(!runtime.is_idle());

        // After shutdown with no tasks, should be idle
        runtime.shutdown();
        assert!(runtime.is_idle());
    }

    #[test]
    fn test_block_until_idle() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        runtime.block_on(async {
            runtime.spawn_async(async { 42 });
            runtime.spawn_compute(|| 100).await;
        });

        runtime.block_until_idle();
        assert!(runtime.is_idle());
    }

    #[test]
    fn test_insufficient_cpus_error() {
        let mut config = test_config();
        config.cpuset = Some("0".to_string()); // Only 1 CPU
        config.tokio_threads = Some(2);
        config.rayon_threads = Some(2);

        let result = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE);
        assert!(matches!(result, Err(LoomError::InsufficientCpus { .. })));
    }

    #[test]
    fn test_current_runtime_in_block_on() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        runtime.block_on(async {
            // current_runtime should work inside block_on
            let current = crate::context::current_runtime();
            assert!(current.is_some());
        });

        // Outside block_on, should be None
        assert!(crate::context::current_runtime().is_none());
    }

    #[test]
    fn test_spawn_compute_pooling() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        // Warmup - first call allocates
        runtime.block_on(async {
            runtime.spawn_compute(|| 1i32).await;
        });

        // Subsequent calls should reuse pooled state (we can't easily verify this
        // without internal access, but we can verify it works)
        runtime.block_on(async {
            for i in 0..100 {
                let result = runtime.spawn_compute(move || i).await;
                assert_eq!(result, i);
            }
        });
    }

    #[test]
    fn test_spawn_compute_guard_drops_on_scope_exit() {
        // This test verifies the guard's Drop implementation works correctly.
        // We can't easily test panic behavior in rayon (panics abort by default),
        // but we can verify the guard decrements the counter when it goes out of scope.
        use std::sync::atomic::Ordering;

        let state = super::ComputeTaskState::new();

        // Create a guard (increments counter)
        {
            let _guard = super::ComputeTaskGuard::new(&state);
            assert_eq!(state.count.load(Ordering::Relaxed), 1);
        }
        // Guard dropped, counter should be 0
        assert_eq!(state.count.load(Ordering::Relaxed), 0);

        // Test multiple guards
        let state = super::ComputeTaskState::new();

        let guard1 = super::ComputeTaskGuard::new(&state);
        assert_eq!(state.count.load(Ordering::Relaxed), 1);

        let guard2 = super::ComputeTaskGuard::new(&state);
        assert_eq!(state.count.load(Ordering::Relaxed), 2);

        drop(guard1);
        assert_eq!(state.count.load(Ordering::Relaxed), 1);

        drop(guard2);
        assert_eq!(state.count.load(Ordering::Relaxed), 0);

        // The notification mechanism is verified by the fact that wait_for_shutdown
        // doesn't spin-loop forever when compute tasks complete
    }

    #[test]
    fn test_compute_tasks_in_flight() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        // Initially no tasks
        assert_eq!(runtime.compute_tasks_in_flight(), 0);

        // After spawning and completing, should be back to 0
        runtime.block_on(async {
            runtime.spawn_compute(|| 42).await;
        });
        assert_eq!(runtime.compute_tasks_in_flight(), 0);
    }

    #[test]
    fn test_display() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config, DEFAULT_POOL_SIZE).unwrap();

        let display = format!("{}", runtime);
        assert!(display.starts_with("LoomRuntime[test]:"));
        assert!(display.contains("tokio(1, cpus="));
        assert!(display.contains("rayon(1, cpus="));
    }
}