Skip to main content

loom_rs/
runtime.rs

1//! Loom runtime implementation.
2//!
3//! The runtime combines a tokio async runtime with a rayon thread pool,
4//! both configured with CPU pinning.
5//!
6//! # Performance
7//!
8//! This module is designed for zero unnecessary overhead:
9//! - `spawn_async()`: ~10ns overhead (TaskTracker token only)
10//! - `spawn_compute()`: ~100-500ns (cross-thread signaling, 0 bytes after warmup)
11//! - `install()`: ~0ns (zero overhead, direct rayon access)
12//!
13//! # Thread Architecture
14//!
15//! ```text
16//! ┌─────────────────────────────────────────────────────────────┐
17//! │                     LoomRuntime                              │
18//! │  pools: ComputePoolRegistry (per-type lock-free pools)      │
19//! │  (One pool per result type, shared across all threads)      │
20//! └─────────────────────────────────────────────────────────────┘
21//!          │ on_thread_start           │ start_handler
22//!          ▼                           ▼
23//! ┌─────────────────────┐     ┌─────────────────────┐
24//! │   Tokio Workers     │     │   Rayon Workers     │
25//! │  thread_local! {    │     │  thread_local! {    │
26//! │    RUNTIME: Weak<>  │     │    RUNTIME: Weak<>  │
27//! │  }                  │     │  }                  │
28//! └─────────────────────┘     └─────────────────────┘
29//! ```
30
31use crate::affinity::{pin_to_cpu, CpuAllocator};
32use crate::bridge::{
33    PooledRayonTask, ScopedCompletion, ScopedComputeFuture, ScopedTaskState, TaskState,
34};
35use crate::config::LoomConfig;
36use crate::context::{clear_current_runtime, set_current_runtime};
37use crate::cpuset::{available_cpus, format_cpuset, parse_and_validate_cpuset};
38use crate::error::{LoomError, Result};
39use crate::mab::{Arm, ComputeHint, Context, FunctionKey, MabKnobs, MabScheduler};
40use crate::metrics::LoomMetrics;
41use crate::pool::ComputePoolRegistry;
42
43use std::future::Future;
44use std::sync::atomic::{AtomicUsize, Ordering};
45use std::sync::{Arc, OnceLock, Weak};
46use std::time::Instant;
47use tokio::sync::Notify;
48use tokio_util::task::TaskTracker;
49use tracing::{debug, info, warn};
50
/// State for tracking in-flight compute tasks.
///
/// Combines the task counter with a notification mechanism for efficient
/// shutdown waiting (avoids spin loops).
///
/// `ComputeTaskGuard` increments `count` when a task is submitted and
/// decrements it on drop, calling `notify.notify_waiters()` when the count
/// returns to zero.
struct ComputeTaskState {
    /// Number of tasks currently executing on rayon
    count: AtomicUsize,
    /// Notified when count reaches 0
    notify: Notify,
}
61
62impl ComputeTaskState {
63    fn new() -> Self {
64        Self {
65            count: AtomicUsize::new(0),
66            notify: Notify::new(),
67        }
68    }
69}
70
71/// Guard for tracking async task metrics.
72///
73/// Panic-safe: task_completed is called even if the future panics.
74struct AsyncMetricsGuard {
75    inner: Arc<LoomRuntimeInner>,
76}
77
78impl AsyncMetricsGuard {
79    fn new(inner: Arc<LoomRuntimeInner>) -> Self {
80        inner.prometheus_metrics.task_started();
81        Self { inner }
82    }
83}
84
85impl Drop for AsyncMetricsGuard {
86    fn drop(&mut self) {
87        self.inner.prometheus_metrics.task_completed();
88    }
89}
90
/// Guard for tracking compute task state and metrics.
///
/// Panic-safe: executes even if the task closure panics.
///
/// SAFETY: The state lives in LoomRuntimeInner which outlives all rayon tasks
/// because block_until_idle waits for compute_tasks to reach 0.
///
/// Raw pointers (instead of `Arc` clones) keep the guard allocation-free;
/// the `unsafe impl Send` below is what lets it travel into rayon closures.
struct ComputeTaskGuard {
    /// Points at `LoomRuntimeInner::compute_state`; valid for the guard's
    /// whole lifetime per the SAFETY argument above.
    state: *const ComputeTaskState,
    /// Points at `LoomRuntimeInner::prometheus_metrics`; same lifetime
    /// argument.
    metrics: *const LoomMetrics,
}

// SAFETY: both pointees are owned by LoomRuntimeInner, which is kept alive
// until every in-flight compute task has dropped its guard (shutdown waits
// for the counter to hit zero), so moving these raw pointers across threads
// is sound.
unsafe impl Send for ComputeTaskGuard {}

impl ComputeTaskGuard {
    /// Create a new guard, tracking submission in MAB metrics.
    ///
    /// This should be called BEFORE spawning on rayon.
    fn new(state: &ComputeTaskState, metrics: &LoomMetrics) -> Self {
        // Increment first so the task is visible to shutdown waiters before
        // it is handed to rayon.
        state.count.fetch_add(1, Ordering::Relaxed);
        metrics.rayon_submitted();
        Self {
            state: state as *const ComputeTaskState,
            metrics: metrics as *const LoomMetrics,
        }
    }

    /// Mark that the rayon task has started executing.
    ///
    /// This should be called at the START of the rayon closure.
    fn started(&self) {
        // SAFETY: metrics outlives rayon tasks
        unsafe {
            (*self.metrics).rayon_started();
        }
    }
}

