loom_rs/runtime.rs
//! Loom runtime implementation.
//!
//! The runtime combines a tokio async runtime with a rayon thread pool,
//! both configured with CPU pinning.
//!
//! # Performance
//!
//! This module is designed for zero unnecessary overhead:
//! - `spawn_async()`: ~10ns overhead (TaskTracker token only)
//! - `spawn_compute()`: ~100-500ns (cross-thread signaling, 0 bytes after warmup)
//! - `install()`: ~0ns (zero overhead, direct rayon access)
//!
//! # Thread Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │ LoomRuntime                                                 │
//! │   pools: ComputePoolRegistry (per-type lock-free pools)     │
//! │   (One pool per result type, shared across all threads)     │
//! └─────────────────────────────────────────────────────────────┘
//!           │ on_thread_start              │ start_handler
//!           ▼                              ▼
//! ┌─────────────────────┐       ┌─────────────────────┐
//! │ Tokio Workers       │       │ Rayon Workers       │
//! │ thread_local! {     │       │ thread_local! {     │
//! │   RUNTIME: Weak<>   │       │   RUNTIME: Weak<>   │
//! │ }                   │       │ }                   │
//! └─────────────────────┘       └─────────────────────┘
//! ```
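//!
//! A minimal sketch of what the thread-local injection enables (assuming the
//! crate-level `current_runtime()` accessor referenced elsewhere in this file):
//!
//! ```ignore
//! runtime.install(|| {
//!     // Inside a rayon worker: the thread-local Weak<> resolves back to
//!     // the owning runtime.
//!     let rt = loom_rs::current_runtime().expect("managed thread");
//! });
//! ```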

use crate::affinity::{pin_to_cpu, CpuAllocator};
use crate::bridge::{
    PooledRayonTask, ScopedCompletion, ScopedComputeFuture, ScopedTaskState, TaskState,
};
use crate::config::LoomConfig;
use crate::context::{clear_current_runtime, set_current_runtime};
use crate::cpuset::{available_cpus, format_cpuset, parse_and_validate_cpuset};
use crate::error::{LoomError, Result};
use crate::mab::{Arm, ComputeHint, Context, FunctionKey, MabKnobs, MabScheduler};
use crate::metrics::LoomMetrics;
use crate::pool::ComputePoolRegistry;

use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock, Weak};
use std::time::Instant;
use tokio::sync::Notify;
use tokio_util::task::TaskTracker;
use tracing::{debug, info, warn};

/// State for tracking in-flight compute tasks.
///
/// Combines the task counter with a notification mechanism for efficient
/// shutdown waiting (avoids spin loops).
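///
/// The waiter pattern (see `wait_for_shutdown`) registers the `Notified`
/// future *before* re-checking the count, so a completion that lands between
/// the load and the await cannot be lost. A minimal sketch:
///
/// ```ignore
/// loop {
///     let notified = state.notify.notified();
///     if state.count.load(Ordering::Acquire) == 0 {
///         break;
///     }
///     notified.await;
/// }
/// ```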
struct ComputeTaskState {
    /// Number of tasks currently executing on rayon
    count: AtomicUsize,
    /// Notified when count reaches 0
    notify: Notify,
}

impl ComputeTaskState {
    fn new() -> Self {
        Self {
            count: AtomicUsize::new(0),
            notify: Notify::new(),
        }
    }
}

/// Guard for tracking async task metrics.
///
/// Panic-safe: `task_completed` is called even if the future panics.
struct AsyncMetricsGuard {
    inner: Arc<LoomRuntimeInner>,
}

impl AsyncMetricsGuard {
    fn new(inner: Arc<LoomRuntimeInner>) -> Self {
        inner.prometheus_metrics.task_started();
        Self { inner }
    }
}

impl Drop for AsyncMetricsGuard {
    fn drop(&mut self) {
        self.inner.prometheus_metrics.task_completed();
    }
}

/// Guard for tracking compute task state and metrics.
///
/// Panic-safe: executes even if the task closure panics.
///
/// SAFETY: The state lives in LoomRuntimeInner which outlives all rayon tasks
/// because block_until_idle waits for compute_tasks to reach 0.
struct ComputeTaskGuard {
    state: *const ComputeTaskState,
    metrics: *const LoomMetrics,
}

unsafe impl Send for ComputeTaskGuard {}

impl ComputeTaskGuard {
    /// Create a new guard, tracking submission in MAB metrics.
    ///
    /// This should be called BEFORE spawning on rayon.
    fn new(state: &ComputeTaskState, metrics: &LoomMetrics) -> Self {
        state.count.fetch_add(1, Ordering::Relaxed);
        metrics.rayon_submitted();
        Self {
            state: state as *const ComputeTaskState,
            metrics: metrics as *const LoomMetrics,
        }
    }

    /// Mark that the rayon task has started executing.
    ///
    /// This should be called at the START of the rayon closure.
    fn started(&self) {
        // SAFETY: metrics outlives rayon tasks
        unsafe {
            (*self.metrics).rayon_started();
        }
    }
}

impl Drop for ComputeTaskGuard {
    fn drop(&mut self) {
        // SAFETY: state and metrics outlive rayon tasks due to shutdown waiting
        unsafe {
            // Track MAB metrics completion (panic-safe)
            (*self.metrics).rayon_completed();

            let prev = (*self.state).count.fetch_sub(1, Ordering::Release);
            if prev == 1 {
                // Count just went from 1 to 0, notify waiters
                (*self.state).notify.notify_waiters();
            }
        }
    }
}

/// A bespoke thread pool runtime combining tokio and rayon with CPU pinning.
///
/// The runtime provides:
/// - A tokio async runtime for I/O-bound work
/// - A rayon thread pool for CPU-bound parallel work
/// - Automatic CPU pinning for both runtimes
/// - A task tracker for graceful shutdown
/// - Zero-allocation compute spawning after warmup
///
/// # Performance Guarantees
///
/// | Method            | Overhead   | Allocations            | Tracked |
/// |-------------------|------------|------------------------|---------|
/// | `spawn_async()`   | ~10ns      | Token only             | Yes     |
/// | `spawn_compute()` | ~100-500ns | 0 bytes (after warmup) | Yes     |
/// | `install()`       | ~0ns       | None                   | No      |
/// | `rayon_pool()`    | 0ns        | None                   | No      |
/// | `tokio_handle()`  | 0ns        | None                   | No      |
///
/// # Examples
///
/// ```ignore
/// use loom_rs::LoomBuilder;
///
/// let runtime = LoomBuilder::new()
///     .prefix("myapp")
///     .tokio_threads(2)
///     .rayon_threads(6)
///     .build()?;
///
/// runtime.block_on(async {
///     // Spawn tracked async I/O task
///     let io_handle = runtime.spawn_async(async {
///         fetch_data().await
///     });
///
///     // Spawn tracked compute task and await result
///     let result = runtime.spawn_compute(|| {
///         expensive_computation()
///     }).await;
///
///     // Zero-overhead parallel iterators (within tracked context)
///     let processed = runtime.install(|| {
///         data.par_iter().map(|x| process(x)).collect()
///     });
/// });
///
/// // Graceful shutdown from main thread
/// runtime.block_until_idle();
/// ```
pub struct LoomRuntime {
    pub(crate) inner: Arc<LoomRuntimeInner>,
}

/// Inner state shared with thread-locals.
///
/// This is Arc-wrapped and shared with tokio/rayon worker threads via thread-local
/// storage, enabling `current_runtime()` to work from any managed thread.
pub(crate) struct LoomRuntimeInner {
    config: LoomConfig,
    tokio_runtime: tokio::runtime::Runtime,
    pub(crate) rayon_pool: rayon::ThreadPool,
    task_tracker: TaskTracker,
    /// Track in-flight rayon tasks for graceful shutdown
    compute_state: ComputeTaskState,
    /// Per-type object pools for zero-allocation spawn_compute
    pub(crate) pools: ComputePoolRegistry,
    /// Number of tokio worker threads
    pub(crate) tokio_threads: usize,
    /// Number of rayon worker threads
    pub(crate) rayon_threads: usize,
    /// CPUs allocated to tokio workers
    pub(crate) tokio_cpus: Vec<usize>,
    /// CPUs allocated to rayon workers
    pub(crate) rayon_cpus: Vec<usize>,
    /// Lazily initialized shared MAB scheduler
    mab_scheduler: OnceLock<Arc<MabScheduler>>,
    /// MAB knobs configuration
    pub(crate) mab_knobs: MabKnobs,
    /// Prometheus metrics - single source of truth for all metrics
    /// (serves both Prometheus exposition and MAB scheduling)
    pub(crate) prometheus_metrics: LoomMetrics,
}

impl LoomRuntime {
    /// Create a LoomRuntime from an existing inner reference.
    ///
    /// This does **not** create a new runtime; it only creates another
    /// handle that points at the same `LoomRuntimeInner`. As a result,
    /// multiple `LoomRuntime` values may refer to the same underlying
    /// runtime state.
    ///
    /// This is intended for internal use by `current_runtime()` to wrap the
    /// thread-local inner reference. Callers must **not** treat the returned
    /// handle as an independently owned runtime for the purpose of shutdown
    /// or teardown. Invoking shutdown-related methods from multiple wrappers
    /// that share the same inner state may lead to unexpected behavior.
    pub(crate) fn from_inner(inner: Arc<LoomRuntimeInner>) -> Self {
        Self { inner }
    }

    /// Create a runtime from a configuration.
    ///
    /// This is typically called via `LoomBuilder::build()`.
    pub(crate) fn from_config(config: LoomConfig) -> Result<Self> {
        let pool_size = config.compute_pool_size;
        // Determine available CPUs.
        // Priority: CUDA device cpuset > user cpuset > all available CPUs.
        // Error if both cuda_device and cpuset are specified (mutually exclusive).
        let cpus = {
            #[cfg(feature = "cuda")]
            {
                // Check for conflicting configuration first
                if config.cuda_device.is_some() && config.cpuset.is_some() {
                    return Err(LoomError::CudaCpusetConflict);
                }

                if let Some(ref selector) = config.cuda_device {
                    match crate::cuda::cpuset_for_cuda_device(selector)? {
                        Some(cuda_cpus) => cuda_cpus,
                        None => {
                            // Could not determine CUDA locality, fall back to all CPUs
                            available_cpus()
                        }
                    }
                } else if let Some(ref cpuset_str) = config.cpuset {
                    parse_and_validate_cpuset(cpuset_str)?
                } else {
                    available_cpus()
                }
            }
            #[cfg(not(feature = "cuda"))]
            {
                if let Some(ref cpuset_str) = config.cpuset {
                    parse_and_validate_cpuset(cpuset_str)?
                } else {
                    available_cpus()
                }
            }
        };

        if cpus.is_empty() {
            return Err(LoomError::NoCpusAvailable);
        }

        let total_cpus = cpus.len();
        let tokio_threads = config.effective_tokio_threads();
        let rayon_threads = config.effective_rayon_threads(total_cpus);

        // Validate we have enough CPUs
        let total_threads = tokio_threads + rayon_threads;
        if total_threads > total_cpus {
            return Err(LoomError::InsufficientCpus {
                requested: total_threads,
                available: total_cpus,
            });
        }

        // Split CPUs between tokio and rayon
        let (tokio_cpus, rayon_cpus) = cpus.split_at(tokio_threads.min(cpus.len()));
        let tokio_cpus = tokio_cpus.to_vec();
        let rayon_cpus = if rayon_cpus.is_empty() {
            // If we don't have dedicated rayon CPUs, share with tokio
            tokio_cpus.clone()
        } else {
            rayon_cpus.to_vec()
        };

        info!(
            prefix = %config.prefix,
            tokio_threads,
            rayon_threads,
            total_cpus,
            pool_size,
            "building loom runtime"
        );

        // Use Arc<str> for prefix to avoid cloning on each thread start
        let prefix: Arc<str> = config.prefix.as_str().into();

        // Create the inner runtime in two phases via Arc::new_cyclic, so the
        // worker threads can hold a Weak<> back-reference to the inner state.
        let inner = Arc::new_cyclic(|weak: &Weak<LoomRuntimeInner>| {
            let weak_clone = weak.clone();

            // Build tokio runtime with thread-local injection
            let tokio_runtime = Self::build_tokio_runtime(
                &prefix,
                tokio_threads,
                tokio_cpus.clone(),
                weak_clone.clone(),
            )
            .expect("failed to build tokio runtime");

            // Build rayon pool with thread-local injection
            let rayon_pool =
                Self::build_rayon_pool(&prefix, rayon_threads, rayon_cpus.clone(), weak_clone)
                    .expect("failed to build rayon pool");

            // Extract MAB knobs, using defaults if not configured
            let mab_knobs = config.mab_knobs.clone().unwrap_or_default();

            // Create Prometheus metrics with the runtime's prefix
            let prometheus_metrics = LoomMetrics::with_prefix(&config.prefix);

            // Register with provided registry if available
            if let Some(ref registry) = config.prometheus_registry {
                if let Err(e) = prometheus_metrics.register(registry) {
                    warn!(%e, "failed to register prometheus metrics");
                }
            }

            LoomRuntimeInner {
                config,
                tokio_runtime,
                rayon_pool,
                task_tracker: TaskTracker::new(),
                compute_state: ComputeTaskState::new(),
                pools: ComputePoolRegistry::new(pool_size),
                tokio_threads,
                rayon_threads,
                tokio_cpus,
                rayon_cpus,
                mab_scheduler: OnceLock::new(),
                mab_knobs,
                prometheus_metrics,
            }
        });

        Ok(Self { inner })
    }

    fn build_tokio_runtime(
        prefix: &Arc<str>,
        num_threads: usize,
        cpus: Vec<usize>,
        runtime_weak: Weak<LoomRuntimeInner>,
    ) -> Result<tokio::runtime::Runtime> {
        let allocator = Arc::new(CpuAllocator::new(cpus));
        let prefix_clone = Arc::clone(prefix);

        // Thread name counter
        let thread_counter = Arc::new(AtomicUsize::new(0));
        let name_prefix = Arc::clone(prefix);

        let start_weak = runtime_weak.clone();
        let start_allocator = allocator.clone();
        let start_prefix = prefix_clone.clone();

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(num_threads)
            .thread_name_fn(move || {
                let id = thread_counter.fetch_add(1, Ordering::SeqCst);
                format!("{}-tokio-{:04}", name_prefix, id)
            })
            .on_thread_start(move || {
                // Pin CPU
                let cpu_id = start_allocator.allocate();
                if let Err(e) = pin_to_cpu(cpu_id) {
                    warn!(%e, %start_prefix, cpu_id, "failed to pin tokio thread");
                } else {
                    debug!(cpu_id, %start_prefix, "pinned tokio thread to CPU");
                }

                // Inject runtime reference into thread-local
                set_current_runtime(start_weak.clone());
            })
            .on_thread_stop(|| {
                clear_current_runtime();
            })
            .enable_all()
            .build()?;

        Ok(runtime)
    }

    fn build_rayon_pool(
        prefix: &Arc<str>,
        num_threads: usize,
        cpus: Vec<usize>,
        runtime_weak: Weak<LoomRuntimeInner>,
    ) -> Result<rayon::ThreadPool> {
        let allocator = Arc::new(CpuAllocator::new(cpus));
        let name_prefix = Arc::clone(prefix);

        let start_weak = runtime_weak.clone();
        let start_allocator = allocator.clone();
        let start_prefix = Arc::clone(prefix);

        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .thread_name(move |i| format!("{}-rayon-{:04}", name_prefix, i))
            .start_handler(move |thread_index| {
                // Pin CPU
                let cpu_id = start_allocator.allocate();
                debug!(thread_index, cpu_id, %start_prefix, "rayon thread starting");
                if let Err(e) = pin_to_cpu(cpu_id) {
                    warn!(%e, %start_prefix, cpu_id, thread_index, "failed to pin rayon thread");
                }

                // Inject runtime reference into thread-local
                set_current_runtime(start_weak.clone());
            })
            .exit_handler(|_thread_index| {
                clear_current_runtime();
            })
            .build()?;

        Ok(pool)
    }

    /// Get the resolved configuration.
    pub fn config(&self) -> &LoomConfig {
        &self.inner.config
    }

    /// Get the tokio runtime handle.
    ///
    /// This can be used to spawn untracked tasks or enter the runtime context.
    /// For tracked async tasks, prefer `spawn_async()`.
    ///
    /// # Performance
    ///
    /// Zero overhead - returns a reference.
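    ///
    /// # Examples
    ///
    /// A minimal sketch of untracked use (`background_io` is a placeholder):
    ///
    /// ```ignore
    /// // Untracked: not waited on by `block_until_idle()`.
    /// runtime.tokio_handle().spawn(async {
    ///     background_io().await
    /// });
    /// ```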
    pub fn tokio_handle(&self) -> &tokio::runtime::Handle {
        self.inner.tokio_runtime.handle()
    }

    /// Get the rayon thread pool.
    ///
    /// This can be used to execute parallel iterators or spawn untracked work directly.
    /// For tracked compute tasks, prefer `spawn_compute()`.
    /// For zero-overhead parallel iterators, prefer `install()`.
    ///
    /// # Performance
    ///
    /// Zero overhead - returns a reference.
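    ///
    /// # Examples
    ///
    /// A minimal sketch of direct, untracked pool access (`crunch` is a
    /// placeholder):
    ///
    /// ```ignore
    /// // Fire-and-forget work on the pinned rayon pool (untracked).
    /// runtime.rayon_pool().spawn(|| crunch());
    /// ```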
    pub fn rayon_pool(&self) -> &rayon::ThreadPool {
        &self.inner.rayon_pool
    }

    /// Get the task tracker for graceful shutdown.
    ///
    /// Use this to track spawned tasks and wait for them to complete.
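    ///
    /// # Examples
    ///
    /// A minimal sketch of manually tracking a future (`work` is a
    /// placeholder):
    ///
    /// ```ignore
    /// let tracked = runtime.task_tracker().track_future(async { work().await });
    /// runtime.tokio_handle().spawn(tracked);
    /// ```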
    pub fn task_tracker(&self) -> &TaskTracker {
        &self.inner.task_tracker
    }

    /// Block on a future using the tokio runtime.
    ///
    /// This is the main entry point for running async code from the main thread.
    /// The current runtime is available via `loom_rs::current_runtime()` within
    /// the block_on scope.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     // Async code here
    ///     // loom_rs::current_runtime() works here
    /// });
    /// ```
    pub fn block_on<F: Future>(&self, f: F) -> F::Output {
        // Set current runtime for the main thread during block_on
        set_current_runtime(Arc::downgrade(&self.inner));
        let result = self.inner.tokio_runtime.block_on(f);
        clear_current_runtime();
        result
    }

    /// Spawn a tracked async task on tokio.
    ///
    /// The task is tracked for graceful shutdown via `block_until_idle()`.
    ///
    /// # Performance
    ///
    /// Overhead: ~10ns (TaskTracker token only).
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     let handle = runtime.spawn_async(async {
    ///         // I/O-bound async work
    ///         fetch_data().await
    ///     });
    ///
    ///     let result = handle.await.unwrap();
    /// });
    /// ```
    #[inline]
    pub fn spawn_async<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        // Track task for MAB metrics (panic-safe via guard)
        let metrics_guard = AsyncMetricsGuard::new(Arc::clone(&self.inner));
        let token = self.inner.task_tracker.token();
        self.inner.tokio_runtime.spawn(async move {
            let _tracker = token;
            let _metrics = metrics_guard;
            future.await
        })
    }

    /// Spawn CPU-bound work on rayon and await the result.
    ///
    /// The task is tracked for graceful shutdown via `block_until_idle()`.
    /// Automatically uses per-type object pools for zero allocation after warmup.
    ///
    /// # Performance
    ///
    /// | State               | Allocations  | Overhead   |
    /// |---------------------|--------------|------------|
    /// | Pool hit            | 0 bytes      | ~100-500ns |
    /// | Pool miss           | ~32 bytes    | ~100-500ns |
    /// | First call per type | Pool + state | ~1µs       |
    ///
    /// For zero-overhead parallel iterators (within an already-tracked context),
    /// use `install()` instead.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     let result = runtime.spawn_compute(|| {
    ///         // CPU-intensive work
    ///         expensive_computation()
    ///     }).await;
    /// });
    /// ```
    #[inline]
    pub async fn spawn_compute<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        self.inner.spawn_compute(f).await
    }

    /// Spawn work with adaptive inline/offload decision.
    ///
    /// Uses MAB (Multi-Armed Bandit) to learn whether this function type should
    /// run inline on tokio or offload to rayon. Good for handler patterns where
    /// work duration varies by input.
    ///
    /// Unlike `spawn_compute()` which always offloads, this adaptively chooses
    /// based on learned behavior and current system pressure.
    ///
    /// # Performance
    ///
    /// | Scenario   | Behavior                | Overhead               |
    /// |------------|-------------------------|------------------------|
    /// | Fast work  | Inlines after learning  | ~100ns (decision only) |
    /// | Slow work  | Offloads after learning | ~100-500ns (+ offload) |
    /// | Cold start | Explores both arms      | Variable               |
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     // MAB will learn whether this is fast or slow
    ///     let result = runtime.spawn_adaptive(|| {
    ///         process_item(item)
    ///     }).await;
    /// });
    /// ```
    pub async fn spawn_adaptive<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        self.spawn_adaptive_with_hint(ComputeHint::Unknown, f).await
    }

    /// Spawn with hint for cold-start guidance.
    ///
    /// The hint helps the scheduler make better initial decisions before it has
    /// learned the actual execution time of this function type.
    ///
    /// # Hints
    ///
    /// - `ComputeHint::Low` - Expected < 50µs (likely inline-safe)
    /// - `ComputeHint::Medium` - Expected 50-500µs (borderline)
    /// - `ComputeHint::High` - Expected > 500µs (should test offload early)
    /// - `ComputeHint::Unknown` - No hint (default exploration)
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use loom_rs::ComputeHint;
    ///
    /// runtime.block_on(async {
    ///     // Hint that this is likely slow work
    ///     let result = runtime.spawn_adaptive_with_hint(
    ///         ComputeHint::High,
    ///         || expensive_computation()
    ///     ).await;
    /// });
    /// ```
    pub async fn spawn_adaptive_with_hint<F, R>(&self, hint: ComputeHint, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let ctx = self.collect_context();
        let key = FunctionKey::from_type::<F>();
        let scheduler = self.mab_scheduler();

        let (id, arm) = scheduler.choose_with_hint(key, &ctx, hint);
        let start = Instant::now();

        let result = match arm {
            Arm::InlineTokio => f(),
            Arm::OffloadRayon => self.inner.spawn_compute(f).await,
        };

        let elapsed_us = start.elapsed().as_secs_f64() * 1_000_000.0;
        scheduler.finish(id, elapsed_us, Some(elapsed_us));
        result
    }

    /// Execute work on rayon with zero overhead (sync, blocking).
    ///
    /// This installs the rayon pool for the current scope, allowing direct use
    /// of rayon's parallel iterators.
    ///
    /// **NOT tracked** - use within an already-tracked task (e.g., inside
    /// `spawn_async` or `spawn_compute`) for proper shutdown tracking.
    ///
    /// # Performance
    ///
    /// Zero overhead - direct rayon access.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     // This is a tracked context (we're in block_on)
    ///     let processed = runtime.install(|| {
    ///         use rayon::prelude::*;
    ///         data.par_iter().map(|x| process(x)).collect::<Vec<_>>()
    ///     });
    /// });
    /// ```
    #[inline]
    pub fn install<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send,
        R: Send,
    {
        self.inner.rayon_pool.install(f)
    }

    /// Execute a scoped parallel computation, allowing borrowed data.
    ///
    /// Unlike `spawn_compute()` which requires `'static` bounds, `scope_compute`
    /// allows borrowing local variables from the async context for use in parallel
    /// work. This is safe because:
    ///
    /// 1. The `.await` suspends the async task
    /// 2. `rayon::scope` blocks until ALL spawned work completes
    /// 3. Only then does the future resolve
    /// 4. Therefore, borrowed references remain valid throughout
    ///
    /// # Performance
    ///
    /// | Aspect     | Value                           |
    /// |------------|---------------------------------|
    /// | Allocation | ~96 bytes per call (not pooled) |
    /// | Overhead   | Comparable to `spawn_compute()` |
    ///
    /// State cannot be pooled because the result type R may contain borrowed
    /// references tied to the calling scope. Benchmarks show performance is
    /// within noise of `spawn_compute()` - the overhead is dominated by
    /// cross-thread communication, not state management.
    ///
    /// # Cancellation Safety
    ///
    /// If the future is dropped before completion (e.g., via `select!` or timeout),
    /// the drop will **block** until the rayon scope finishes. This is necessary
    /// to prevent use-after-free of borrowed data. In normal usage (awaiting to
    /// completion), there is no blocking overhead.
    ///
    /// # Panic Safety
    ///
    /// If the closure or any spawned work panics, the panic is captured and
    /// re-raised when the future is polled. This ensures panics propagate to
    /// the async context as expected.
    ///
    /// # Leaking the Future
    ///
    /// **Important:** Do not leak this future via `std::mem::forget` or similar.
    /// The safety of borrowed data relies on the future's `Drop` implementation
    /// blocking until the rayon scope completes. Leaking the future would allow
    /// the rayon work to continue accessing borrowed data after it goes out of
    /// scope, leading to undefined behavior. This is a known limitation shared
    /// by other scoped async APIs (e.g., `async-scoped`).
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use std::sync::atomic::{AtomicI32, Ordering};
    ///
    /// runtime.block_on(async {
    ///     let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
    ///     let sum = AtomicI32::new(0);
    ///
    ///     // Borrow `data` and `sum` for parallel processing
    ///     runtime.scope_compute(|s| {
    ///         let (left, right) = data.split_at(data.len() / 2);
    ///         let sum_ref = &sum;
    ///
    ///         s.spawn(move |_| {
    ///             sum_ref.fetch_add(left.iter().sum::<i32>(), Ordering::Relaxed);
    ///         });
    ///         s.spawn(move |_| {
    ///             sum_ref.fetch_add(right.iter().sum::<i32>(), Ordering::Relaxed);
    ///         });
    ///     }).await;
    ///
    ///     // `data` and `sum` are still valid here
    ///     println!("Sum of {:?} = {}", data, sum.load(Ordering::Relaxed));
    /// });
    /// ```
    pub async fn scope_compute<'env, F, R>(&self, f: F) -> R
    where
        F: FnOnce(&rayon::Scope<'env>) -> R + Send + 'env,
        R: Send + 'env,
    {
        self.inner.scope_compute(f).await
    }

    /// Execute scoped work with adaptive sync/async decision.
    ///
    /// Uses MAB (Multi-Armed Bandit) to learn whether this function type should:
    /// - Run synchronously via `install()` (blocks tokio worker, lower overhead)
    /// - Run asynchronously via `scope_compute()` (frees tokio worker, higher overhead)
    ///
    /// Unlike `spawn_adaptive()` which chooses between inline execution and rayon offload,
    /// `scope_adaptive` always uses `rayon::scope` (needed for parallel spawning with
    /// borrowed data), but chooses whether to block the tokio worker or use the async bridge.
    ///
    /// # Performance
    ///
    /// | Scenario         | Behavior             | Overhead                     |
    /// |------------------|----------------------|------------------------------|
    /// | Fast scoped work | Sync after learning  | ~0ns (install overhead only) |
    /// | Slow scoped work | Async after learning | ~100-500ns (+ bridge)        |
    /// | Cold start       | Explores both arms   | Variable                     |
    ///
    /// # When to Use
    ///
    /// Use `scope_adaptive` when:
    /// - You need to borrow local data (`'env` lifetime)
    /// - You want parallel spawning via `rayon::scope`
    /// - Work duration varies and you want the runtime to learn the best strategy
    ///
    /// Use `scope_compute` directly when:
    /// - Work is always slow (> 500µs)
    /// - You want consistent async behavior
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use std::sync::atomic::{AtomicI32, Ordering};
    ///
    /// runtime.block_on(async {
    ///     let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
    ///     let sum = AtomicI32::new(0);
    ///
    ///     // MAB learns whether this is fast or slow scoped work
    ///     runtime.scope_adaptive(|s| {
    ///         let (left, right) = data.split_at(data.len() / 2);
    ///         let sum_ref = &sum;
    ///
    ///         s.spawn(move |_| {
    ///             sum_ref.fetch_add(left.iter().sum::<i32>(), Ordering::Relaxed);
    ///         });
    ///         s.spawn(move |_| {
    ///             sum_ref.fetch_add(right.iter().sum::<i32>(), Ordering::Relaxed);
    ///         });
    ///     }).await;
    ///
    ///     println!("Sum: {}", sum.load(Ordering::Relaxed));
    /// });
    /// ```
    pub async fn scope_adaptive<'env, F, R>(&self, f: F) -> R
    where
        F: FnOnce(&rayon::Scope<'env>) -> R + Send + 'env,
        R: Send + 'env,
    {
        self.scope_adaptive_with_hint(ComputeHint::Unknown, f).await
    }

    /// Execute scoped work with hint for cold-start guidance.
    ///
    /// The hint helps the scheduler make better initial decisions before it has
    /// learned the actual execution time of this function type.
    ///
    /// # Hints
    ///
    /// - `ComputeHint::Low` - Expected < 50µs (likely sync-safe)
    /// - `ComputeHint::Medium` - Expected 50-500µs (borderline)
    /// - `ComputeHint::High` - Expected > 500µs (should test async early)
    /// - `ComputeHint::Unknown` - No hint (default exploration)
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use loom_rs::ComputeHint;
    /// use std::sync::atomic::{AtomicI32, Ordering};
    ///
    /// runtime.block_on(async {
    ///     let data = vec![1, 2, 3, 4];
    ///     let sum = AtomicI32::new(0);
    ///
    ///     // Hint that this is likely fast work
    ///     runtime.scope_adaptive_with_hint(ComputeHint::Low, |s| {
    ///         let sum_ref = &sum;
    ///         for &val in &data {
    ///             s.spawn(move |_| {
    ///                 sum_ref.fetch_add(val, Ordering::Relaxed);
    ///             });
    ///         }
    ///     }).await;
    /// });
    /// ```
    pub async fn scope_adaptive_with_hint<'env, F, R>(&self, hint: ComputeHint, f: F) -> R
    where
        F: FnOnce(&rayon::Scope<'env>) -> R + Send + 'env,
        R: Send + 'env,
    {
        let ctx = self.collect_context();
        // Use from_type_name since F may capture non-'static references
        let key = FunctionKey::from_type_name::<F>();
        let scheduler = self.mab_scheduler();

        let (id, arm) = scheduler.choose_with_hint(key, &ctx, hint);
        let start = Instant::now();

        let result = match arm {
            Arm::InlineTokio => {
                // Sync: blocks tokio but no async bridge overhead
                self.install(|| rayon::scope(|s| f(s)))
            }
            Arm::OffloadRayon => {
                // Async: frees tokio worker during execution
                self.scope_compute(f).await
            }
        };

        let elapsed_us = start.elapsed().as_secs_f64() * 1_000_000.0;
        scheduler.finish(id, elapsed_us, Some(elapsed_us));
        result
    }

    /// Stop accepting new tasks.
    ///
    /// After calling this, `spawn_async()` and `spawn_compute()` will still
    /// work, but the shutdown process has begun. Use `is_idle()` or
    /// `wait_for_shutdown()` to check/wait for completion.
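    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.shutdown();
    /// runtime.block_on(runtime.wait_for_shutdown());
    /// ```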
    pub fn shutdown(&self) {
        self.inner.task_tracker.close();
    }

    /// Check if all tracked tasks have completed.
    ///
    /// Returns `true` if `shutdown()` has been called and all tracked async
    /// tasks and compute tasks have finished.
    ///
    /// # Performance
    ///
    /// Near-zero overhead - a few atomic loads, no locking.
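    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.shutdown();
    /// if runtime.is_idle() {
    ///     // All tracked async and compute tasks have drained.
    /// }
    /// ```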
    #[inline]
    pub fn is_idle(&self) -> bool {
        self.inner.task_tracker.is_closed()
            && self.inner.task_tracker.is_empty()
            && self.inner.compute_state.count.load(Ordering::Acquire) == 0
    }

    /// Get the number of compute tasks currently in flight.
    ///
    /// Useful for debugging shutdown issues or monitoring workload.
    ///
    /// # Example
    ///
    /// ```ignore
    /// if runtime.compute_tasks_in_flight() > 0 {
    ///     tracing::warn!("Still waiting for {} compute tasks",
    ///         runtime.compute_tasks_in_flight());
    /// }
    /// ```
    #[inline]
    pub fn compute_tasks_in_flight(&self) -> usize {
        self.inner.compute_state.count.load(Ordering::Relaxed)
    }

    /// Wait for all tracked tasks to complete (async).
    ///
    /// Call from within `block_on()`. Requires `shutdown()` to be called first,
    /// otherwise this will wait forever.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     runtime.spawn_async(work());
    ///     runtime.shutdown();
    ///     runtime.wait_for_shutdown().await;
    /// });
    /// ```
    pub async fn wait_for_shutdown(&self) {
        self.inner.task_tracker.wait().await;

        // Wait for compute tasks efficiently (no spin loop). The `Notified`
        // future is created BEFORE re-checking the count: tokio guarantees it
        // receives `notify_waiters()` wakeups from the moment it is created,
        // so a task completing between the load and the await cannot be lost.
        let mut logged = false;
        loop {
            let notified = self.inner.compute_state.notify.notified();
            let count = self.inner.compute_state.count.load(Ordering::Acquire);
            if count == 0 {
                break;
            }
            if !logged {
                debug!(count, "waiting for compute tasks to complete");
                logged = true;
            }
            notified.await;
        }
    }

    /// Block until all tracked tasks complete (from main thread).
    ///
    /// This is the primary shutdown method. It:
    /// 1. Calls `shutdown()` to close the task tracker
    /// 2. Waits for all tracked async and compute tasks to finish
    ///
    /// # Examples
    ///
    /// ```ignore
    /// runtime.block_on(async {
    ///     runtime.spawn_async(background_work());
    ///     runtime.spawn_compute(|| cpu_work());
    /// });
    ///
    /// // Graceful shutdown from main thread
    /// runtime.block_until_idle();
    /// ```
    pub fn block_until_idle(&self) {
        self.shutdown();
        self.block_on(self.wait_for_shutdown());
    }

    /// Get the shared MAB scheduler for handler patterns.
    ///
    /// The scheduler is lazily initialized on first call. Use this when you
    /// need to make manual scheduling decisions in handler code.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use loom_rs::mab::{FunctionKey, Arm};
    ///
    /// let sched = runtime.mab_scheduler();
    /// let key = FunctionKey::from_type::<MyHandler>();
    /// let ctx = runtime.collect_context();
    ///
    /// let (id, arm) = sched.choose(key, &ctx);
    /// let result = match arm {
    ///     Arm::InlineTokio => my_work(),
    ///     Arm::OffloadRayon => runtime.block_on(async {
    ///         runtime.spawn_compute(|| my_work()).await
    ///     }),
    /// };
    /// sched.finish(id, elapsed_us, Some(fn_us));
    /// ```
    pub fn mab_scheduler(&self) -> Arc<MabScheduler> {
        self.inner
            .mab_scheduler
            .get_or_init(|| {
                Arc::new(MabScheduler::with_metrics(
                    self.inner.mab_knobs.clone(),
                    self.inner.prometheus_metrics.clone(),
                ))
            })
            .clone()
    }

    /// Collect current runtime context for MAB scheduling decisions.
    ///
    /// Returns a snapshot of current metrics including inflight tasks,
    /// spawn rate, and queue depth.
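    ///
    /// # Examples
    ///
    /// A minimal sketch pairing a snapshot with a manual scheduling decision
    /// (see `mab_scheduler()` for the full pattern):
    ///
    /// ```ignore
    /// let ctx = runtime.collect_context();
    /// let (id, arm) = runtime.mab_scheduler().choose(key, &ctx);
    /// ```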
    pub fn collect_context(&self) -> Context {
        self.inner.prometheus_metrics.collect_context(
            self.inner.tokio_threads as u32,
            self.inner.rayon_threads as u32,
        )
    }

    /// Get the number of tokio worker threads.
    pub fn tokio_threads(&self) -> usize {
        self.inner.tokio_threads
    }

    /// Get the number of rayon threads.
    pub fn rayon_threads(&self) -> usize {
        self.inner.rayon_threads
    }

    /// Get the Prometheus metrics.
    ///
    /// The metrics are always collected (zero overhead atomic operations).
    /// If a Prometheus registry was provided via `LoomBuilder::prometheus_registry()`,
    /// the metrics are also registered for exposition.
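    ///
    /// # Examples
    ///
    /// A minimal sketch of late registration with a registry you own (mirrors
    /// the pattern used in this module's tests):
    ///
    /// ```ignore
    /// let registry = prometheus::Registry::new();
    /// runtime.prometheus_metrics().register(&registry)?;
    /// let families = registry.gather();
    /// ```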
    pub fn prometheus_metrics(&self) -> &LoomMetrics {
        &self.inner.prometheus_metrics
    }

    /// Get the CPUs allocated to tokio workers.
    pub fn tokio_cpus(&self) -> &[usize] {
        &self.inner.tokio_cpus
    }

    /// Get the CPUs allocated to rayon workers.
    pub fn rayon_cpus(&self) -> &[usize] {
        &self.inner.rayon_cpus
    }
}

impl LoomRuntimeInner {
    /// Spawn CPU-bound work on rayon and await the result.
    ///
    /// Uses per-type object pools for zero allocation after warmup.
    #[inline]
    pub async fn spawn_compute<F, R>(self: &Arc<Self>, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let pool = self.pools.get_or_create::<R>();

        // Try to get state from pool, or allocate new
        let state = pool.pop().unwrap_or_else(|| Arc::new(TaskState::new()));

        // Create the pooled task
        let (task, completion, state_for_return) = PooledRayonTask::new(state);

        // Create guard BEFORE spawning - it increments counter and tracks MAB metrics
        let guard = ComputeTaskGuard::new(&self.compute_state, &self.prometheus_metrics);

        self.rayon_pool.spawn(move || {
            // Track rayon task start for queue depth calculation
            guard.started();

            // Execute work inside guard scope so counter decrements BEFORE completing.
            // This ensures the async future sees count=0 when it wakes up.
            let result = {
                let _guard = guard;
                f()
            };
            completion.complete(result);
        });

        let result = task.await;

        // Return state to pool for reuse
        state_for_return.reset();
        pool.push(state_for_return);

        result
    }

    /// Execute a scoped parallel computation with borrowed data.
    ///
    /// # Safety Argument
    ///
    /// The lifetime erasure via transmute is sound because:
    /// 1. The async task is suspended at `.await`
    /// 2. The future only completes after `rayon::scope` returns
    /// 3. `rayon::scope` blocks until ALL spawned work completes
    /// 4. Therefore, `'env` references outlive all accesses to borrowed data
    ///
    /// This is the same safety argument used by `std::thread::scope`.
    pub async fn scope_compute<'env, F, R>(self: &Arc<Self>, f: F) -> R
    where
        F: FnOnce(&rayon::Scope<'env>) -> R + Send + 'env,
        R: Send + 'env,
    {
        // SAFETY: Lifetime erasure is sound because:
        // 1. The future holds the async task suspended at .await
        // 2. Future only completes after rayon::scope returns
        // 3. rayon::scope blocks until all spawned work completes
        // 4. Therefore 'env outlives all accesses to borrowed data
        //
        // Additionally, if the future is dropped before completion (cancellation),
        // ScopedComputeFuture::drop blocks until the scope finishes, maintaining
        // the safety invariant.

        let state = Arc::new(ScopedTaskState::<R>::new());
        let future = ScopedComputeFuture::new(state.clone());
        let completion = ScopedCompletion::new(state);

        // Create guard BEFORE spawning - it increments counter and tracks MAB metrics
        let guard = ComputeTaskGuard::new(&self.compute_state, &self.prometheus_metrics);

        // Create a closure that captures f, completion, and guard.
        // We use catch_unwind to ensure completion is always signaled, even on panic.
        let work = move || {
            guard.started();
            let result = {
                let _guard = guard;
                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| rayon::scope(|s| f(s))))
            };
            match result {
                Ok(value) => completion.complete(value),
                Err(payload) => completion.complete_with_panic(payload),
            }
        };

        // SAFETY: We erase the lifetime to 'static here. This is safe because:
        // 1. The ScopedComputeFuture we return will block in Drop if the work
        //    hasn't completed (cancellation safety)
        // 2. Normal await returns only after the work completes
        // 3. Therefore, all 'env references remain valid for the duration of
        //    the work execution
        let erased: Box<dyn FnOnce() + Send + 'static> = unsafe {
            std::mem::transmute::<Box<dyn FnOnce() + Send + 'env>, Box<dyn FnOnce() + Send + 'static>>(
                Box::new(work),
            )
        };

        self.rayon_pool.spawn(move || {
            erased();
        });

        future.await
    }
}

impl std::fmt::Debug for LoomRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LoomRuntime")
            .field("config", &self.inner.config)
            .field(
                "compute_tasks_in_flight",
                &self.inner.compute_state.count.load(Ordering::Relaxed),
            )
            .finish_non_exhaustive()
    }
}

impl std::fmt::Display for LoomRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "LoomRuntime[{}]: tokio({}, cpus={}) rayon({}, cpus={})",
            self.inner.config.prefix,
            self.inner.tokio_threads,
            format_cpuset(&self.inner.tokio_cpus),
            self.inner.rayon_threads,
            format_cpuset(&self.inner.rayon_cpus),
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::pool::DEFAULT_POOL_SIZE;

    fn test_config() -> LoomConfig {
        LoomConfig {
            prefix: "test".to_string(),
            cpuset: None,
            tokio_threads: Some(1),
            rayon_threads: Some(1),
            compute_pool_size: DEFAULT_POOL_SIZE,
            #[cfg(feature = "cuda")]
            cuda_device: None,
            mab_knobs: None,
            calibration: None,
            prometheus_registry: None,
        }
    }

    #[test]
    fn test_runtime_creation() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();
        assert_eq!(runtime.config().prefix, "test");
    }

    #[test]
    fn test_block_on() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async { 42 });
        assert_eq!(result, 42);
    }

    #[test]
    fn test_spawn_compute() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result =
            runtime.block_on(async { runtime.spawn_compute(|| (0..100).sum::<i32>()).await });
        assert_eq!(result, 4950);
    }

    #[test]
    fn test_spawn_async() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async {
            let handle = runtime.spawn_async(async { 42 });
            handle.await.unwrap()
        });
        assert_eq!(result, 42);
    }

    #[test]
    fn test_install() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.install(|| {
            use rayon::prelude::*;
            (0..100).into_par_iter().sum::<i32>()
        });
        assert_eq!(result, 4950);
    }

    #[test]
    fn test_shutdown_and_idle() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        // Initially not idle (tracker not closed)
        assert!(!runtime.is_idle());

        // After shutdown with no tasks, should be idle
        runtime.shutdown();
        assert!(runtime.is_idle());
    }

    #[test]
    fn test_block_until_idle() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        runtime.block_on(async {
            runtime.spawn_async(async { 42 });
            runtime.spawn_compute(|| 100).await;
        });

        runtime.block_until_idle();
        assert!(runtime.is_idle());
    }

    #[test]
    fn test_insufficient_cpus_error() {
        let mut config = test_config();
        config.cpuset = Some("0".to_string()); // Only 1 CPU
        config.tokio_threads = Some(2);
        config.rayon_threads = Some(2);

        let result = LoomRuntime::from_config(config);
        assert!(matches!(result, Err(LoomError::InsufficientCpus { .. })));
    }

    #[test]
    fn test_current_runtime_in_block_on() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        runtime.block_on(async {
            // current_runtime should work inside block_on
            let current = crate::context::current_runtime();
            assert!(current.is_some());
        });

        // Outside block_on, should be None
        assert!(crate::context::current_runtime().is_none());
    }

    #[test]
    fn test_spawn_compute_pooling() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        // Warmup - first call allocates
        runtime.block_on(async {
            runtime.spawn_compute(|| 1i32).await;
        });

        // Subsequent calls should reuse pooled state (we can't easily verify this
        // without internal access, but we can verify it works)
        runtime.block_on(async {
            for i in 0..100 {
                let result = runtime.spawn_compute(move || i).await;
                assert_eq!(result, i);
            }
        });
    }

    #[test]
    fn test_spawn_compute_guard_drops_on_scope_exit() {
        // This test verifies the guard's Drop implementation works correctly.
        // We can't easily test panic behavior in rayon (panics abort by default),
        // but we can verify the guard decrements the counter when it goes out of scope.
        use crate::metrics::LoomMetrics;
        use std::sync::atomic::Ordering;

        let state = super::ComputeTaskState::new();
        let metrics = LoomMetrics::new();

        // Create a guard (increments counter)
        {
            let _guard = super::ComputeTaskGuard::new(&state, &metrics);
            assert_eq!(state.count.load(Ordering::Relaxed), 1);
        }
        // Guard dropped, counter should be 0
        assert_eq!(state.count.load(Ordering::Relaxed), 0);

        // Test multiple guards
        let state = super::ComputeTaskState::new();

        let guard1 = super::ComputeTaskGuard::new(&state, &metrics);
        assert_eq!(state.count.load(Ordering::Relaxed), 1);

        let guard2 = super::ComputeTaskGuard::new(&state, &metrics);
        assert_eq!(state.count.load(Ordering::Relaxed), 2);

        drop(guard1);
        assert_eq!(state.count.load(Ordering::Relaxed), 1);

        drop(guard2);
        assert_eq!(state.count.load(Ordering::Relaxed), 0);

        // The notification mechanism is verified by the fact that wait_for_shutdown
        // doesn't spin-loop forever when compute tasks complete
    }

    #[test]
    fn test_compute_tasks_in_flight() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        // Initially no tasks
        assert_eq!(runtime.compute_tasks_in_flight(), 0);

        // After spawning and completing, should be back to 0
        runtime.block_on(async {
            runtime.spawn_compute(|| 42).await;
        });
        assert_eq!(runtime.compute_tasks_in_flight(), 0);
    }

    #[test]
    fn test_display() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let display = format!("{}", runtime);
        assert!(display.starts_with("LoomRuntime[test]:"));
        assert!(display.contains("tokio(1, cpus="));
        assert!(display.contains("rayon(1, cpus="));
    }

    #[test]
    fn test_cpuset_only() {
        let mut config = test_config();
        config.cpuset = Some("0".to_string());
        config.tokio_threads = Some(1);
        config.rayon_threads = Some(0);

        let runtime = LoomRuntime::from_config(config).unwrap();
        // Should use the user-provided cpuset
        assert_eq!(runtime.inner.tokio_cpus, vec![0]);
    }

    /// Test that CUDA cpuset conflict error is properly detected.
    /// This test requires actual CUDA hardware to verify the conflict.
    #[cfg(feature = "cuda-tests")]
    #[test]
    fn test_cuda_cpuset_conflict_error() {
        let mut config = test_config();
        config.cuda_device = Some(crate::cuda::CudaDeviceSelector::DeviceId(0));
        config.cpuset = Some("0".to_string()); // Conflict: both specified

        let result = LoomRuntime::from_config(config);
        assert!(
            matches!(result, Err(LoomError::CudaCpusetConflict)),
            "expected CudaCpusetConflict error, got {:?}",
            result
        );
    }

    /// Test that CUDA device alone (without cpuset) works.
    #[cfg(feature = "cuda-tests")]
    #[test]
    fn test_cuda_device_only() {
        let mut config = test_config();
        config.cuda_device = Some(crate::cuda::CudaDeviceSelector::DeviceId(0));
        config.cpuset = None;

        let runtime = LoomRuntime::from_config(config).unwrap();
        // Should have found CUDA-local CPUs
        assert!(!runtime.inner.tokio_cpus.is_empty());
    }

    // =============================================================================
    // spawn_adaptive Tests
    // =============================================================================

    #[test]
    fn test_spawn_adaptive_runs_work() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async { runtime.spawn_adaptive(|| 42).await });

        assert_eq!(result, 42);
    }

    #[test]
    fn test_spawn_adaptive_with_hint() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async {
            runtime
                .spawn_adaptive_with_hint(crate::ComputeHint::Low, || 100)
                .await
        });

        assert_eq!(result, 100);
    }

    #[test]
    fn test_spawn_adaptive_multiple_calls() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        runtime.block_on(async {
            // Run many fast tasks to let MAB learn
            for i in 0..50 {
                let result = runtime.spawn_adaptive(move || i * 2).await;
                assert_eq!(result, i * 2);
            }
        });
    }

    #[test]
    fn test_spawn_adaptive_records_metrics() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        runtime.block_on(async {
            // Run some adaptive tasks
            for _ in 0..10 {
                runtime.spawn_adaptive(|| std::hint::black_box(42)).await;
            }
        });

        // Check that metrics were recorded
        let metrics = runtime.prometheus_metrics();
        let total_decisions = metrics.inline_decisions.get() + metrics.offload_decisions.get();
        assert!(
            total_decisions >= 10,
            "Should have recorded at least 10 decisions, got {}",
            total_decisions
        );
    }

    #[test]
    fn test_prometheus_metrics_use_prefix() {
        let mut config = test_config();
        config.prefix = "myapp".to_string();
        let runtime = LoomRuntime::from_config(config).unwrap();

        // The metrics should use the prefix from config.
        // We can verify by registering with a fresh registry and gathering.
        let registry = prometheus::Registry::new();
        runtime
            .prometheus_metrics()
            .register(&registry)
            .expect("registration should succeed");

        let families = registry.gather();
        // Find a metric with our prefix
        let myapp_metric = families.iter().find(|f| f.get_name().starts_with("myapp_"));
        assert!(
            myapp_metric.is_some(),
            "Should find metrics with 'myapp_' prefix"
        );

        // Should not find metrics with default 'loom_' prefix
        let loom_metric = families.iter().find(|f| f.get_name().starts_with("loom_"));
        assert!(
            loom_metric.is_none(),
            "Should not find metrics with 'loom_' prefix"
        );
    }

    // =============================================================================
    // scope_compute Tests
    // =============================================================================

    #[test]
    fn test_scope_compute_basic() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async {
            runtime
                .scope_compute(|_s| {
                    // Simple computation without spawning
                    42
                })
                .await
        });

        assert_eq!(result, 42);
    }

    #[test]
    fn test_scope_compute_borrow_local_data() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async {
            let data = [1, 2, 3, 4, 5, 6, 7, 8];

            let sum = runtime
                .scope_compute(|_s| {
                    // Borrow data inside the scope
                    data.iter().sum::<i32>()
                })
                .await;

            // data is still valid after scope_compute
            assert_eq!(data.len(), 8);
            sum
        });

        assert_eq!(result, 36);
    }

    #[test]
    fn test_scope_compute_parallel_with_atomic() {
        use std::sync::atomic::{AtomicI32, Ordering};

        let mut config = test_config();
        config.rayon_threads = Some(2);
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async {
            let data = [1, 2, 3, 4, 5, 6, 7, 8];
            let sum = AtomicI32::new(0);

            runtime
                .scope_compute(|s| {
                    let (left, right) = data.split_at(data.len() / 2);
                    let sum_ref = &sum;

                    s.spawn(move |_| {
                        let partial: i32 = left.iter().sum();
                        sum_ref.fetch_add(partial, Ordering::Relaxed);
                    });
                    s.spawn(move |_| {
                        let partial: i32 = right.iter().sum();
                        sum_ref.fetch_add(partial, Ordering::Relaxed);
                    });
                })
                .await;

            sum.load(Ordering::Relaxed)
        });

        assert_eq!(result, 36);
    }

    #[test]
    fn test_scope_compute_nested_spawns() {
        use std::sync::atomic::{AtomicI32, Ordering};

        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async {
            let data = [1, 2, 3, 4, 5, 6, 7, 8];
            let sum = AtomicI32::new(0);

            runtime
                .scope_compute(|s| {
                    let data_ref = &data;
                    let sum_ref = &sum;

                    s.spawn(move |s| {
                        // Nested spawn
                        s.spawn(move |_| {
                            sum_ref
                                .fetch_add(data_ref[0..2].iter().sum::<i32>(), Ordering::Relaxed);
                        });
                        s.spawn(move |_| {
                            sum_ref
                                .fetch_add(data_ref[2..4].iter().sum::<i32>(), Ordering::Relaxed);
                        });
                    });
                    s.spawn(move |_| {
                        sum_ref.fetch_add(data_ref[4..8].iter().sum::<i32>(), Ordering::Relaxed);
                    });
                })
                .await;

            // rayon::scope guarantees all spawned work completes before returning
            sum.load(Ordering::Relaxed)
        });

        assert_eq!(result, 36);
    }

    #[test]
    fn test_scope_compute_with_rayon_par_iter() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async {
            let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

            runtime
                .scope_compute(|_s| {
                    use rayon::prelude::*;
                    // Use parallel iterators inside the scope
                    data.par_iter().map(|x| x * 2).sum::<i32>()
                })
                .await
        });

        assert_eq!(result, 110);
    }

    #[test]
    fn test_scope_compute_tracks_compute_tasks() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        // Initially no tasks
        assert_eq!(runtime.compute_tasks_in_flight(), 0);

        runtime.block_on(async {
            runtime.scope_compute(|_s| 42).await;
        });

        // After completion, should be back to 0
        assert_eq!(runtime.compute_tasks_in_flight(), 0);
    }

    #[test]
    fn test_scope_compute_data_still_valid_after() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        runtime.block_on(async {
            let mut data = vec![1, 2, 3, 4, 5];

            let sum = runtime.scope_compute(|_s| data.iter().sum::<i32>()).await;

            assert_eq!(sum, 15);

            // data is still valid and can be modified
            data.push(6);
            assert_eq!(data.len(), 6);

            // Can use scope_compute again with the same data
            let new_sum = runtime.scope_compute(|_s| data.iter().sum::<i32>()).await;
            assert_eq!(new_sum, 21);
        });
    }

    /// Test that scope_compute properly yields to the async executor and doesn't block.
    /// This validates the future correctly returns Poll::Pending and wakes up when done.
    #[test]
    fn test_scope_compute_yields_to_executor() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;
        use std::time::Duration;

        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        runtime.block_on(async {
            let concurrent_task_ran = Arc::new(AtomicBool::new(false));
            let concurrent_task_ran_clone = concurrent_task_ran.clone();

            // Spawn a concurrent async task that should run while scope_compute is waiting
            let concurrent_handle = runtime.spawn_async(async move {
                // Wait a bit for the scope_compute to start
                tokio::time::sleep(Duration::from_millis(5)).await;
                concurrent_task_ran_clone.store(true, Ordering::Release);
                100
            });

            // Run scope_compute with work that takes some time
            let result = runtime
                .scope_compute(|_s| {
                    // Simulate some work
                    std::thread::sleep(Duration::from_millis(20));
                    42
                })
                .await;

            assert_eq!(result, 42);

            // Verify the concurrent task actually ran during scope_compute
            // (check BEFORE awaiting the handle to prove it ran concurrently)
            assert!(
                concurrent_task_ran.load(Ordering::Acquire),
                "Concurrent task should have run while scope_compute was in progress"
            );

            // The concurrent task should have completed
            let concurrent_result = concurrent_handle.await.unwrap();
            assert_eq!(concurrent_result, 100);
        });
    }

    /// Test that mirrors the documentation example for scope_compute.
    /// This ensures the example code actually compiles and works.
    #[test]
    fn test_scope_compute_doc_example() {
        use std::sync::atomic::{AtomicI32, Ordering};

        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async {
            let data = [1, 2, 3, 4, 5, 6, 7, 8];
            let sum = AtomicI32::new(0);

            runtime
                .scope_compute(|s| {
                    let (left, right) = data.split_at(data.len() / 2);
                    let sum_ref = &sum;

                    s.spawn(move |_| {
                        sum_ref.fetch_add(left.iter().sum::<i32>(), Ordering::Relaxed);
                    });
                    s.spawn(move |_| {
                        sum_ref.fetch_add(right.iter().sum::<i32>(), Ordering::Relaxed);
                    });
                })
                .await;

            // Verify data is still accessible after scope_compute
            assert_eq!(data.len(), 8);
            sum.load(Ordering::Relaxed)
        });

        assert_eq!(result, 36); // 1+2+3+4+5+6+7+8 = 36
    }

    /// Test that scope_compute works correctly with tokio::select! (cancellation scenario).
    /// The future should block on drop until the scope completes.
    #[test]
    fn test_scope_compute_cancellation_safety() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;
        use std::time::Duration;

        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        runtime.block_on(async {
            let scope_completed = Arc::new(AtomicBool::new(false));
            let scope_completed_clone = scope_completed.clone();

            // Use select! to race scope_compute against a timeout.
            // The scope_compute will "lose" but should still complete before we continue.
            let result = tokio::select! {
                biased;

                _ = tokio::time::sleep(Duration::from_millis(5)) => {
                    // Timeout wins - this drops the scope_compute future.
                    // But drop should block until scope completes.
                    None
                }
                result = runtime.scope_compute(|_s| {
                    // This takes longer than the timeout
                    std::thread::sleep(Duration::from_millis(50));
                    scope_completed_clone.store(true, Ordering::Release);
                    42
                }) => {
                    Some(result)
                }
            };

            // The timeout should have won
            assert!(result.is_none(), "Timeout should have won the race");

            // But critically, the scope should have completed (drop blocked)
            assert!(
                scope_completed.load(Ordering::Acquire),
                "Scope should have completed even though future was cancelled"
            );
        });
    }

    /// Test that panics inside scope_compute are properly propagated to the awaiter.
    #[test]
    #[should_panic(expected = "intentional panic for testing")]
    fn test_scope_compute_panic_propagation() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        runtime.block_on(async {
            runtime
                .scope_compute(|_s| {
                    panic!("intentional panic for testing");
                })
                .await
        });
    }

    /// Test that panics in spawned work inside scope_compute are properly propagated.
    #[test]
    #[should_panic(expected = "panic in spawned work")]
    fn test_scope_compute_spawned_panic_propagation() {
        let mut config = test_config();
        config.rayon_threads = Some(2);
        let runtime = LoomRuntime::from_config(config).unwrap();

        runtime.block_on(async {
            runtime
                .scope_compute(|s| {
                    s.spawn(|_| {
                        panic!("panic in spawned work");
                    });
                })
                .await
        });
    }

    // =============================================================================
    // scope_adaptive Tests
    // =============================================================================

    #[test]
    fn test_scope_adaptive_basic() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async {
            runtime
                .scope_adaptive(|_s| {
                    // Simple computation without spawning
                    42
                })
                .await
        });

        assert_eq!(result, 42);
    }

    #[test]
    fn test_scope_adaptive_with_hint() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async {
            runtime
                .scope_adaptive_with_hint(crate::ComputeHint::Low, |_s| 100)
                .await
        });

        assert_eq!(result, 100);
    }

    #[test]
    fn test_scope_adaptive_borrow_local_data() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async {
            let data = [1, 2, 3, 4, 5, 6, 7, 8];

            let sum = runtime
                .scope_adaptive(|_s| {
                    // Borrow data inside the scope
                    data.iter().sum::<i32>()
                })
                .await;

            // data is still valid after scope_adaptive
            assert_eq!(data.len(), 8);
            sum
        });

        assert_eq!(result, 36);
    }

    #[test]
    fn test_scope_adaptive_parallel_with_atomic() {
        use std::sync::atomic::{AtomicI32, Ordering};

        let mut config = test_config();
        config.rayon_threads = Some(2);
        let runtime = LoomRuntime::from_config(config).unwrap();

        let result = runtime.block_on(async {
            let data = [1, 2, 3, 4, 5, 6, 7, 8];
            let sum = AtomicI32::new(0);

            runtime
                .scope_adaptive(|s| {
                    let (left, right) = data.split_at(data.len() / 2);
                    let sum_ref = &sum;

                    s.spawn(move |_| {
                        let partial: i32 = left.iter().sum();
                        sum_ref.fetch_add(partial, Ordering::Relaxed);
                    });
                    s.spawn(move |_| {
                        let partial: i32 = right.iter().sum();
                        sum_ref.fetch_add(partial, Ordering::Relaxed);
                    });
                })
                .await;

            sum.load(Ordering::Relaxed)
        });

        assert_eq!(result, 36);
    }

    #[test]
    fn test_scope_adaptive_records_metrics() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        runtime.block_on(async {
            // Run some adaptive tasks
            for _ in 0..10 {
                runtime.scope_adaptive(|_s| std::hint::black_box(42)).await;
            }
        });

        // Check that metrics were recorded
        let metrics = runtime.prometheus_metrics();
        let total_decisions = metrics.inline_decisions.get() + metrics.offload_decisions.get();
        assert!(
            total_decisions >= 10,
            "Should have recorded at least 10 decisions, got {}",
            total_decisions
        );
    }

    /// Test that panics inside scope_adaptive are properly propagated.
    #[test]
    #[should_panic(expected = "intentional panic in scope_adaptive")]
    fn test_scope_adaptive_panic_propagation() {
        let config = test_config();
        let runtime = LoomRuntime::from_config(config).unwrap();

        runtime.block_on(async {
            runtime
                .scope_adaptive(|_s| {
                    panic!("intentional panic in scope_adaptive");
                })
                .await
        });
    }
}
2019}