Expand description
Compute module for CPU-intensive operations using Rayon
This module provides a dedicated compute thread pool for CPU-bound work, integrating Rayon’s fork-join parallelism with Tokio’s async runtime.
Key features:
- Dedicated Rayon thread pool for compute operations
- Seamless async-to-sync bridging via tokio-rayon
- Scope-based parallelism for complex computational graphs
- Metrics and monitoring for compute operations
§Rayon-Tokio Integration Strategy
§Overview
This document describes the integration strategy for combining Tokio’s asynchronous runtime with Rayon’s data-parallel compute capabilities in the Dynamo runtime. The core philosophy is simple:
- Tokio handles I/O-bound operations, waiting, and coordination
- Rayon handles CPU-bound operations and data parallelism
- Multiple async tasks can concurrently submit different types of work to the shared Rayon thread pool
§Architecture
+---------------------------------------------------------------+
| Tokio Runtime |
| +-----------+ +-----------+ +-----------+ |
| | Async | | Async | | Async | |
| | Task 1 | | Task 2 | | Task 3 | |
| | | | | | | |
| | Receives | | Processes | | Handles | |
| | requests | | streams | | batches | |
| +-----+-----+ +-----+-----+ +-----+-----+ |
| | | | |
| +----------------+----------------+ |
| | |
| tokio_rayon::spawn |
| | |
+-------------------------+------------------------------------+
|
v
+---------------------------------------------------------------+
| Rayon Thread Pool |
| |
| +----------------------------------------------------------+ |
| | Work-Stealing Thread Pool (N threads) | |
| | | |
| | +---------+ +-----------+ +------------------+ | |
| | | scope() | | par_iter()| | join() | | |
| | | tasks | | chunks | | computations | | |
| | +---------+ +-----------+ +------------------+ | |
| | | |
| | All patterns share the same thread pool | |
| +----------------------------------------------------------+ |
+---------------------------------------------------------------+§When to Use Tokio vs Rayon
§Use Tokio (async/await) when
- Waiting for I/O: Network requests, file I/O, database queries
- Coordinating tasks: Channels, synchronization, signaling
- Stream processing: Items arrive over time with delays
- Resource pooling: Connection pools, async locks
- Service orchestration: Managing component lifecycles
§Use Rayon (compute pool) when
- Batch processing: You have all data ready for parallel processing
- CPU-intensive work: Computation takes >1ms per item
- Data transformation: Tokenization, serialization, compression
- Parallel algorithms: Matrix operations, sorting, searching
- Map-reduce patterns: Aggregations over large datasets
§Decision Thresholds
- Use Rayon when processing ≥10 items in parallel
- Use
spawn_blockingwhen CPU work takes >1ms - Keep Tokio for operations with >100μs waits between items
- Use Rayon when you can saturate multiple CPU cores
§Overhead Considerations
Based on benchmarks, the async bridge between Tokio and Rayon has:
- ~25μs overhead for small tasks (due to channel communication)
- ~4% overhead for tasks taking >2ms
- Negligible overhead for tasks taking >10ms
For minimal overhead when using Rayon from async context:
- Small tasks (<100μs): Run directly on Tokio
- Medium tasks (100μs-1ms): Use
spawn_blocking+pool.execute_sync() - Large tasks (>1ms): Use
pool.execute()for convenience
§Concurrent Usage Patterns
The key insight is that multiple async tasks can concurrently use the same Rayon thread pool with different parallelization patterns. Rayon’s work-stealing scheduler efficiently distributes work regardless of the pattern used.
§Pattern 1: Concurrent Scope and ParIter
use std::sync::Arc;
use dynamo_runtime::compute::ComputePool;
async fn concurrent_compute_tasks(pool: Arc<ComputePool>) {
// Task 1: Using scope for dynamic task spawning
let task1 = tokio::spawn({
let pool = pool.clone();
async move {
pool.execute_scoped(|scope| {
// Dynamically spawn tasks based on runtime conditions
for i in 0..num_tasks {
scope.spawn(move |_| {
expensive_computation(i)
});
}
}).await
}
});
// Task 2: Using parallel iterators for batch processing
let task2 = tokio::spawn({
let pool = pool.clone();
async move {
pool.install(|| {
// Process data in parallel chunks
data.par_chunks(100)
.map(|chunk| transform_chunk(chunk))
.collect::<Vec<_>>()
}).await
}
});
// Task 3: Using join for binary parallelism
let task3 = tokio::spawn({
let pool = pool.clone();
async move {
pool.join(
|| compute_left_branch(),
|| compute_right_branch(),
).await
}
});
// All three tasks run concurrently, sharing the Rayon thread pool
let (r1, r2, r3) = tokio::join!(task1, task2, task3);
}§Pattern 2: Stream Processing with Batch Compute
/// Example: Process async stream with CPU-intensive batch operations
async fn stream_with_compute(
pool: Arc<ComputePool>,
stream: impl Stream<Item = Vec<Data>>,
) {
// Use for_each_concurrent for proper stream consumption
stream.for_each_concurrent(4, |batch| {
let pool = pool.clone();
async move {
// Process batch using parallel iterators
let result = pool.install(move || {
batch.par_iter()
.map(|item| process_item(item))
.collect::<Vec<_>>()
}).await.unwrap();
// Async I/O to send results
send_results(result).await;
}
}).await;
}§Pattern 3: Mixed Workload Service
/// Real-world example: LLM service with mixed workloads
struct LLMService {
runtime: Arc<Runtime>,
tokenizer: Arc<Tokenizer>,
}
impl LLMService {
async fn run(&self) {
let pool = self.runtime.compute_pool()
.expect("Compute pool required");
// Tokenization service - uses parallel iterators
let tokenization_task = {
let pool = pool.clone();
let tokenizer = self.tokenizer.clone();
tokio::spawn(async move {
loop {
// Async I/O: receive batch from network
let texts = receive_tokenization_batch().await;
// CPU-bound: parallel tokenization
let tokens = pool.install(move || {
texts.par_iter()
.map(|text| tokenizer.encode(text))
.collect::<Vec<_>>()
}).await.unwrap();
// Async I/O: send results
send_tokens(tokens).await;
}
})
};
// Embedding service - uses scope for multi-stage computation
let embedding_task = {
let pool = pool.clone();
tokio::spawn(async move {
loop {
// Async I/O: receive request
let request = receive_embedding_request().await;
// CPU-bound: multi-stage parallel computation
let embeddings = pool.execute_scoped(|scope| {
let mut text_emb = None;
let mut context_emb = None;
scope.spawn(|_| {
text_emb = Some(compute_text_embedding(&request.text));
});
scope.spawn(|_| {
context_emb = Some(compute_context_embedding(&request.context));
});
// Scope waits for both to complete
combine_embeddings(text_emb.unwrap(), context_emb.unwrap())
}).await.unwrap();
// Async I/O: send results
send_embeddings(embeddings).await;
}
})
};
// Batch inference service - uses nested parallelism
let inference_task = {
let pool = pool.clone();
tokio::spawn(async move {
loop {
let batch = receive_inference_batch().await;
let results = pool.execute_scoped(|scope| {
let mut results = Vec::with_capacity(batch.len());
// Spawn a task for each item
for item in batch {
scope.spawn(move |s2| {
// Within each task, use parallel iterators
let preprocessed = item.data
.par_chunks(10)
.map(|chunk| preprocess(chunk))
.collect::<Vec<_>>();
// Can spawn more tasks within nested scope
let mut stages = vec![];
for p in preprocessed {
s2.spawn(move |_| {
stages.push(run_inference(p));
});
}
results.push(merge_stages(stages));
});
}
results
}).await.unwrap();
send_inference_results(results).await;
}
})
};
// All services run concurrently, sharing the compute pool
tokio::join!(tokenization_task, embedding_task, inference_task);
}
}§How It Works: Thread Pool Sharing
Rayon’s work-stealing scheduler ensures efficient resource utilization even when different async tasks submit different types of work:
- Work Queues: Each Rayon thread has a local deque (double-ended queue)
- Local Execution: Threads prefer executing their own tasks (LIFO for cache locality)
- Work Stealing: Idle threads steal tasks from busy threads (FIFO from the other end)
- No Interference: Different parallelization patterns (scope, par_iter) coexist peacefully
This means:
- A
scopetask spawning many small tasks works alongsidepar_chunksprocessing large batches - The thread pool automatically balances load between different types of work
- No manual coordination needed between different async tasks using the pool
§Performance Considerations
§Thread Pool Sizing
[runtime]
# Tokio threads: optimize for concurrent async tasks
num_worker_threads = 8 # Usually number of cores
# Rayon threads: optimize for CPU saturation
compute_threads = 4 # Often cores/2 to avoid oversubscription§Avoiding Oversubscription
Total threads = Tokio workers + Rayon threads + System threads
Recommendation: Keep total ≤ 1.5 × physical cores
§Monitoring Pool Utilization
// Check pool metrics
let metrics = pool.metrics();
println!("Active tasks: {}", metrics.tasks_active());
println!("Average duration: {:.2}ms", metrics.avg_task_duration_us() / 1000.0);
println!("Slow tasks (>100ms): {}", metrics.slow_tasks());
// Adjust pool size if consistently over/under utilized
if metrics.tasks_active() > pool.num_threads() * 2 {
// Consider increasing compute_threads
}§Common Patterns and Best Practices
§DO: Batch Collection Before Processing
// ✅ Good: Collect async items, then process in parallel
let items = stream.take(100).collect::<Vec<_>>().await;
let processed = pool.install(|| {
items.par_iter().map(|item| process(item)).collect()
}).await?;§DON’T: Mix Async and Compute in Tight Loops
// ❌ Bad: Alternating between async and compute
for item in items {
let data = fetch_data(item).await; // Async
let result = pool.execute(|| compute(data)).await?; // Compute
store_result(result).await; // Async
}
// ✅ Good: Batch operations
let all_data = futures::future::join_all(
items.iter().map(|item| fetch_data(item))
).await;
let all_results = pool.install(|| {
all_data.par_iter().map(|data| compute(data)).collect()
}).await?;
futures::future::join_all(
all_results.iter().map(|result| store_result(result))
).await;§DO: Use Scope for Dynamic Parallelism
// ✅ Good: When you don't know the parallelism level upfront
pool.execute_scoped(|scope| {
while let Some(work) = find_more_work() {
scope.spawn(move |_| {
process_work(work);
});
}
}).await?;§DO: Use ParIter for Data Parallelism
// ✅ Good: When processing collections
pool.install(|| {
data.par_chunks(optimal_chunk_size())
.map(|chunk| process_chunk(chunk))
.reduce(|| initial_value(), |a, b| combine(a, b))
}).await?;§Troubleshooting
§Issue: High Latency Despite Low CPU Usage
Cause: Too few Rayon threads for the workload
Solution: Increase compute_threads configuration
§Issue: System Feels Sluggish
Cause: Thread oversubscription Solution: Reduce total thread count (Tokio + Rayon)
§Issue: Uneven Work Distribution
Cause: Poor chunk size selection
Solution: Use smaller chunks or dynamic scheduling with scope
§Issue: Deadlock or Hanging
Cause: Nested install() calls or blocking in Rayon threads
Solution: Use execute() instead of install() for simple tasks
§Configuration Examples
§High-Throughput Service
# Many concurrent requests, moderate compute per request
[runtime]
num_worker_threads = 16
compute_threads = 8
compute_stack_size = "4MB"§Batch Processing System
# Few concurrent tasks, heavy compute per batch
[runtime]
num_worker_threads = 4
compute_threads = 12
compute_stack_size = "8MB"§Mixed Workload
# Balance between async I/O and compute
[runtime]
num_worker_threads = 8
compute_threads = 6
compute_stack_size = "2MB"§Summary
The Rayon-Tokio integration provides a powerful model for handling mixed workloads:
- Tokio manages async I/O and coordination
- Rayon provides a shared compute thread pool
- Multiple async tasks can concurrently use different Rayon patterns
- Work-stealing ensures efficient resource utilization
- Clear separation between I/O-bound and CPU-bound work
This architecture enables building high-performance services that efficiently handle both network I/O and CPU-intensive computations without manual thread management or complex synchronization.
Re-exports§
pub use metrics::ComputeMetrics;pub use pool::ComputeHandle;pub use pool::ComputePool;pub use pool::ComputePoolExt;
Modules§
- macros
- Zero-overhead macros for compute task execution with optional validation
- metrics
- Metrics for monitoring compute pool operations
- patterns
- Helper functions for common parallel patterns
- pool
- Compute pool implementation with tokio-rayon integration
- thread_
local - Thread-local storage for compute resources
Structs§
- Compute
Config - Configuration for the compute thread pool
Traits§
- Scope
Executor - Helper trait for scope-based operations