impl Drop for ComputeTaskGuard {
    fn drop(&mut self) {
        // SAFETY: state and metrics outlive rayon tasks due to shutdown waiting
        unsafe {
            // Track MAB metrics completion (panic-safe)
            (*self.metrics).rayon_completed();

            // Release publishes this task's effects to whoever observes the
            // counter reach zero.
            // NOTE(review): the matching Acquire on the waiter side is not
            // visible in this file — confirm that block_until_idle's load /
            // Notify path synchronizes with this Release.
            let prev = (*self.state).count.fetch_sub(1, Ordering::Release);
            if prev == 1 {
                // Count just went from 1 to 0, notify waiters
                (*self.state).notify.notify_waiters();
            }
        }
    }
}
143
/// A bespoke thread pool runtime combining tokio and rayon with CPU pinning.
///
/// The runtime provides:
/// - A tokio async runtime for I/O-bound work
/// - A rayon thread pool for CPU-bound parallel work
/// - Automatic CPU pinning for both runtimes
/// - A task tracker for graceful shutdown
/// - Zero-allocation compute spawning after warmup
///
/// # Performance Guarantees
///
/// | Method | Overhead | Allocations | Tracked |
/// |--------|----------|-------------|---------|
/// | `spawn_async()` | ~10ns | Token only | Yes |
/// | `spawn_compute()` | ~100-500ns | 0 bytes (after warmup) | Yes |
/// | `install()` | ~0ns | None | No |
/// | `rayon_pool()` | 0ns | None | No |
/// | `tokio_handle()` | 0ns | None | No |
///
/// # Examples
///
/// ```ignore
/// use loom_rs::LoomBuilder;
///
/// let runtime = LoomBuilder::new()
///     .prefix("myapp")
///     .tokio_threads(2)
///     .rayon_threads(6)
///     .build()?;
///
/// runtime.block_on(async {
///     // Spawn tracked async I/O task
///     let io_handle = runtime.spawn_async(async {
///         fetch_data().await
///     });
///
///     // Spawn tracked compute task and await result
///     let result = runtime.spawn_compute(|| {
///         expensive_computation()
///     }).await;
///
///     // Zero-overhead parallel iterators (within tracked context)
///     let processed = runtime.install(|| {
///         data.par_iter().map(|x| process(x)).collect()
///     });
/// });
///
/// // Graceful shutdown from main thread
/// runtime.block_until_idle();
/// ```
pub struct LoomRuntime {
    /// Shared runtime state. Worker threads also hold `Weak` references to
    /// this via thread-locals, which is how `current_runtime()` recovers a
    /// handle from inside tokio/rayon workers.
    pub(crate) inner: Arc<LoomRuntimeInner>,
}
197
/// Inner state shared with thread-locals.
///
/// This is Arc-wrapped and shared with tokio/rayon worker threads via thread-local
/// storage, enabling `current_runtime()` to work from any managed thread.
pub(crate) struct LoomRuntimeInner {
    /// Resolved configuration the runtime was built from.
    config: LoomConfig,
    /// Multi-threaded tokio runtime with pinned, named worker threads.
    tokio_runtime: tokio::runtime::Runtime,
    /// Rayon pool with pinned, named worker threads.
    pub(crate) rayon_pool: rayon::ThreadPool,
    /// Tracks spawned async tasks for graceful shutdown.
    task_tracker: TaskTracker,
    /// Track in-flight rayon tasks for graceful shutdown
    compute_state: ComputeTaskState,
    /// Per-type object pools for zero-allocation spawn_compute
    pub(crate) pools: ComputePoolRegistry,
    /// Number of tokio worker threads
    pub(crate) tokio_threads: usize,
    /// Number of rayon worker threads
    pub(crate) rayon_threads: usize,
    /// CPUs allocated to tokio workers
    pub(crate) tokio_cpus: Vec<usize>,
    /// CPUs allocated to rayon workers
    pub(crate) rayon_cpus: Vec<usize>,
    /// Lazily initialized shared MAB scheduler
    mab_scheduler: OnceLock<Arc<MabScheduler>>,
    /// MAB knobs configuration
    pub(crate) mab_knobs: MabKnobs,
    /// Prometheus metrics - single source of truth for all metrics
    /// (serves both Prometheus exposition and MAB scheduling)
    pub(crate) prometheus_metrics: LoomMetrics,
}
227
228impl LoomRuntime {
229    /// Create a LoomRuntime from an existing inner reference.
230    ///
231    /// This does **not** create a new runtime; it only creates another
232    /// handle that points at the same `LoomRuntimeInner`. As a result,
233    /// multiple `LoomRuntime` values may refer to the same underlying
234    /// runtime state.
235    ///
236    /// This is intended for internal use by `current_runtime()` to wrap the
237    /// thread-local inner reference. Callers must **not** treat the returned
238    /// handle as an independently owned runtime for the purpose of shutdown
239    /// or teardown. Invoking shutdown-related methods from multiple wrappers
240    /// that share the same inner state may lead to unexpected behavior.
241    pub(crate) fn from_inner(inner: Arc<LoomRuntimeInner>) -> Self {
242        Self { inner }
243    }
244
    /// Create a runtime from a configuration.
    ///
    /// This is typically called via `LoomBuilder::build()`.
    ///
    /// # Errors
    ///
    /// - `LoomError::CudaCpusetConflict` when both `cuda_device` and
    ///   `cpuset` are configured (cuda feature only)
    /// - `LoomError::NoCpusAvailable` when the resolved CPU set is empty
    /// - `LoomError::InsufficientCpus` when tokio + rayon threads exceed
    ///   the resolved CPU count
    /// - any error from cpuset parsing or CUDA locality lookup
    ///
    /// # Panics
    ///
    /// Panics (via `expect`) if the tokio runtime or rayon pool fails to
    /// build; these calls happen inside `Arc::new_cyclic`, whose closure
    /// cannot return a `Result`.
    pub(crate) fn from_config(config: LoomConfig) -> Result<Self> {
        let pool_size = config.compute_pool_size;
        // Determine available CPUs
        // Priority: CUDA device cpuset > user cpuset > all available CPUs
        // Error if both cuda_device and cpuset are specified (mutually exclusive)
        let cpus = {
            #[cfg(feature = "cuda")]
            {
                // Check for conflicting configuration first
                if config.cuda_device.is_some() && config.cpuset.is_some() {
                    return Err(LoomError::CudaCpusetConflict);
                }

                if let Some(ref selector) = config.cuda_device {
                    match crate::cuda::cpuset_for_cuda_device(selector)? {
                        Some(cuda_cpus) => cuda_cpus,
                        None => {
                            // Could not determine CUDA locality, fall back to all CPUs
                            available_cpus()
                        }
                    }
                } else if let Some(ref cpuset_str) = config.cpuset {
                    parse_and_validate_cpuset(cpuset_str)?
                } else {
                    available_cpus()
                }
            }
            #[cfg(not(feature = "cuda"))]
            {
                if let Some(ref cpuset_str) = config.cpuset {
                    parse_and_validate_cpuset(cpuset_str)?
                } else {
                    available_cpus()
                }
            }
        };

        if cpus.is_empty() {
            return Err(LoomError::NoCpusAvailable);
        }

        let total_cpus = cpus.len();
        let tokio_threads = config.effective_tokio_threads();
        let rayon_threads = config.effective_rayon_threads(total_cpus);

        // Validate we have enough CPUs
        let total_threads = tokio_threads + rayon_threads;
        if total_threads > total_cpus {
            return Err(LoomError::InsufficientCpus {
                requested: total_threads,
                available: total_cpus,
            });
        }

        // Split CPUs between tokio and rayon: tokio takes the first
        // `tokio_threads` CPUs, rayon gets the remainder.
        let (tokio_cpus, rayon_cpus) = cpus.split_at(tokio_threads.min(cpus.len()));
        let tokio_cpus = tokio_cpus.to_vec();
        let rayon_cpus = if rayon_cpus.is_empty() {
            // If we don't have dedicated rayon CPUs, share with tokio
            tokio_cpus.clone()
        } else {
            rayon_cpus.to_vec()
        };

        info!(
            prefix = %config.prefix,
            tokio_threads,
            rayon_threads,
            total_cpus,
            pool_size,
            "building loom runtime"
        );

        // Use Arc<str> for prefix to avoid cloning on each thread start
        let prefix: Arc<str> = config.prefix.as_str().into();

        // Create the inner runtime first (without tokio/rayon)
        // We'll use a two-phase approach with OnceCell-like pattern
        //
        // `new_cyclic` hands us a Weak to the Arc being constructed so the
        // worker threads' thread-locals can point back at the runtime that
        // owns them without creating a strong reference cycle.
        let inner = Arc::new_cyclic(|weak: &Weak<LoomRuntimeInner>| {
            let weak_clone = weak.clone();

            // Build tokio runtime with thread-local injection
            let tokio_runtime = Self::build_tokio_runtime(
                &prefix,
                tokio_threads,
                tokio_cpus.clone(),
                weak_clone.clone(),
            )
            .expect("failed to build tokio runtime");

            // Build rayon pool with thread-local injection
            let rayon_pool =
                Self::build_rayon_pool(&prefix, rayon_threads, rayon_cpus.clone(), weak_clone)
                    .expect("failed to build rayon pool");

            // Extract MAB knobs, using defaults if not configured
            let mab_knobs = config.mab_knobs.clone().unwrap_or_default();

            // Create Prometheus metrics with the runtime's prefix
            let prometheus_metrics = LoomMetrics::with_prefix(&config.prefix);

            // Register with provided registry if available; registration
            // failure is logged but non-fatal.
            if let Some(ref registry) = config.prometheus_registry {
                if let Err(e) = prometheus_metrics.register(registry) {
                    warn!(%e, "failed to register prometheus metrics");
                }
            }

            LoomRuntimeInner {
                config,
                tokio_runtime,
                rayon_pool,
                task_tracker: TaskTracker::new(),
                compute_state: ComputeTaskState::new(),
                pools: ComputePoolRegistry::new(pool_size),
                tokio_threads,
                rayon_threads,
                tokio_cpus,
                rayon_cpus,
                mab_scheduler: OnceLock::new(),
                mab_knobs,
                prometheus_metrics,
            }
        });

        Ok(Self { inner })
    }
375
376    fn build_tokio_runtime(
377        prefix: &Arc<str>,
378        num_threads: usize,
379        cpus: Vec<usize>,
380        runtime_weak: Weak<LoomRuntimeInner>,
381    ) -> Result<tokio::runtime::Runtime> {
382        let allocator = Arc::new(CpuAllocator::new(cpus));
383        let prefix_clone = Arc::clone(prefix);
384
385        // Thread name counter
386        let thread_counter = Arc::new(AtomicUsize::new(0));
387        let name_prefix = Arc::clone(prefix);
388
389        let start_weak = runtime_weak.clone();
390        let start_allocator = allocator.clone();
391        let start_prefix = prefix_clone.clone();
392
393        let runtime = tokio::runtime::Builder::new_multi_thread()
394            .worker_threads(num_threads)
395            .thread_name_fn(move || {
396                let id = thread_counter.fetch_add(1, Ordering::SeqCst);
397                format!("{}-tokio-{:04}", name_prefix, id)
398            })
399            .on_thread_start(move || {
400                // Pin CPU
401                let cpu_id = start_allocator.allocate();
402                if let Err(e) = pin_to_cpu(cpu_id) {
403                    warn!(%e, %start_prefix, cpu_id, "failed to pin tokio thread");
404                } else {
405                    debug!(cpu_id, %start_prefix, "pinned tokio thread to CPU");
406                }
407
408                // Inject runtime reference into thread-local
409                set_current_runtime(start_weak.clone());
410            })
411            .on_thread_stop(|| {
412                clear_current_runtime();
413            })
414            .enable_all()
415            .build()?;
416
417        Ok(runtime)
418    }
419
420    fn build_rayon_pool(
421        prefix: &Arc<str>,
422        num_threads: usize,
423        cpus: Vec<usize>,
424        runtime_weak: Weak<LoomRuntimeInner>,
425    ) -> Result<rayon::ThreadPool> {
426        let allocator = Arc::new(CpuAllocator::new(cpus));
427        let name_prefix = Arc::clone(prefix);
428
429        let start_weak = runtime_weak.clone();
430        let start_allocator = allocator.clone();
431        let start_prefix = Arc::clone(prefix);
432
433        let pool = rayon::ThreadPoolBuilder::new()
434            .num_threads(num_threads)
435            .thread_name(move |i| format!("{}-rayon-{:04}", name_prefix, i))
436            .start_handler(move |thread_index| {
437                // Pin CPU
438                let cpu_id = start_allocator.allocate();
439                debug!(thread_index, cpu_id, %start_prefix, "rayon thread starting");
440                if let Err(e) = pin_to_cpu(cpu_id) {
441                    warn!(%e, %start_prefix, cpu_id, thread_index, "failed to pin rayon thread");
442                }
443
444                // Inject runtime reference into thread-local
445                set_current_runtime(start_weak.clone());
446            })
447            .exit_handler(|_thread_index| {
448                clear_current_runtime();
449            })
450            .build()?;
451
452        Ok(pool)
453    }
454
455    /// Get the resolved configuration.
456    pub fn config(&self) -> &LoomConfig {
457        &self.inner.config
458    }
459
460    /// Get the tokio runtime handle.
461    ///
462    /// This can be used to spawn untracked tasks or enter the runtime context.
463    /// For tracked async tasks, prefer `spawn_async()`.
464    ///
465    /// # Performance
466    ///
467    /// Zero overhead - returns a reference.
468    pub fn tokio_handle(&self) -> &tokio::runtime::Handle {
469        self.inner.tokio_runtime.handle()
470    }
471
472    /// Get the rayon thread pool.
473    ///
474    /// This can be used to execute parallel iterators or spawn untracked work directly.
475    /// For tracked compute tasks, prefer `spawn_compute()`.
476    /// For zero-overhead parallel iterators, prefer `install()`.
477    ///
478    /// # Performance
479    ///
480    /// Zero overhead - returns a reference.
481    pub fn rayon_pool(&self) -> &rayon::ThreadPool {
482        &self.inner.rayon_pool
483    }
484
485    /// Get the task tracker for graceful shutdown.
486    ///
487    /// Use this to track spawned tasks and wait for them to complete.
488    pub fn task_tracker(&self) -> &TaskTracker {
489        &self.inner.task_tracker
490    }
491
492    /// Block on a future using the tokio runtime.
493    ///
494    /// This is the main entry point for running async code from the main thread.
495    /// The current runtime is available via `loom_rs::current_runtime()` within
496    /// the block_on scope.
497    ///
498    /// # Examples
499    ///
500    /// ```ignore
501    /// runtime.block_on(async {
502    ///     // Async code here
503    ///     // loom_rs::current_runtime() works here
504    /// });
505    /// ```
506    pub fn block_on<F: Future>(&self, f: F) -> F::Output {
507        // Set current runtime for the main thread during block_on
508        set_current_runtime(Arc::downgrade(&self.inner));
509        let result = self.inner.tokio_runtime.block_on(f);
510        clear_current_runtime();
511        result
512    }
513
514    /// Spawn a tracked async task on tokio.
515    ///
516    /// The task is tracked for graceful shutdown via `block_until_idle()`.
517    ///
518    /// # Performance
519    ///
520    /// Overhead: ~10ns (TaskTracker token only).
521    ///
522    /// # Examples
523    ///
524    /// ```ignore
525    /// runtime.block_on(async {
526    ///     let handle = runtime.spawn_async(async {
527    ///         // I/O-bound async work
528    ///         fetch_data().await
529    ///     });
530    ///
531    ///     let result = handle.await.unwrap();
532    /// });
533    /// ```
534    #[inline]
535    pub fn spawn_async<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
536    where
537        F: Future + Send + 'static,
538        F::Output: Send + 'static,
539    {
540        // Track task for MAB metrics (panic-safe via guard)
541        let metrics_guard = AsyncMetricsGuard::new(Arc::clone(&self.inner));
542        let token = self.inner.task_tracker.token();
543        self.inner.tokio_runtime.spawn(async move {
544            let _tracker = token;
545            let _metrics = metrics_guard;
546            future.await
547        })
548    }
549
550    /// Spawn CPU-bound work on rayon and await the result.
551    ///
552    /// The task is tracked for graceful shutdown via `block_until_idle()`.
553    /// Automatically uses per-type object pools for zero allocation after warmup.
554    ///
555    /// # Performance
556    ///
557    /// | State | Allocations | Overhead |
558    /// |-------|-------------|----------|
559    /// | Pool hit | 0 bytes | ~100-500ns |
560    /// | Pool miss | ~32 bytes | ~100-500ns |
561    /// | First call per type | Pool + state | ~1µs |
562    ///
563    /// For zero-overhead parallel iterators (within an already-tracked context),
564    /// use `install()` instead.
565    ///
566    /// # Examples
567    ///
568    /// ```ignore
569    /// runtime.block_on(async {
570    ///     let result = runtime.spawn_compute(|| {
571    ///         // CPU-intensive work
572    ///         expensive_computation()
573    ///     }).await;
574    /// });
575    /// ```
576    #[inline]
577    pub async fn spawn_compute<F, R>(&self, f: F) -> R
578    where
579        F: FnOnce() -> R + Send + 'static,
580        R: Send + 'static,
581    {
582        self.inner.spawn_compute(f).await
583    }
584
585    /// Spawn work with adaptive inline/offload decision.
586    ///
587    /// Uses MAB (Multi-Armed Bandit) to learn whether this function type should
588    /// run inline on tokio or offload to rayon. Good for handler patterns where
589    /// work duration varies by input.
590    ///
591    /// Unlike `spawn_compute()` which always offloads, this adaptively chooses
592    /// based on learned behavior and current system pressure.
593    ///
594    /// # Performance
595    ///
596    /// | Scenario | Behavior | Overhead |
597    /// |----------|----------|----------|
598    /// | Fast work | Inlines after learning | ~100ns (decision only) |
599    /// | Slow work | Offloads after learning | ~100-500ns (+ offload) |
600    /// | Cold start | Explores both arms | Variable |
601    ///
602    /// # Examples
603    ///
604    /// ```ignore
605    /// runtime.block_on(async {
606    ///     // MAB will learn whether this is fast or slow
607    ///     let result = runtime.spawn_adaptive(|| {
608    ///         process_item(item)
609    ///     }).await;
610    /// });
611    /// ```
612    pub async fn spawn_adaptive<F, R>(&self, f: F) -> R
613    where
614        F: FnOnce() -> R + Send + 'static,
615        R: Send + 'static,
616    {
617        self.spawn_adaptive_with_hint(ComputeHint::Unknown, f).await
618    }
619
620    /// Spawn with hint for cold-start guidance.
621    ///
622    /// The hint helps the scheduler make better initial decisions before it has
623    /// learned the actual execution time of this function type.
624    ///
625    /// # Hints
626    ///
627    /// - `ComputeHint::Low` - Expected < 50µs (likely inline-safe)
628    /// - `ComputeHint::Medium` - Expected 50-500µs (borderline)
629    /// - `ComputeHint::High` - Expected > 500µs (should test offload early)
630    /// - `ComputeHint::Unknown` - No hint (default exploration)
631    ///
632    /// # Examples
633    ///
634    /// ```ignore
635    /// use loom_rs::ComputeHint;
636    ///
637    /// runtime.block_on(async {
638    ///     // Hint that this is likely slow work
639    ///     let result = runtime.spawn_adaptive_with_hint(
640    ///         ComputeHint::High,
641    ///         || expensive_computation()
642    ///     ).await;
643    /// });
644    /// ```
645    pub async fn spawn_adaptive_with_hint<F, R>(&self, hint: ComputeHint, f: F) -> R
646    where
647        F: FnOnce() -> R + Send + 'static,
648        R: Send + 'static,
649    {
650        let ctx = self.collect_context();
651        let key = FunctionKey::from_type::<F>();
652        let scheduler = self.mab_scheduler();
653
654        let (id, arm) = scheduler.choose_with_hint(key, &ctx, hint);
655        let start = Instant::now();
656
657        let result = match arm {
658            Arm::InlineTokio => f(),
659            Arm::OffloadRayon => self.inner.spawn_compute(f).await,
660        };
661
662        let elapsed_us = start.elapsed().as_secs_f64() * 1_000_000.0;
663        scheduler.finish(id, elapsed_us, Some(elapsed_us));
664        result
665    }
666
667    /// Execute work on rayon with zero overhead (sync, blocking).
668    ///
669    /// This installs the rayon pool for the current scope, allowing direct use
670    /// of rayon's parallel iterators.
671    ///
672    /// **NOT tracked** - use within an already-tracked task (e.g., inside
673    /// `spawn_async` or `spawn_compute`) for proper shutdown tracking.
674    ///
675    /// # Performance
676    ///
677    /// Zero overhead - direct rayon access.
678    ///
679    /// # Examples
680    ///
681    /// ```ignore
682    /// runtime.block_on(async {
683    ///     // This is a tracked context (we're in block_on)
684    ///     let processed = runtime.install(|| {
685    ///         use rayon::prelude::*;
686    ///         data.par_iter().map(|x| process(x)).collect::<Vec<_>>()
687    ///     });
688    /// });
689    /// ```
690    #[inline]
691    pub fn install<F, R>(&self, f: F) -> R
692    where
693        F: FnOnce() -> R + Send,
694        R: Send,
695    {
696        self.inner.rayon_pool.install(f)
697    }
698
699    /// Execute a scoped parallel computation, allowing borrowed data.
700    ///
701    /// Unlike `spawn_compute()` which requires `'static` bounds, `scope_compute`
702    /// allows borrowing local variables from the async context for use in parallel
703    /// work. This is safe because:
704    ///
705    /// 1. The `.await` suspends the async task
706    /// 2. `rayon::scope` blocks until ALL spawned work completes
707    /// 3. Only then does the future resolve
708    /// 4. Therefore, borrowed references remain valid throughout
709    ///
710    /// # Performance
711    ///
712    /// | Aspect | Value |
713    /// |--------|-------|
714    /// | Allocation | ~96 bytes per call (not pooled) |
715    /// | Overhead | Comparable to `spawn_compute()` |
716    ///
717    /// State cannot be pooled because the result type R may contain borrowed
718    /// references tied to the calling scope. Benchmarks show performance is
719    /// within noise of `spawn_compute()` - the overhead is dominated by
720    /// cross-thread communication, not state management.
721    ///
722    /// # Cancellation Safety
723    ///
724    /// If the future is dropped before completion (e.g., via `select!` or timeout),
725    /// the drop will **block** until the rayon scope finishes. This is necessary
726    /// to prevent use-after-free of borrowed data. In normal usage (awaiting to
727    /// completion), there is no blocking overhead.
728    ///
729    /// # Panic Safety
730    ///
731    /// If the closure or any spawned work panics, the panic is captured and
732    /// re-raised when the future is polled. This ensures panics propagate to
733    /// the async context as expected.
734    ///
735    /// # Leaking the Future
736    ///
737    /// **Important:** Do not leak this future via `std::mem::forget` or similar.
738    /// The safety of borrowed data relies on the future's `Drop` implementation
739    /// blocking until the rayon scope completes. Leaking the future would allow
740    /// the rayon work to continue accessing borrowed data after it goes out of
741    /// scope, leading to undefined behavior. This is a known limitation shared
742    /// by other scoped async APIs (e.g., `async-scoped`).
743    ///
744    /// # Examples
745    ///
746    /// ```ignore
747    /// use std::sync::atomic::{AtomicI32, Ordering};
748    ///
749    /// runtime.block_on(async {
750    ///     let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
751    ///     let sum = AtomicI32::new(0);
752    ///
753    ///     // Borrow `data` and `sum` for parallel processing
754    ///     runtime.scope_compute(|s| {
755    ///         let (left, right) = data.split_at(data.len() / 2);
756    ///
757    ///         s.spawn(|_| {
758    ///             sum.fetch_add(left.iter().sum(), Ordering::Relaxed);
759    ///         });
760    ///         s.spawn(|_| {
761    ///             sum.fetch_add(right.iter().sum(), Ordering::Relaxed);
762    ///         });
763    ///     }).await;
764    ///
765    ///     // `data` and `sum` are still valid here
766    ///     println!("Sum of {:?} = {}", data, sum.load(Ordering::Relaxed));
767    /// });
768    /// ```
769    pub async fn scope_compute<'env, F, R>(&self, f: F) -> R
770    where
771        F: FnOnce(&rayon::Scope<'env>) -> R + Send + 'env,
772        R: Send + 'env,
773    {
774        self.inner.scope_compute(f).await
775    }
776
777    /// Execute scoped work with adaptive sync/async decision.
778    ///
779    /// Uses MAB (Multi-Armed Bandit) to learn whether this function type should:
780    /// - Run synchronously via `install()` (blocks tokio worker, lower overhead)
781    /// - Run asynchronously via `scope_compute()` (frees tokio worker, higher overhead)
782    ///
783    /// Unlike `spawn_adaptive()` which chooses between inline execution and rayon offload,
784    /// `scope_adaptive` always uses `rayon::scope` (needed for parallel spawning with
785    /// borrowed data), but chooses whether to block the tokio worker or use the async bridge.
786    ///
787    /// # Performance
788    ///
789    /// | Scenario | Behavior | Overhead |
790    /// |----------|----------|----------|
791    /// | Fast scoped work | Sync after learning | ~0ns (install overhead only) |
792    /// | Slow scoped work | Async after learning | ~100-500ns (+ bridge) |
793    /// | Cold start | Explores both arms | Variable |
794    ///
795    /// # When to Use
796    ///
797    /// Use `scope_adaptive` when:
798    /// - You need to borrow local data (`'env` lifetime)
799    /// - You want parallel spawning via `rayon::scope`
800    /// - Work duration varies and you want the runtime to learn the best strategy
801    ///
802    /// Use `scope_compute` directly when:
803    /// - Work is always slow (> 500µs)
804    /// - You want consistent async behavior
805    ///
806    /// # Examples
807    ///
808    /// ```ignore
809    /// use std::sync::atomic::{AtomicI32, Ordering};
810    ///
811    /// runtime.block_on(async {
812    ///     let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
813    ///     let sum = AtomicI32::new(0);
814    ///
815    ///     // MAB learns whether this is fast or slow scoped work
816    ///     runtime.scope_adaptive(|s| {
817    ///         let (left, right) = data.split_at(data.len() / 2);
818    ///         let sum_ref = &sum;
819    ///
820    ///         s.spawn(move |_| {
821    ///             sum_ref.fetch_add(left.iter().sum(), Ordering::Relaxed);
822    ///         });
823    ///         s.spawn(move |_| {
824    ///             sum_ref.fetch_add(right.iter().sum(), Ordering::Relaxed);
825    ///         });
826    ///     }).await;
827    ///
828    ///     println!("Sum: {}", sum.load(Ordering::Relaxed));
829    /// });
830    /// ```
831    pub async fn scope_adaptive<'env, F, R>(&self, f: F) -> R
832    where
833        F: FnOnce(&rayon::Scope<'env>) -> R + Send + 'env,
834        R: Send + 'env,
835    {
836        self.scope_adaptive_with_hint(ComputeHint::Unknown, f).await
837    }
838
    /// Execute scoped work with hint for cold-start guidance.
    ///
    /// The hint helps the scheduler make better initial decisions before it has
    /// learned the actual execution time of this function type.
    ///
    /// # Hints
    ///
    /// - `ComputeHint::Low` - Expected < 50µs (likely sync-safe)
    /// - `ComputeHint::Medium` - Expected 50-500µs (borderline)
    /// - `ComputeHint::High` - Expected > 500µs (should test async early)
    /// - `ComputeHint::Unknown` - No hint (default exploration)
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use loom_rs::ComputeHint;
    /// use std::sync::atomic::{AtomicI32, Ordering};
    ///
    /// runtime.block_on(async {
    ///     let data = vec![1, 2, 3, 4];
    ///     let sum = AtomicI32::new(0);
    ///
    ///     // Hint that this is likely fast work
    ///     runtime.scope_adaptive_with_hint(ComputeHint::Low, |s| {
    ///         let sum_ref = &sum;
    ///         for &val in &data {
    ///             s.spawn(move |_| {
    ///                 sum_ref.fetch_add(val, Ordering::Relaxed);
    ///             });
    ///         }
    ///     }).await;
    /// });
    /// ```
    pub async fn scope_adaptive_with_hint<'env, F, R>(&self, hint: ComputeHint, f: F) -> R
    where
        F: FnOnce(&rayon::Scope<'env>) -> R + Send + 'env,
        R: Send + 'env,
    {
        let ctx = self.collect_context();
        // Use from_type_name since F may capture non-'static references
        let key = FunctionKey::from_type_name::<F>();
        let scheduler = self.mab_scheduler();

        // `id` ties this decision to the `finish` call below that records the
        // observed latency as the scheduler's reward signal.
        let (id, arm) = scheduler.choose_with_hint(key, &ctx, hint);
        let start = Instant::now();

        let result = match arm {
            Arm::InlineTokio => {
                // Sync: blocks tokio but no async bridge overhead
                self.install(|| rayon::scope(|s| f(s)))
            }
            Arm::OffloadRayon => {
                // Async: frees tokio worker during execution
                self.scope_compute(f).await
            }
        };

        // Total wall time in microseconds. NOTE(review): the same value is
        // reported as both total and function time, so on the OffloadRayon arm
        // the bridge overhead is folded into the "function" measurement —
        // confirm this is the intended reward signal.
        // NOTE(review): if `f` panics on the InlineTokio arm, `finish` is never
        // called for this `id` — verify the scheduler tolerates abandoned
        // decisions.
        let elapsed_us = start.elapsed().as_secs_f64() * 1_000_000.0;
        scheduler.finish(id, elapsed_us, Some(elapsed_us));
        result
    }
900
    /// Stop accepting new tasks.
    ///
    /// After calling this, `spawn_async()` and `spawn_compute()` will still
    /// work, but the shutdown process has begun. Use `is_idle()` or
    /// `wait_for_shutdown()` to check/wait for completion.
    pub fn shutdown(&self) {
        // Closing the tracker marks the start of shutdown; it does not cancel
        // or wait for any in-flight tasks.
        self.inner.task_tracker.close();
    }
909
    /// Check if all tracked tasks have completed.
    ///
    /// Returns `true` if `shutdown()` has been called and all tracked async
    /// tasks and compute tasks have finished.
    ///
    /// # Performance
    ///
    /// Very cheap: a few atomic reads (tracker closed/empty checks plus the
    /// compute-task counter) — no locks, no allocation.
    #[inline]
    pub fn is_idle(&self) -> bool {
        self.inner.task_tracker.is_closed()
            && self.inner.task_tracker.is_empty()
            && self.inner.compute_state.count.load(Ordering::Acquire) == 0
    }
924
    /// Get the number of compute tasks currently in flight.
    ///
    /// Useful for debugging shutdown issues or monitoring workload.
    ///
    /// # Example
    ///
    /// ```ignore
    /// if runtime.compute_tasks_in_flight() > 0 {
    ///     tracing::warn!("Still waiting for {} compute tasks",
    ///         runtime.compute_tasks_in_flight());
    /// }
    /// ```
    #[inline]
    pub fn compute_tasks_in_flight(&self) -> usize {
        // Relaxed is sufficient: this is a monitoring snapshot, not a
        // synchronization point.
        self.inner.compute_state.count.load(Ordering::Relaxed)
    }
941
    /// Wait for all tracked tasks to complete (async).
    ///
    /// Call from within `block_on()`. Requires `shutdown()` to be called first,
    /// otherwise this will wait forever.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     runtime.spawn_async(work());
    ///     runtime.shutdown();
    ///     runtime.wait_for_shutdown().await;
    /// });
    /// ```
    pub async fn wait_for_shutdown(&self) {
        // First drain all tracked async tasks.
        self.inner.task_tracker.wait().await;

        // Wait for compute tasks efficiently (no spin loop)
        let mut logged = false;
        loop {
            let count = self.inner.compute_state.count.load(Ordering::Acquire);
            if count == 0 {
                break;
            }
            if !logged {
                debug!(count, "waiting for compute tasks to complete");
                logged = true;
            }
            // NOTE(review): this pattern relies on the completing guard's
            // notification storing a permit (tokio `notify_one`-style
            // semantics) so a notify issued between the load above and this
            // await is not lost — confirm against ComputeTaskGuard's drop path.
            self.inner.compute_state.notify.notified().await;
        }
    }
973
    /// Block until all tracked tasks complete (from main thread).
    ///
    /// This is the primary shutdown method. It:
    /// 1. Calls `shutdown()` to close the task tracker
    /// 2. Waits for all tracked async and compute tasks to finish
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     runtime.spawn_async(background_work());
    ///     runtime.spawn_compute(|| cpu_work());
    /// });
    ///
    /// // Graceful shutdown from main thread
    /// runtime.block_until_idle();
    /// ```
    pub fn block_until_idle(&self) {
        self.shutdown();
        // Drive the async wait to completion on the caller's thread.
        self.block_on(self.wait_for_shutdown());
    }
995
996    /// Get the shared MAB scheduler for handler patterns.
997    ///
998    /// The scheduler is lazily initialized on first call. Use this when you
999    /// need to make manual scheduling decisions in handler code.
1000    ///
1001    /// # Example
1002    ///
1003    /// ```ignore
1004    /// use loom_rs::mab::{FunctionKey, Arm};
1005    ///
1006    /// let sched = runtime.mab_scheduler();
1007    /// let key = FunctionKey::from_type::<MyHandler>();
1008    /// let ctx = runtime.collect_context();
1009    ///
1010    /// let (id, arm) = sched.choose(key, &ctx);
1011    /// let result = match arm {
1012    ///     Arm::InlineTokio => my_work(),
1013    ///     Arm::OffloadRayon => runtime.block_on(async {
1014    ///         runtime.spawn_compute(|| my_work()).await
1015    ///     }),
1016    /// };
1017    /// sched.finish(id, elapsed_us, Some(fn_us));
1018    /// ```
1019    pub fn mab_scheduler(&self) -> Arc<MabScheduler> {
1020        self.inner
1021            .mab_scheduler
1022            .get_or_init(|| {
1023                Arc::new(MabScheduler::with_metrics(
1024                    self.inner.mab_knobs.clone(),
1025                    self.inner.prometheus_metrics.clone(),
1026                ))
1027            })
1028            .clone()
1029    }
1030
    /// Collect current runtime context for MAB scheduling decisions.
    ///
    /// Returns a snapshot of current metrics including inflight tasks,
    /// spawn rate, and queue depth.
    pub fn collect_context(&self) -> Context {
        // Thread counts are small configured values, so the usize -> u32
        // casts cannot truncate in practice.
        self.inner.prometheus_metrics.collect_context(
            self.inner.tokio_threads as u32,
            self.inner.rayon_threads as u32,
        )
    }
1041
    /// Get the number of tokio worker threads.
    ///
    /// Plain field read; zero overhead.
    pub fn tokio_threads(&self) -> usize {
        self.inner.tokio_threads
    }
1046
    /// Get the number of rayon threads.
    ///
    /// Plain field read; zero overhead.
    pub fn rayon_threads(&self) -> usize {
        self.inner.rayon_threads
    }
1051
    /// Get the Prometheus metrics.
    ///
    /// The metrics are always collected (zero overhead atomic operations).
    /// If a Prometheus registry was provided via `LoomBuilder::prometheus_registry()`,
    /// the metrics are also registered for exposition.
    ///
    /// Returns a borrow; no allocation or cloning at the call site.
    pub fn prometheus_metrics(&self) -> &LoomMetrics {
        &self.inner.prometheus_metrics
    }
1060
    /// Get the CPUs allocated to tokio workers.
    ///
    /// Returned as a borrowed slice of CPU ids; no allocation.
    pub fn tokio_cpus(&self) -> &[usize] {
        &self.inner.tokio_cpus
    }
1065
    /// Get the CPUs allocated to rayon workers.
    ///
    /// Returned as a borrowed slice of CPU ids; no allocation.
    pub fn rayon_cpus(&self) -> &[usize] {
        &self.inner.rayon_cpus
    }
1070}
1071
impl LoomRuntimeInner {
    /// Spawn CPU-bound work on rayon and await the result.
    ///
    /// Uses per-type object pools for zero allocation after warmup.
    #[inline]
    pub async fn spawn_compute<F, R>(self: &Arc<Self>, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let pool = self.pools.get_or_create::<R>();

        // Try to get state from pool, or allocate new
        let state = pool.pop().unwrap_or_else(|| Arc::new(TaskState::new()));

        // Create the pooled task
        let (task, completion, state_for_return) = PooledRayonTask::new(state);

        // Create guard BEFORE spawning - it increments counter and tracks MAB metrics
        let guard = ComputeTaskGuard::new(&self.compute_state, &self.prometheus_metrics);

        self.rayon_pool.spawn(move || {
            // Track rayon task start for queue depth calculation
            guard.started();

            // Execute work inside guard scope so counter decrements BEFORE completing.
            // This ensures the async future sees count=0 when it wakes up.
            let result = {
                let _guard = guard;
                f()
            };
            completion.complete(result);
        });

        // If this future is cancelled before the await below resolves, the
        // state is simply dropped instead of pooled — a missed reuse, not a
        // leak, since it is reference-counted (Arc).
        let result = task.await;

        // Return state to pool for reuse
        state_for_return.reset();
        pool.push(state_for_return);

        result
    }

    /// Execute a scoped parallel computation with borrowed data.
    ///
    /// # Safety Argument
    ///
    /// The lifetime erasure via transmute is sound because:
    /// 1. The async task is suspended at `.await`
    /// 2. The future only completes after `rayon::scope` returns
    /// 3. `rayon::scope` blocks until ALL spawned work completes
    /// 4. Therefore, `'env` references outlive all accesses to borrowed data
    ///
    /// This is the same safety argument used by `std::thread::scope`.
    pub async fn scope_compute<'env, F, R>(self: &Arc<Self>, f: F) -> R
    where
        F: FnOnce(&rayon::Scope<'env>) -> R + Send + 'env,
        R: Send + 'env,
    {
        // SAFETY: Lifetime erasure is sound because:
        // 1. The future holds the async task suspended at .await
        // 2. Future only completes after rayon::scope returns
        // 3. rayon::scope blocks until all spawned work completes
        // 4. Therefore 'env outlives all accesses to borrowed data
        //
        // Additionally, if the future is dropped before completion (cancellation),
        // ScopedComputeFuture::drop blocks until the scope finishes, maintaining
        // the safety invariant.

        let state = Arc::new(ScopedTaskState::<R>::new());
        let future = ScopedComputeFuture::new(state.clone());
        let completion = ScopedCompletion::new(state);

        // Create guard BEFORE spawning - it increments counter and tracks MAB metrics
        let guard = ComputeTaskGuard::new(&self.compute_state, &self.prometheus_metrics);

        // Create a closure that captures f, completion, and guard.
        // We use catch_unwind to ensure completion is always signaled, even on panic.
        let work = move || {
            guard.started();
            let result = {
                let _guard = guard;
                // NOTE(review): AssertUnwindSafe assumes the captured state is
                // not observed in a broken condition after a panic; the payload
                // is re-surfaced to the awaiting task via complete_with_panic.
                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| rayon::scope(|s| f(s))))
            };
            match result {
                Ok(value) => completion.complete(value),
                Err(payload) => completion.complete_with_panic(payload),
            }
        };

        // SAFETY: We erase the lifetime to 'static here. This is safe because:
        // 1. The ScopedComputeFuture we return will block in Drop if the work
        //    hasn't completed (cancellation safety)
        // 2. Normal await returns only after the work completes
        // 3. Therefore, all 'env references remain valid for the duration of
        //    the work execution
        let erased: Box<dyn FnOnce() + Send + 'static> = unsafe {
            std::mem::transmute::<Box<dyn FnOnce() + Send + 'env>, Box<dyn FnOnce() + Send + 'static>>(
                Box::new(work),
            )
        };

        self.rayon_pool.spawn(move || {
            erased();
        });

        future.await
    }
}
1181
1182impl std::fmt::Debug for LoomRuntime {
1183    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1184        f.debug_struct("LoomRuntime")
1185            .field("config", &self.inner.config)
1186            .field(
1187                "compute_tasks_in_flight",
1188                &self.inner.compute_state.count.load(Ordering::Relaxed),
1189            )
1190            .finish_non_exhaustive()
1191    }
1192}
1193
1194impl std::fmt::Display for LoomRuntime {
1195    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1196        write!(
1197            f,
1198            "LoomRuntime[{}]: tokio({}, cpus={}) rayon({}, cpus={})",
1199            self.inner.config.prefix,
1200            self.inner.tokio_threads,
1201            format_cpuset(&self.inner.tokio_cpus),
1202            self.inner.rayon_threads,
1203            format_cpuset(&self.inner.rayon_cpus),
1204        )
1205    }
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210    use super::*;
1211    use crate::pool::DEFAULT_POOL_SIZE;
1212
1213    fn test_config() -> LoomConfig {
1214        LoomConfig {
1215            prefix: "test".to_string(),
1216            cpuset: None,
1217            tokio_threads: Some(1),
1218            rayon_threads: Some(1),
1219            compute_pool_size: DEFAULT_POOL_SIZE,
1220            #[cfg(feature = "cuda")]
1221            cuda_device: None,
1222            mab_knobs: None,
1223            calibration: None,
1224            prometheus_registry: None,
1225        }
1226    }
1227
1228    #[test]
1229    fn test_runtime_creation() {
1230        let config = test_config();
1231        let runtime = LoomRuntime::from_config(config).unwrap();
1232        assert_eq!(runtime.config().prefix, "test");
1233    }
1234
1235    #[test]
1236    fn test_block_on() {
1237        let config = test_config();
1238        let runtime = LoomRuntime::from_config(config).unwrap();
1239
1240        let result = runtime.block_on(async { 42 });
1241        assert_eq!(result, 42);
1242    }
1243
1244    #[test]
1245    fn test_spawn_compute() {
1246        let config = test_config();
1247        let runtime = LoomRuntime::from_config(config).unwrap();
1248
1249        let result =
1250            runtime.block_on(async { runtime.spawn_compute(|| (0..100).sum::<i32>()).await });
1251        assert_eq!(result, 4950);
1252    }
1253
1254    #[test]
1255    fn test_spawn_async() {
1256        let config = test_config();
1257        let runtime = LoomRuntime::from_config(config).unwrap();
1258
1259        let result = runtime.block_on(async {
1260            let handle = runtime.spawn_async(async { 42 });
1261            handle.await.unwrap()
1262        });
1263        assert_eq!(result, 42);
1264    }
1265
1266    #[test]
1267    fn test_install() {
1268        let config = test_config();
1269        let runtime = LoomRuntime::from_config(config).unwrap();
1270
1271        let result = runtime.install(|| {
1272            use rayon::prelude::*;
1273            (0..100).into_par_iter().sum::<i32>()
1274        });
1275        assert_eq!(result, 4950);
1276    }
1277
1278    #[test]
1279    fn test_shutdown_and_idle() {
1280        let config = test_config();
1281        let runtime = LoomRuntime::from_config(config).unwrap();
1282
1283        // Initially not idle (tracker not closed)
1284        assert!(!runtime.is_idle());
1285
1286        // After shutdown with no tasks, should be idle
1287        runtime.shutdown();
1288        assert!(runtime.is_idle());
1289    }
1290
1291    #[test]
1292    fn test_block_until_idle() {
1293        let config = test_config();
1294        let runtime = LoomRuntime::from_config(config).unwrap();
1295
1296        runtime.block_on(async {
1297            runtime.spawn_async(async { 42 });
1298            runtime.spawn_compute(|| 100).await;
1299        });
1300
1301        runtime.block_until_idle();
1302        assert!(runtime.is_idle());
1303    }
1304
1305    #[test]
1306    fn test_insufficient_cpus_error() {
1307        let mut config = test_config();
1308        config.cpuset = Some("0".to_string()); // Only 1 CPU
1309        config.tokio_threads = Some(2);
1310        config.rayon_threads = Some(2);
1311
1312        let result = LoomRuntime::from_config(config);
1313        assert!(matches!(result, Err(LoomError::InsufficientCpus { .. })));
1314    }
1315
1316    #[test]
1317    fn test_current_runtime_in_block_on() {
1318        let config = test_config();
1319        let runtime = LoomRuntime::from_config(config).unwrap();
1320
1321        runtime.block_on(async {
1322            // current_runtime should work inside block_on
1323            let current = crate::context::current_runtime();
1324            assert!(current.is_some());
1325        });
1326
1327        // Outside block_on, should be None
1328        assert!(crate::context::current_runtime().is_none());
1329    }
1330
1331    #[test]
1332    fn test_spawn_compute_pooling() {
1333        let config = test_config();
1334        let runtime = LoomRuntime::from_config(config).unwrap();
1335
1336        // Warmup - first call allocates
1337        runtime.block_on(async {
1338            runtime.spawn_compute(|| 1i32).await;
1339        });
1340
1341        // Subsequent calls should reuse pooled state (we can't easily verify this
1342        // without internal access, but we can verify it works)
1343        runtime.block_on(async {
1344            for i in 0..100 {
1345                let result = runtime.spawn_compute(move || i).await;
1346                assert_eq!(result, i);
1347            }
1348        });
1349    }
1350
1351    #[test]
1352    fn test_spawn_compute_guard_drops_on_scope_exit() {
1353        // This test verifies the guard's Drop implementation works correctly.
1354        // We can't easily test panic behavior in rayon (panics abort by default),
1355        // but we can verify the guard decrements the counter when it goes out of scope.
1356        use crate::metrics::LoomMetrics;
1357        use std::sync::atomic::Ordering;
1358
1359        let state = super::ComputeTaskState::new();
1360        let metrics = LoomMetrics::new();
1361
1362        // Create a guard (increments counter)
1363        {
1364            let _guard = super::ComputeTaskGuard::new(&state, &metrics);
1365            assert_eq!(state.count.load(Ordering::Relaxed), 1);
1366        }
1367        // Guard dropped, counter should be 0
1368        assert_eq!(state.count.load(Ordering::Relaxed), 0);
1369
1370        // Test multiple guards
1371        let state = super::ComputeTaskState::new();
1372
1373        let guard1 = super::ComputeTaskGuard::new(&state, &metrics);
1374        assert_eq!(state.count.load(Ordering::Relaxed), 1);
1375
1376        let guard2 = super::ComputeTaskGuard::new(&state, &metrics);
1377        assert_eq!(state.count.load(Ordering::Relaxed), 2);
1378
1379        drop(guard1);
1380        assert_eq!(state.count.load(Ordering::Relaxed), 1);
1381
1382        drop(guard2);
1383        assert_eq!(state.count.load(Ordering::Relaxed), 0);
1384
1385        // The notification mechanism is verified by the fact that wait_for_shutdown
1386        // doesn't spin-loop forever when compute tasks complete
1387    }
1388
1389    #[test]
1390    fn test_compute_tasks_in_flight() {
1391        let config = test_config();
1392        let runtime = LoomRuntime::from_config(config).unwrap();
1393
1394        // Initially no tasks
1395        assert_eq!(runtime.compute_tasks_in_flight(), 0);
1396
1397        // After spawning and completing, should be back to 0
1398        runtime.block_on(async {
1399            runtime.spawn_compute(|| 42).await;
1400        });
1401        assert_eq!(runtime.compute_tasks_in_flight(), 0);
1402    }
1403
1404    #[test]
1405    fn test_display() {
1406        let config = test_config();
1407        let runtime = LoomRuntime::from_config(config).unwrap();
1408
1409        let display = format!("{}", runtime);
1410        assert!(display.starts_with("LoomRuntime[test]:"));
1411        assert!(display.contains("tokio(1, cpus="));
1412        assert!(display.contains("rayon(1, cpus="));
1413    }
1414
1415    #[test]
1416    fn test_cpuset_only() {
1417        let mut config = test_config();
1418        config.cpuset = Some("0".to_string());
1419        config.tokio_threads = Some(1);
1420        config.rayon_threads = Some(0);
1421
1422        let runtime = LoomRuntime::from_config(config).unwrap();
1423        // Should use the user-provided cpuset
1424        assert_eq!(runtime.inner.tokio_cpus, vec![0]);
1425    }
1426
1427    /// Test that CUDA cpuset conflict error is properly detected.
1428    /// This test requires actual CUDA hardware to verify the conflict.
1429    #[cfg(feature = "cuda-tests")]
1430    #[test]
1431    fn test_cuda_cpuset_conflict_error() {
1432        let mut config = test_config();
1433        config.cuda_device = Some(crate::cuda::CudaDeviceSelector::DeviceId(0));
1434        config.cpuset = Some("0".to_string()); // Conflict: both specified
1435
1436        let result = LoomRuntime::from_config(config);
1437        assert!(
1438            matches!(result, Err(LoomError::CudaCpusetConflict)),
1439            "expected CudaCpusetConflict error, got {:?}",
1440            result
1441        );
1442    }
1443
1444    /// Test that CUDA device alone (without cpuset) works.
1445    #[cfg(feature = "cuda-tests")]
1446    #[test]
1447    fn test_cuda_device_only() {
1448        let mut config = test_config();
1449        config.cuda_device = Some(crate::cuda::CudaDeviceSelector::DeviceId(0));
1450        config.cpuset = None;
1451
1452        let runtime = LoomRuntime::from_config(config).unwrap();
1453        // Should have found CUDA-local CPUs
1454        assert!(!runtime.inner.tokio_cpus.is_empty());
1455    }
1456
1457    // =============================================================================
1458    // spawn_adaptive Tests
1459    // =============================================================================
1460
1461    #[test]
1462    fn test_spawn_adaptive_runs_work() {
1463        let config = test_config();
1464        let runtime = LoomRuntime::from_config(config).unwrap();
1465
1466        let result = runtime.block_on(async { runtime.spawn_adaptive(|| 42).await });
1467
1468        assert_eq!(result, 42);
1469    }
1470
1471    #[test]
1472    fn test_spawn_adaptive_with_hint() {
1473        let config = test_config();
1474        let runtime = LoomRuntime::from_config(config).unwrap();
1475
1476        let result = runtime.block_on(async {
1477            runtime
1478                .spawn_adaptive_with_hint(crate::ComputeHint::Low, || 100)
1479                .await
1480        });
1481
1482        assert_eq!(result, 100);
1483    }
1484
1485    #[test]
1486    fn test_spawn_adaptive_multiple_calls() {
1487        let config = test_config();
1488        let runtime = LoomRuntime::from_config(config).unwrap();
1489
1490        runtime.block_on(async {
1491            // Run many fast tasks to let MAB learn
1492            for i in 0..50 {
1493                let result = runtime.spawn_adaptive(move || i * 2).await;
1494                assert_eq!(result, i * 2);
1495            }
1496        });
1497    }
1498
1499    #[test]
1500    fn test_spawn_adaptive_records_metrics() {
1501        let config = test_config();
1502        let runtime = LoomRuntime::from_config(config).unwrap();
1503
1504        runtime.block_on(async {
1505            // Run some adaptive tasks
1506            for _ in 0..10 {
1507                runtime.spawn_adaptive(|| std::hint::black_box(42)).await;
1508            }
1509        });
1510
1511        // Check that metrics were recorded
1512        let metrics = runtime.prometheus_metrics();
1513        let total_decisions = metrics.inline_decisions.get() + metrics.offload_decisions.get();
1514        assert!(
1515            total_decisions >= 10,
1516            "Should have recorded at least 10 decisions, got {}",
1517            total_decisions
1518        );
1519    }
1520
1521    #[test]
1522    fn test_prometheus_metrics_use_prefix() {
1523        let mut config = test_config();
1524        config.prefix = "myapp".to_string();
1525        let runtime = LoomRuntime::from_config(config).unwrap();
1526
1527        // The metrics should use the prefix from config
1528        // We can verify by checking the registry if one was provided
1529        let registry = prometheus::Registry::new();
1530        runtime
1531            .prometheus_metrics()
1532            .register(&registry)
1533            .expect("registration should succeed");
1534
1535        let families = registry.gather();
1536        // Find a metric with our prefix
1537        let myapp_metric = families.iter().find(|f| f.get_name().starts_with("myapp_"));
1538        assert!(
1539            myapp_metric.is_some(),
1540            "Should find metrics with 'myapp_' prefix"
1541        );
1542
1543        // Should not find metrics with default 'loom_' prefix
1544        let loom_metric = families.iter().find(|f| f.get_name().starts_with("loom_"));
1545        assert!(
1546            loom_metric.is_none(),
1547            "Should not find metrics with 'loom_' prefix"
1548        );
1549    }
1550
1551    // =============================================================================
1552    // scope_compute Tests
1553    // =============================================================================
1554
1555    #[test]
1556    fn test_scope_compute_basic() {
1557        let config = test_config();
1558        let runtime = LoomRuntime::from_config(config).unwrap();
1559
1560        let result = runtime.block_on(async {
1561            runtime
1562                .scope_compute(|_s| {
1563                    // Simple computation without spawning
1564                    42
1565                })
1566                .await
1567        });
1568
1569        assert_eq!(result, 42);
1570    }
1571
1572    #[test]
1573    fn test_scope_compute_borrow_local_data() {
1574        let config = test_config();
1575        let runtime = LoomRuntime::from_config(config).unwrap();
1576
1577        let result = runtime.block_on(async {
1578            let data = [1, 2, 3, 4, 5, 6, 7, 8];
1579
1580            let sum = runtime
1581                .scope_compute(|_s| {
1582                    // Borrow data inside the scope
1583                    data.iter().sum::<i32>()
1584                })
1585                .await;
1586
1587            // data is still valid after scope_compute
1588            assert_eq!(data.len(), 8);
1589            sum
1590        });
1591
1592        assert_eq!(result, 36);
1593    }
1594
1595    #[test]
1596    fn test_scope_compute_parallel_with_atomic() {
1597        use std::sync::atomic::{AtomicI32, Ordering};
1598
1599        let mut config = test_config();
1600        config.rayon_threads = Some(2);
1601        let runtime = LoomRuntime::from_config(config).unwrap();
1602
1603        let result = runtime.block_on(async {
1604            let data = [1, 2, 3, 4, 5, 6, 7, 8];
1605            let sum = AtomicI32::new(0);
1606
1607            runtime
1608                .scope_compute(|s| {
1609                    let (left, right) = data.split_at(data.len() / 2);
1610                    let sum_ref = &sum;
1611
1612                    s.spawn(move |_| {
1613                        let partial: i32 = left.iter().sum();
1614                        sum_ref.fetch_add(partial, Ordering::Relaxed);
1615                    });
1616                    s.spawn(move |_| {
1617                        let partial: i32 = right.iter().sum();
1618                        sum_ref.fetch_add(partial, Ordering::Relaxed);
1619                    });
1620                })
1621                .await;
1622
1623            sum.load(Ordering::Relaxed)
1624        });
1625
1626        assert_eq!(result, 36);
1627    }
1628
1629    #[test]
1630    fn test_scope_compute_nested_spawns() {
1631        use std::sync::atomic::{AtomicI32, Ordering};
1632
1633        let config = test_config();
1634        let runtime = LoomRuntime::from_config(config).unwrap();
1635
1636        let result = runtime.block_on(async {
1637            let data = [1, 2, 3, 4, 5, 6, 7, 8];
1638            let sum = AtomicI32::new(0);
1639
1640            runtime
1641                .scope_compute(|s| {
1642                    let data_ref = &data;
1643                    let sum_ref = &sum;
1644
1645                    s.spawn(move |s| {
1646                        // Nested spawn
1647                        s.spawn(move |_| {
1648                            sum_ref
1649                                .fetch_add(data_ref[0..2].iter().sum::<i32>(), Ordering::Relaxed);
1650                        });
1651                        s.spawn(move |_| {
1652                            sum_ref
1653                                .fetch_add(data_ref[2..4].iter().sum::<i32>(), Ordering::Relaxed);
1654                        });
1655                    });
1656                    s.spawn(move |_| {
1657                        sum_ref.fetch_add(data_ref[4..8].iter().sum::<i32>(), Ordering::Relaxed);
1658                    });
1659                })
1660                .await;
1661
1662            // rayon::scope guarantees all spawned work completes before returning
1663            sum.load(Ordering::Relaxed)
1664        });
1665
1666        assert_eq!(result, 36);
1667    }
1668
1669    #[test]
1670    fn test_scope_compute_with_rayon_par_iter() {
1671        let config = test_config();
1672        let runtime = LoomRuntime::from_config(config).unwrap();
1673
1674        let result = runtime.block_on(async {
1675            let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
1676
1677            runtime
1678                .scope_compute(|_s| {
1679                    use rayon::prelude::*;
1680                    // Use parallel iterators inside the scope
1681                    data.par_iter().map(|x| x * 2).sum::<i32>()
1682                })
1683                .await
1684        });
1685
1686        assert_eq!(result, 110);
1687    }
1688
1689    #[test]
1690    fn test_scope_compute_tracks_compute_tasks() {
1691        let config = test_config();
1692        let runtime = LoomRuntime::from_config(config).unwrap();
1693
1694        // Initially no tasks
1695        assert_eq!(runtime.compute_tasks_in_flight(), 0);
1696
1697        runtime.block_on(async {
1698            runtime.scope_compute(|_s| 42).await;
1699        });
1700
1701        // After completion, should be back to 0
1702        assert_eq!(runtime.compute_tasks_in_flight(), 0);
1703    }
1704
1705    #[test]
1706    fn test_scope_compute_data_still_valid_after() {
1707        let config = test_config();
1708        let runtime = LoomRuntime::from_config(config).unwrap();
1709
1710        runtime.block_on(async {
1711            let mut data = vec![1, 2, 3, 4, 5];
1712
1713            let sum = runtime.scope_compute(|_s| data.iter().sum::<i32>()).await;
1714
1715            assert_eq!(sum, 15);
1716
1717            // data is still valid and can be modified
1718            data.push(6);
1719            assert_eq!(data.len(), 6);
1720
1721            // Can use scope_compute again with the same data
1722            let new_sum = runtime.scope_compute(|_s| data.iter().sum::<i32>()).await;
1723            assert_eq!(new_sum, 21);
1724        });
1725    }
1726
    /// Test that scope_compute properly yields to the async executor and doesn't block.
    /// This validates the future correctly returns Poll::Pending and wakes up when done.
    ///
    /// NOTE(review): this proof is timing-based (5ms concurrent sleep vs 20ms compute
    /// sleep); the 4x margin should hold on CI, but a heavily loaded host could in
    /// principle still flake it.
    #[test]
    fn test_scope_compute_yields_to_executor() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;
        use std::time::Duration;

        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        runtime.block_on(async {
            let concurrent_task_ran = Arc::new(AtomicBool::new(false));
            let concurrent_task_ran_clone = concurrent_task_ran.clone();

            // Spawn a concurrent async task that should run while scope_compute is waiting
            let concurrent_handle = runtime.spawn_async(async move {
                // Wait a bit for the scope_compute to start
                tokio::time::sleep(Duration::from_millis(5)).await;
                concurrent_task_ran_clone.store(true, Ordering::Release);
                100
            });

            // Run scope_compute with work that takes some time.
            // std::thread::sleep is intentional here: the blocking happens on a
            // rayon worker, so the tokio task above can only make progress if the
            // scope_compute future truly yields (Poll::Pending) instead of blocking.
            let result = runtime
                .scope_compute(|_s| {
                    // Simulate some work
                    std::thread::sleep(Duration::from_millis(20));
                    42
                })
                .await;

            assert_eq!(result, 42);

            // Verify the concurrent task actually ran during scope_compute
            // (check BEFORE awaiting the handle to prove it ran concurrently)
            assert!(
                concurrent_task_ran.load(Ordering::Acquire),
                "Concurrent task should have run while scope_compute was in progress"
            );

            // The concurrent task should have completed
            let concurrent_result = concurrent_handle.await.unwrap();
            assert_eq!(concurrent_result, 100);
        });
    }
