# loom-rs 0.3.1

**Weaving threads together**

A Rust crate providing a bespoke thread pool runtime combining tokio and rayon with CPU pinning capabilities.

## Features

- **Hybrid Runtime**: Combines tokio for async I/O with rayon for CPU-bound parallel work
- **CPU Pinning**: Automatically pins threads to specific CPUs for consistent performance
- **Flexible Configuration**: Configure via files (TOML/YAML/JSON), environment variables, or code
- **CLI Integration**: Built-in clap support for command-line overrides
- **CUDA NUMA Awareness**: Optional feature for selecting CPUs local to a GPU (Linux only)
- **Adaptive Scheduling**: [MAB-based scheduler](docs/mab.md) learns optimal inline vs offload decisions

## Platform Support

| Platform | Status | Notes |
|----------|--------|-------|
| Linux | **Full support** | All features including CPU pinning and CUDA |
| macOS | Partial | Compiles and runs, but CPU pinning may silently fail |
| Windows | Partial | Compiles and runs, but CPU pinning may silently fail |

**Note**: CPU affinity (thread pinning) is a Linux-focused feature. On macOS and Windows, pinning calls may return failure or have no effect. The library remains functional for development and testing, but production deployments targeting performance should use Linux.

## Installation

```bash
cargo add loom-rs
```

For CUDA support (Linux only):

```bash
cargo add loom-rs --features cuda
```

## Quick Start

```rust
use loom_rs::LoomBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runtime = LoomBuilder::new()
        .prefix("myapp")
        .tokio_threads(2)
        .rayon_threads(6)
        .build()?;

    runtime.block_on(async {
        // Spawn a tracked async I/O task
        let io_handle = runtime.spawn_async(async {
            // Async I/O work
            fetch_data().await
        });

        // Spawn a tracked compute task and await the result
        let result = runtime.spawn_compute(|| {
            // CPU-bound work runs on rayon
            (0..1_000_000).sum::<i64>()
        }).await;
        println!("Compute result: {}", result);

        // Wait for the async task's data before processing it
        let data = io_handle.await.expect("I/O task failed");

        // Zero-overhead parallel iterators
        let processed = runtime.install(|| {
            use rayon::prelude::*;
            data.par_iter().map(|x| process(x)).collect::<Vec<_>>()
        });
        println!("Processed {} items", processed.len());
    });

    // Graceful shutdown - waits for all tracked tasks
    runtime.block_until_idle();

    Ok(())
}
```

## Configuration

Configuration sources are merged in order (later sources override earlier):

1. Default values
2. Config files (via `.file()`)
3. Environment variables (via `.env_prefix()`)
4. Programmatic overrides
5. CLI arguments (via `.with_cli_args()`)
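The layering idea can be sketched with plain `Option` fields, where each layer only overrides what it explicitly sets. This is an illustrative sketch with hypothetical types (`LoomConfigLayer` and its fields are assumptions mirroring the config keys above, not loom-rs's actual internals):

```rust
/// Sketch of layered configuration merging: each layer only overrides
/// the fields it explicitly sets (None means "not set at this layer").
#[derive(Clone, Default, Debug, PartialEq)]
struct LoomConfigLayer {
    prefix: Option<String>,
    tokio_threads: Option<usize>,
    rayon_threads: Option<usize>,
}

impl LoomConfigLayer {
    /// Apply `other` on top of `self`: fields set in `other` win,
    /// unset fields fall through to the lower layer.
    fn merge(mut self, other: LoomConfigLayer) -> LoomConfigLayer {
        self.prefix = other.prefix.or(self.prefix);
        self.tokio_threads = other.tokio_threads.or(self.tokio_threads);
        self.rayon_threads = other.rayon_threads.or(self.rayon_threads);
        self
    }
}
```

Folding the layers in the order listed above (defaults, then file, env, code, CLI) yields the documented precedence: the last layer to set a field wins.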

### Config File (TOML)

```toml
prefix = "myapp"
cpuset = "0-7,16-23"
tokio_threads = 2
rayon_threads = 14
```

### Environment Variables

With `.env_prefix("LOOM")`:

```bash
export LOOM_PREFIX=myapp
export LOOM_CPUSET=0-7
export LOOM_TOKIO_THREADS=2
export LOOM_RAYON_THREADS=6
```

### CLI Arguments

```rust
use clap::Parser;
use loom_rs::{LoomBuilder, LoomArgs};

#[derive(Parser)]
struct MyArgs {
    #[command(flatten)]
    loom: LoomArgs,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = MyArgs::parse();
    let runtime = LoomBuilder::new()
        .file("config.toml")
        .env_prefix("LOOM")
        .with_cli_args(&args.loom)
        .build()?;
    Ok(())
}
```

Available CLI arguments:
- `--loom-prefix`: Thread name prefix
- `--loom-cpuset`: CPU set (e.g., "0-7,16-23")
- `--loom-tokio-threads`: Number of tokio threads
- `--loom-rayon-threads`: Number of rayon threads
- `--loom-cuda-device`: CUDA device ID or UUID (requires `cuda` feature)

## CPU Set Format

The `cpuset` option accepts a string in Linux taskset/numactl format:

- Single CPUs: `"0"`, `"5"`
- Ranges: `"0-7"`, `"16-23"`
- Mixed: `"0-3,8-11"`, `"0,2,4,6-8"`
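A parser for this format is straightforward; the sketch below shows the idea (loom-rs's own parser may differ in details such as error types):

```rust
/// Parse a taskset-style CPU list such as "0-3,8,10-11" into CPU indices.
/// Illustrative sketch of the format described above.
fn parse_cpuset(spec: &str) -> Result<Vec<usize>, String> {
    let mut cpus = Vec::new();
    for part in spec.split(',') {
        let part = part.trim();
        match part.split_once('-') {
            // A range like "0-7": expand inclusively.
            Some((lo, hi)) => {
                let lo: usize = lo.trim().parse().map_err(|_| format!("bad cpu: {part}"))?;
                let hi: usize = hi.trim().parse().map_err(|_| format!("bad cpu: {part}"))?;
                if lo > hi {
                    return Err(format!("empty range: {part}"));
                }
                cpus.extend(lo..=hi);
            }
            // A single CPU like "5".
            None => cpus.push(part.parse().map_err(|_| format!("bad cpu: {part}"))?),
        }
    }
    Ok(cpus)
}
```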

## CUDA Support

With the `cuda` feature enabled (Linux only), configure the runtime to use CPUs local to a specific GPU.

### System Dependencies

```bash
sudo apt-get install libhwloc-dev libudev-dev
```

### Usage

```rust
let runtime = LoomBuilder::new()
    .cuda_device_id(0)  // Use CPUs near GPU 0
    .build()?;

// Or by UUID
let runtime = LoomBuilder::new()
    .cuda_device_uuid("GPU-12345678-1234-1234-1234-123456789012")
    .build()?;
```

This is useful for GPU-accelerated workloads where data needs to be transferred between CPU and GPU memory, as it minimizes NUMA-related latency.

## Thread Naming

Threads are named with the configured prefix:

- Tokio threads: `{prefix}-tokio-0000`, `{prefix}-tokio-0001`, ...
- Rayon threads: `{prefix}-rayon-0000`, `{prefix}-rayon-0001`, ...
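Judging from the examples above, the pattern appears to be a zero-padded four-digit index, which `format!` reproduces directly (the padding width is an assumption inferred from the examples, not a documented guarantee):

```rust
/// Build a worker thread name matching the documented pattern,
/// e.g. "myapp-tokio-0000". The 4-digit zero padding is assumed
/// from the examples above.
fn thread_name(prefix: &str, pool: &str, index: usize) -> String {
    format!("{prefix}-{pool}-{index:04}")
}
```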

## API Reference

### Task Spawning

| Method | Use Case | Overhead | Tracked |
|--------|----------|----------|---------|
| `spawn_async()` | I/O-bound async tasks | ~10ns | Yes |
| `spawn_compute()` | CPU-bound work (always offload) | ~100-500ns | Yes |
| `spawn_adaptive()` | CPU work (MAB decides inline/offload) | ~50-200ns | Yes |
| `compute_map()` | Stream -> rayon -> stream | ~100-500ns/item | No |
| `adaptive_map()` | Stream with MAB decisions | ~50-200ns/item | No |
| `install()` | Zero-overhead parallel iterators | ~0ns | No |

### Shutdown

```rust
// Option 1: Simple shutdown from main thread
runtime.block_until_idle();

// Option 2: Manual control from async context
runtime.block_on(async {
    runtime.spawn_async(background_work());

    // Signal shutdown
    runtime.shutdown();

    // Wait for completion
    runtime.wait_for_shutdown().await;
});

// Option 3: Check status without blocking
if runtime.is_idle() {
    println!("All tasks complete");
}
```

### Direct Access (Untracked)

For advanced use cases requiring untracked access:

```rust
// Direct tokio handle
let handle = runtime.tokio_handle();
handle.spawn(untracked_task());

// Direct rayon pool
let pool = runtime.rayon_pool();
pool.spawn(|| untracked_work());
```

## Ergonomic Access

Use `current_runtime()` or `spawn_compute()` from anywhere in the runtime:

```rust
use loom_rs::LoomBuilder;

let runtime = LoomBuilder::new().build()?;

runtime.block_on(async {
    // No need to pass &runtime around
    let result = loom_rs::spawn_compute(|| expensive_work()).await;

    // Or get the runtime explicitly
    let rt = loom_rs::current_runtime().unwrap();
    rt.spawn_async(async { /* ... */ });
});
```

## Stream Processing

Use `ComputeStreamExt` to process async stream items on rayon:

```rust
use loom_rs::{LoomBuilder, ComputeStreamExt};
use futures::stream::{self, StreamExt};

let runtime = LoomBuilder::new().build()?;

runtime.block_on(async {
    let numbers = stream::iter(0..100);

    // Each item is processed on rayon, results stream back
    let results: Vec<_> = numbers
        .compute_map(|n| {
            // CPU-intensive work runs on rayon
            (0..n).map(|i| i * i).sum::<i64>()
        })
        .collect()
        .await;
});
```

This is ideal for pipelines where you:
1. Await values from an async source (network, channel, file)
2. Process each value with CPU-intensive work
3. Continue the async pipeline with the results

Items are processed sequentially to preserve ordering and provide natural backpressure.
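That per-item behavior can be sketched with plain std threads. This illustrates only the observable semantics (one item in flight at a time, results in input order); it is not how `compute_map` is implemented, which reuses internal state rather than spawning a thread per item:

```rust
use std::thread;

/// Process each item on a worker thread, one at a time, preserving order.
/// Mirrors compute_map's observable semantics: sequential processing,
/// ordered results, and natural backpressure (the next item waits for
/// the current one to finish).
fn compute_map_sketch<T, U, F>(items: Vec<T>, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Copy + 'static,
{
    items
        .into_iter()
        .map(|item| thread::spawn(move || f(item)).join().unwrap())
        .collect()
}
```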

## Adaptive Scheduling (MAB)

loom-rs includes a Multi-Armed Bandit (MAB) scheduler that learns whether to run
compute work inline on Tokio or offload to Rayon. This eliminates the need to
manually tune offload decisions - the scheduler adapts to your actual workload.

### Stream Mode

```rust
use loom_rs::ComputeStreamExt;

// MAB learns optimal strategy per-closure
let results: Vec<_> = stream
    .adaptive_map(|item| process(item))
    .collect()
    .await;
```

### One-Shot Mode

```rust
// For request handlers - MAB adapts per function type
let result = runtime.spawn_adaptive(|| handle_request(data)).await;
```

### Key Features

- **Thompson Sampling**: Balances exploration vs exploitation
- **Guardrails**: 4 layers of Tokio starvation protection (GR0-GR3)
- **Pressure-Aware**: Adjusts decisions based on runtime load
- **Low Overhead**: ~50-200ns per decision


See [docs/mab.md](docs/mab.md) for the complete design and configuration options.

## Performance

loom-rs is designed for zero unnecessary overhead:

- **Thread pinning**: One-time cost at runtime creation only
- **Zero allocation after warmup**: `spawn_compute()` uses per-type object pools
- **Custom async-rayon bridge**: Uses atomic wakers (~32 bytes) instead of channels (~80 bytes)
- **Main thread is separate**: Not part of worker pools

### spawn_compute Performance

| State | Allocations | Overhead |
|-------|-------------|----------|
| Pool hit | 0 bytes | ~100-500ns |
| Pool miss | ~32 bytes | ~100-500ns |
| First call per type | Pool + state | ~1µs |
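The pool-hit/pool-miss distinction in the table can be illustrated with a minimal stdlib object pool. This is a sketch of the general idea only; loom-rs's per-type pools and their pooled state are internal and almost certainly differ (here plain `Vec<u8>` buffers stand in for per-task state):

```rust
use std::sync::Mutex;

/// Minimal sketch of an object pool: reuse allocations across calls
/// so steady-state use ("pool hit") allocates nothing.
struct Pool {
    slots: Mutex<Vec<Vec<u8>>>, // pooled buffers standing in for per-task state
    cap: usize,                 // maximum number of retained buffers
}

impl Pool {
    fn new(cap: usize) -> Self {
        Pool { slots: Mutex::new(Vec::new()), cap }
    }

    /// Take a pooled buffer (pool hit), or allocate fresh on a miss.
    fn get(&self) -> Vec<u8> {
        self.slots.lock().unwrap().pop().unwrap_or_default()
    }

    /// Return a buffer, cleared but with its capacity intact, so a later
    /// `get` reuses the allocation. Excess buffers beyond `cap` are dropped.
    fn put(&self, mut buf: Vec<u8>) {
        buf.clear();
        let mut slots = self.slots.lock().unwrap();
        if slots.len() < self.cap {
            slots.push(buf);
        }
    }
}
```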

Configure pool size for high-concurrency workloads:

```rust
let runtime = LoomBuilder::new()
    .compute_pool_size(128)  // Default is 64
    .build()?;
```

## Patterns to Avoid

### 1. Nested spawn_compute (Deadlock Risk)

```rust
// BAD: Can deadlock if all rayon threads are waiting
runtime.spawn_compute(|| {
    runtime.block_on(runtime.spawn_compute(|| work()))
}).await;

// GOOD: Use install() for nested parallelism
runtime.spawn_compute(|| {
    runtime.install(|| {
        data.par_iter().map(|x| process(x)).collect()
    })
}).await;
```

### 2. Blocking I/O in spawn_compute

```rust
// BAD: Blocks rayon thread
runtime.spawn_compute(|| {
    std::fs::read_to_string("file.txt")
}).await;

// GOOD: I/O in async, compute in rayon
let data = tokio::fs::read_to_string("file.txt").await?;
runtime.spawn_compute(|| process(&data)).await;
```

### 3. spawn_compute in Tight Loops

```rust
// OK (auto-pooling): Each call reuses pooled state
for item in items {
    results.push(runtime.spawn_compute(|| process(item)).await);
}

// STILL BETTER for batch: Single cross-thread trip
let results = runtime.install(|| {
    items.par_iter().map(|item| process(item)).collect()
});
```

### 4. Holding Locks Across spawn_compute

```rust
// BAD: Lock held during async gap
let guard = mutex.lock();
runtime.spawn_compute(|| use_data(&guard)).await;

// GOOD: Clone data, release lock
let data = mutex.lock().clone();
runtime.spawn_compute(move || process(data)).await;
```

### 5. install() Blocks the Thread

```rust
// CAUTION in async context: blocks tokio worker
runtime.spawn_async(async {
    runtime.install(|| heavy_par_iter());  // Blocks!
}).await;

// BETTER: spawn_compute for async-safe bridge
runtime.spawn_async(async {
    runtime.spawn_compute(|| heavy_par_iter()).await;
}).await;
```

### 6. Manual spawn_compute Loop on Streams

```rust
// WORKS but slower: Pool get/return for each item
while let Some(item) = stream.next().await {
    let result = runtime.spawn_compute(|| process(item)).await;
    results.push(result);
}

// BETTER: compute_map reuses internal state
let results: Vec<_> = stream
    .compute_map(|item| process(item))
    .collect()
    .await;
```

## License

MIT