# loom-rs
**Weaving threads together**
A Rust crate providing a bespoke thread-pool runtime that combines tokio and rayon, with CPU pinning support.
## Features
- **Hybrid Runtime**: Combines tokio for async I/O with rayon for CPU-bound parallel work
- **CPU Pinning**: Automatically pins threads to specific CPUs for consistent performance
- **Flexible Configuration**: Configure via files (TOML/YAML/JSON), environment variables, or code
- **CLI Integration**: Built-in clap support for command-line overrides
- **CUDA NUMA Awareness**: Optional feature for selecting CPUs local to a GPU (Linux only)
## Platform Support
| Platform | Support | Notes |
|----------|---------|-------|
| Linux | **Full support** | All features, including CPU pinning and CUDA |
| macOS | Partial | Compiles and runs, but CPU pinning may silently fail |
| Windows | Partial | Compiles and runs, but CPU pinning may silently fail |
**Note**: CPU affinity (thread pinning) is a Linux-focused feature. On macOS and Windows, pinning calls may return failure or have no effect. The library remains functional for development and testing, but production deployments targeting performance should use Linux.
## Installation
Add to your `Cargo.toml`:
```toml
[dependencies]
loom-rs = "0.1"
```
For CUDA support (Linux only):
```toml
[dependencies]
loom-rs = { version = "0.1", features = ["cuda"] }
```
## Quick Start
```rust
use loom_rs::LoomBuilder;
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runtime = LoomBuilder::new()
        .prefix("myapp")
        .tokio_threads(2)
        .rayon_threads(6)
        .build()?;
    runtime.block_on(async {
        // Spawn a tracked async I/O task (fetch_data is a placeholder)
        let io_handle = runtime.spawn_async(async {
            // Async I/O work
            fetch_data().await
        });
        // Spawn a tracked compute task and await its result
        let result = runtime.spawn_compute(|| {
            // CPU-bound work on rayon
            (0..1_000_000).sum::<i64>()
        }).await;
        println!("Compute result: {}", result);
        // Wait for the async task before using its data
        let data = io_handle.await?;
        // Zero-overhead parallel iterators (process is a placeholder)
        let processed = runtime.install(|| {
            use rayon::prelude::*;
            data.par_iter().map(|x| process(x)).collect::<Vec<_>>()
        });
        println!("Processed {} items", processed.len());
        Ok::<(), Box<dyn std::error::Error>>(())
    })?;
    // Graceful shutdown - waits for all tracked tasks
    runtime.block_until_idle();
    Ok(())
}
```
## Configuration
Configuration sources are merged in order (later sources override earlier):
1. Default values
2. Config files (via `.file()`)
3. Environment variables (via `.env_prefix()`)
4. Programmatic overrides
5. CLI arguments (via `.with_cli_args()`)
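For example, a builder can layer several of these sources at once. In this sketch the file supplies defaults, `LOOM_*` environment variables override the file, and the explicit `rayon_threads(8)` call overrides both (CLI arguments added via `.with_cli_args()` would win over everything):
```rust
use loom_rs::LoomBuilder;
let runtime = LoomBuilder::new()
    .file("config.toml")   // 2. config file
    .env_prefix("LOOM")    // 3. environment variables
    .rayon_threads(8)      // 4. programmatic override
    .build()?;
```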
### Config File (TOML)
```toml
prefix = "myapp"
cpuset = "0-7,16-23"
tokio_threads = 2
rayon_threads = 14
```
### Environment Variables
With `.env_prefix("LOOM")`:
```bash
export LOOM_PREFIX=myapp
export LOOM_CPUSET=0-7
export LOOM_TOKIO_THREADS=2
export LOOM_RAYON_THREADS=6
```
### CLI Arguments
```rust
use clap::Parser;
use loom_rs::{LoomBuilder, LoomArgs};
#[derive(Parser)]
struct MyArgs {
    #[command(flatten)]
    loom: LoomArgs,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = MyArgs::parse();
    let runtime = LoomBuilder::new()
        .file("config.toml")
        .env_prefix("LOOM")
        .with_cli_args(&args.loom)
        .build()?;
    Ok(())
}
```
Available CLI arguments:
- `--loom-prefix`: Thread name prefix
- `--loom-cpuset`: CPU set (e.g., "0-7,16-23")
- `--loom-tokio-threads`: Number of tokio threads
- `--loom-rayon-threads`: Number of rayon threads
- `--loom-cuda-device`: CUDA device ID or UUID (requires `cuda` feature)
## CPU Set Format
The `cpuset` option accepts a string in Linux taskset/numactl format:
- Single CPUs: `"0"`, `"5"`
- Ranges: `"0-7"`, `"16-23"`
- Mixed: `"0-3,8-11"`, `"0,2,4,6-8"`
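As a sketch, the same string can also be supplied programmatically. The `cpuset()` builder method below is hypothetical (this README only shows the `cpuset` config key, `LOOM_CPUSET`, and `--loom-cpuset`), but it illustrates the format in context:
```rust
use loom_rs::LoomBuilder;
// Hypothetical builder setter mirroring the `cpuset` config key:
// pin the tokio and rayon workers to CPUs 0-3 and 8-11.
let runtime = LoomBuilder::new()
    .cpuset("0-3,8-11")
    .tokio_threads(2)
    .rayon_threads(6)
    .build()?;
```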
## CUDA Support
With the `cuda` feature enabled (Linux only), configure the runtime to use CPUs local to a specific GPU.
### System Dependencies
```bash
sudo apt-get install libhwloc-dev libudev-dev
```
### Usage
```rust
let runtime = LoomBuilder::new()
    .cuda_device_id(0) // Use CPUs near GPU 0
    .build()?;
// Or by UUID
let runtime = LoomBuilder::new()
    .cuda_device_uuid("GPU-12345678-1234-1234-1234-123456789012")
    .build()?;
```
This is useful for GPU-accelerated workloads where data needs to be transferred between CPU and GPU memory, as it minimizes NUMA-related latency.
## Thread Naming
Threads are named with the configured prefix:
- Tokio threads: `{prefix}-tokio-0000`, `{prefix}-tokio-0001`, ...
- Rayon threads: `{prefix}-rayon-0000`, `{prefix}-rayon-0001`, ...
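A quick way to see the naming in practice (a minimal sketch; the exact index depends on which worker picks up the task):
```rust
runtime.block_on(async {
    runtime.spawn_compute(|| {
        // With .prefix("myapp"), prints something like Some("myapp-rayon-0002")
        println!("{:?}", std::thread::current().name());
    }).await;
});
```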
## API Reference
### Task Spawning
| Method | Best For | Overhead | Tracked |
|--------|----------|----------|---------|
| `spawn_async()` | I/O-bound async tasks | ~10ns | Yes |
| `spawn_compute()` | CPU-bound work (await from async) | ~100-500ns | Yes |
| `compute_map()` | Stream items -> rayon -> stream | ~100-500ns/item | No |
| `install()` | Zero-overhead parallel iterators | ~0ns | No |
### Shutdown
```rust
// Option 1: Simple shutdown from main thread
runtime.block_until_idle();
// Option 2: Manual control from async context
runtime.block_on(async {
    runtime.spawn_async(background_work());
    // Signal shutdown
    runtime.shutdown();
    // Wait for completion
    runtime.wait_for_shutdown().await;
});
// Option 3: Check status without blocking
if runtime.is_idle() {
    println!("All tasks complete");
}
```
### Direct Access (Untracked)
For advanced use cases requiring untracked access:
```rust
// Direct tokio handle
let handle = runtime.tokio_handle();
handle.spawn(untracked_task());
// Direct rayon pool
let pool = runtime.rayon_pool();
```
## Ergonomic Access
Use the free functions `loom_rs::current_runtime()` and `loom_rs::spawn_compute()` from any code running on the runtime, without passing `&runtime` around:
```rust
use loom_rs::LoomBuilder;
let runtime = LoomBuilder::new().build()?;
runtime.block_on(async {
    // No need to pass &runtime around
    let result = loom_rs::spawn_compute(|| expensive_work()).await;
    // Or get the runtime explicitly
    let rt = loom_rs::current_runtime().unwrap();
    rt.spawn_async(async { /* ... */ });
});
```
## Stream Processing
Use `ComputeStreamExt` to process async stream items on rayon:
```rust
use loom_rs::{LoomBuilder, ComputeStreamExt};
use futures::stream::{self, StreamExt};
let runtime = LoomBuilder::new().build()?;
runtime.block_on(async {
    let numbers = stream::iter(0..100);
    // Each item is processed on rayon, results stream back
    let results: Vec<_> = numbers
        .compute_map(|n| {
            // CPU-intensive work runs on rayon
            (0..n).map(|i| i * i).sum::<i64>()
        })
        .collect()
        .await;
});
```
This is ideal for pipelines where you:
1. Await values from an async source (network, channel, file)
2. Process each value with CPU-intensive work
3. Continue the async pipeline with the results
Items are processed sequentially to preserve ordering and provide natural backpressure.
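Putting the three steps together, here is a minimal sketch of such a pipeline. The `pipeline` function and output file name are hypothetical; it assumes a `futures` mpsc channel as the async source, tokio's `fs`/`io-util` features for the async write, and that it runs inside `runtime.block_on` so `compute_map` can reach the rayon pool:
```rust
use futures::{channel::mpsc, StreamExt};
use loom_rs::ComputeStreamExt;
use tokio::io::AsyncWriteExt;
async fn pipeline(rx: mpsc::Receiver<Vec<u8>>) -> std::io::Result<()> {
    let mut out = tokio::fs::File::create("digests.txt").await?;
    // 1. Await items from the channel; 2. hash each on rayon; 3. continue the async pipeline
    let digests = rx.compute_map(|bytes| {
        bytes.iter().fold(0u64, |h, b| h.wrapping_mul(31).wrapping_add(u64::from(*b)))
    });
    futures::pin_mut!(digests);
    while let Some(d) = digests.next().await {
        out.write_all(format!("{d}\n").as_bytes()).await?;
    }
    Ok(())
}
```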
## Performance
loom-rs is designed for zero unnecessary overhead:
- **Thread pinning**: One-time cost at runtime creation only
- **Zero allocation after warmup**: `spawn_compute()` uses per-type object pools
- **Custom async-rayon bridge**: Uses atomic wakers (~32 bytes) instead of channels (~80 bytes)
- **Main thread is separate**: Not part of worker pools
### spawn_compute Performance
| Scenario | Allocation | Latency |
|----------|------------|---------|
| Pool hit | 0 bytes | ~100-500ns |
| Pool miss | ~32 bytes | ~100-500ns |
| First call per type | Pool + state | ~1µs |
Configure pool size for high-concurrency workloads:
```rust
let runtime = LoomBuilder::new()
    .compute_pool_size(128) // Default is 64
    .build()?;
```
## Patterns to Avoid
### 1. Nested spawn_compute (Deadlock Risk)
```rust
// BAD: Can deadlock if all rayon threads are waiting
runtime.spawn_compute(|| {
    runtime.block_on(runtime.spawn_compute(|| work()))
}).await;
// GOOD: Use install() for nested parallelism
runtime.spawn_compute(|| {
    runtime.install(|| {
        data.par_iter().map(|x| process(x)).collect()
    })
}).await;
```
### 2. Blocking I/O in spawn_compute
```rust
// BAD: Blocks rayon thread
runtime.spawn_compute(|| {
    std::fs::read_to_string("file.txt")
}).await;
// GOOD: I/O in async, compute in rayon
let data = tokio::fs::read_to_string("file.txt").await?;
runtime.spawn_compute(|| process(&data)).await;
```
### 3. spawn_compute in Tight Loops
```rust
// OK (auto-pooling): Each call reuses pooled state
for item in items {
    results.push(runtime.spawn_compute(|| process(item)).await);
}
// STILL BETTER for batch: Single cross-thread trip
let results = runtime.spawn_compute(move || {
    items.into_iter().map(process).collect::<Vec<_>>()
}).await;
```
### 4. Holding Locks Across spawn_compute
```rust
// BAD: Lock held during async gap
let guard = mutex.lock();
runtime.spawn_compute(|| process(&*guard)).await;
// GOOD: Clone data, release lock
let data = mutex.lock().clone();
runtime.spawn_compute(move || process(data)).await;
```
### 5. install() Blocks the Thread
```rust
// CAUTION in async context: blocks tokio worker
runtime.spawn_async(async {
    runtime.install(|| heavy_par_iter()); // Blocks!
}).await;
// BETTER: spawn_compute for async-safe bridge
runtime.spawn_async(async {
    runtime.spawn_compute(|| heavy_par_iter()).await;
}).await;
```
### 6. Manual spawn_compute Loop on Streams
```rust
// WORKS but slower: Pool get/return for each item
while let Some(item) = stream.next().await {
    let result = runtime.spawn_compute(|| process(item)).await;
    results.push(result);
}
// BETTER: compute_map reuses internal state
let results: Vec<_> = stream
    .compute_map(|item| process(item))
    .collect()
    .await;
```
## License
MIT