1773
    /// Test that mirrors the documentation example for scope_compute.
    /// This ensures the example code actually compiles and works.
    ///
    /// NOTE(review): keep this body in lockstep with the `scope_compute` doc
    /// example — do not "clean it up" independently, or the test stops
    /// guarding the docs.
    #[test]
    fn test_scope_compute_doc_example() {
        use std::sync::atomic::{AtomicI32, Ordering};

        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async {
            let data = [1, 2, 3, 4, 5, 6, 7, 8];
            let sum = AtomicI32::new(0);

            runtime
                .scope_compute(|s| {
                    // Split the borrowed slice and sum each half on its own task;
                    // the atomic merges partial results without needing &mut.
                    let (left, right) = data.split_at(data.len() / 2);
                    let sum_ref = &sum;

                    s.spawn(move |_| {
                        sum_ref.fetch_add(left.iter().sum::<i32>(), Ordering::Relaxed);
                    });
                    s.spawn(move |_| {
                        sum_ref.fetch_add(right.iter().sum::<i32>(), Ordering::Relaxed);
                    });
                })
                .await;

            // Verify data is still accessible after scope_compute
            assert_eq!(data.len(), 8);
            sum.load(Ordering::Relaxed)
        });

        assert_eq!(result, 36); // 1+2+3+4+5+6+7+8 = 36
    }
1808
    /// Test that scope_compute works correctly with tokio::select! (cancellation scenario).
    /// The future should block on drop until the scope completes.
    #[test]
    fn test_scope_compute_cancellation_safety() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;
        use std::time::Duration;

        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        runtime.block_on(async {
            let scope_completed = Arc::new(AtomicBool::new(false));
            let scope_completed_clone = scope_completed.clone();

            // Use select! to race scope_compute against a timeout
            // The scope_compute will "lose" but should still complete before we continue
            let result = tokio::select! {
                // `biased;` makes select! poll branches in source order, so the
                // 5ms sleep is guaranteed to be checked first once it fires —
                // removing the randomness that could otherwise flake this test.
                biased;

                _ = tokio::time::sleep(Duration::from_millis(5)) => {
                    // Timeout wins - this drops the scope_compute future
                    // But drop should block until scope completes
                    None
                }
                result = runtime.scope_compute(|_s| {
                    // This takes longer than the timeout (50ms vs 5ms)
                    std::thread::sleep(Duration::from_millis(50));
                    scope_completed_clone.store(true, Ordering::Release);
                    42
                }) => {
                    Some(result)
                }
            };

            // The timeout should have won
            assert!(result.is_none(), "Timeout should have won the race");

            // But critically, the scope should have completed (drop blocked):
            // the flag can only be true if the dropped future waited for the
            // rayon work to finish rather than abandoning borrowed data.
            assert!(
                scope_completed.load(Ordering::Acquire),
                "Scope should have completed even though future was cancelled"
            );
        });
    }
1854
1855    /// Test that panics inside scope_compute are properly propagated to the awaiter.
1856    #[test]
1857    #[should_panic(expected = "intentional panic for testing")]
1858    fn test_scope_compute_panic_propagation() {
1859        let config = test_config();
1860        let runtime = LoomRuntime::from_config(config).unwrap();
1861
1862        runtime.block_on(async {
1863            runtime
1864                .scope_compute(|_s| {
1865                    panic!("intentional panic for testing");
1866                })
1867                .await
1868        });
1869    }
1870
1871    /// Test that panics in spawned work inside scope_compute are properly propagated.
1872    #[test]
1873    #[should_panic(expected = "panic in spawned work")]
1874    fn test_scope_compute_spawned_panic_propagation() {
1875        let mut config = test_config();
1876        config.rayon_threads = Some(2);
1877        let runtime = LoomRuntime::from_config(config).unwrap();
1878
1879        runtime.block_on(async {
1880            runtime
1881                .scope_compute(|s| {
1882                    s.spawn(|_| {
1883                        panic!("panic in spawned work");
1884                    });
1885                })
1886                .await
1887        });
1888    }
1889
1890    // =============================================================================
1891    // scope_adaptive Tests
1892    // =============================================================================
1893
1894    #[test]
1895    fn test_scope_adaptive_basic() {
1896        let config = test_config();
1897        let runtime = LoomRuntime::from_config(config).unwrap();
1898
1899        let result = runtime.block_on(async {
1900            runtime
1901                .scope_adaptive(|_s| {
1902                    // Simple computation without spawning
1903                    42
1904                })
1905                .await
1906        });
1907
1908        assert_eq!(result, 42);
1909    }
1910
1911    #[test]
1912    fn test_scope_adaptive_with_hint() {
1913        let config = test_config();
1914        let runtime = LoomRuntime::from_config(config).unwrap();
1915
1916        let result = runtime.block_on(async {
1917            runtime
1918                .scope_adaptive_with_hint(crate::ComputeHint::Low, |_s| 100)
1919                .await
1920        });
1921
1922        assert_eq!(result, 100);
1923    }
1924
1925    #[test]
1926    fn test_scope_adaptive_borrow_local_data() {
1927        let config = test_config();
1928        let runtime = LoomRuntime::from_config(config).unwrap();
1929
1930        let result = runtime.block_on(async {
1931            let data = [1, 2, 3, 4, 5, 6, 7, 8];
1932
1933            let sum = runtime
1934                .scope_adaptive(|_s| {
1935                    // Borrow data inside the scope
1936                    data.iter().sum::<i32>()
1937                })
1938                .await;
1939
1940            // data is still valid after scope_adaptive
1941            assert_eq!(data.len(), 8);
1942            sum
1943        });
1944
1945        assert_eq!(result, 36);
1946    }
1947
1948    #[test]
1949    fn test_scope_adaptive_parallel_with_atomic() {
1950        use std::sync::atomic::{AtomicI32, Ordering};
1951
1952        let mut config = test_config();
1953        config.rayon_threads = Some(2);
1954        let runtime = LoomRuntime::from_config(config).unwrap();
1955
1956        let result = runtime.block_on(async {
1957            let data = [1, 2, 3, 4, 5, 6, 7, 8];
1958            let sum = AtomicI32::new(0);
1959
1960            runtime
1961                .scope_adaptive(|s| {
1962                    let (left, right) = data.split_at(data.len() / 2);
1963                    let sum_ref = &sum;
1964
1965                    s.spawn(move |_| {
1966                        let partial: i32 = left.iter().sum();
1967                        sum_ref.fetch_add(partial, Ordering::Relaxed);
1968                    });
1969                    s.spawn(move |_| {
1970                        let partial: i32 = right.iter().sum();
1971                        sum_ref.fetch_add(partial, Ordering::Relaxed);
1972                    });
1973                })
1974                .await;
1975
1976            sum.load(Ordering::Relaxed)
1977        });
1978
1979        assert_eq!(result, 36);
1980    }
1981
1982    #[test]
1983    fn test_scope_adaptive_records_metrics() {
1984        let config = test_config();
1985        let runtime = LoomRuntime::from_config(config).unwrap();
1986
1987        runtime.block_on(async {
1988            // Run some adaptive tasks
1989            for _ in 0..10 {
1990                runtime.scope_adaptive(|_s| std::hint::black_box(42)).await;
1991            }
1992        });
1993
1994        // Check that metrics were recorded
1995        let metrics = runtime.prometheus_metrics();
1996        let total_decisions = metrics.inline_decisions.get() + metrics.offload_decisions.get();
1997        assert!(
1998            total_decisions >= 10,
1999            "Should have recorded at least 10 decisions, got {}",
2000            total_decisions
2001        );
2002    }
2003
2004    /// Test that panics inside scope_adaptive are properly propagated.
2005    #[test]
2006    #[should_panic(expected = "intentional panic in scope_adaptive")]
2007    fn test_scope_adaptive_panic_propagation() {
2008        let config = test_config();
2009        let runtime = LoomRuntime::from_config(config).unwrap();
2010
2011        runtime.block_on(async {
2012            runtime
2013                .scope_adaptive(|_s| {
2014                    panic!("intentional panic in scope_adaptive");
2015                })
2016                .await
2017        });
2018    }
2019}