pub trait ComputeStreamExt: Stream {
// Required methods
fn compute_map<F, U>(self, f: F) -> ComputeMap<Self, F, U>
where Self: Sized,
F: Fn(Self::Item) -> U + Send + Sync + 'static,
Self::Item: Send + 'static,
U: Send + 'static;
fn adaptive_map<F, U>(self, f: F) -> AdaptiveMap<Self, F, U>
where Self: Sized,
F: Fn(Self::Item) -> U + Send + Sync + 'static,
Self::Item: ComputeHintProvider + Send + 'static,
U: Send + 'static;
}Expand description
Extension trait for streams that adds compute-based processing methods.
This trait is automatically implemented for all types that implement Stream.
§Example
use loom_rs::{LoomBuilder, ComputeStreamExt};
use futures::stream::{self, StreamExt};
let runtime = LoomBuilder::new().build()?;
runtime.block_on(async {
let numbers = stream::iter(0..10);
// Each item is processed on rayon, results stream back in order
let results: Vec<_> = numbers
.compute_map(|n| n * 2)
.collect()
.await;
assert_eq!(results, vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
});Required Methods§
Sourcefn compute_map<F, U>(self, f: F) -> ComputeMap<Self, F, U>
fn compute_map<F, U>(self, f: F) -> ComputeMap<Self, F, U>
Map each stream item through a compute-heavy closure on rayon.
Items are processed sequentially (one at a time) to preserve stream ordering and provide natural backpressure.
§Performance
Unlike calling spawn_compute in a loop, compute_map reuses the same
internal TaskState for every item, avoiding per-item pool operations:
- First poll: Gets
TaskStatefrom pool (or allocates new) - Each item: ~100-500ns overhead, 0 allocations
- Stream drop: Returns
TaskStateto pool
§Panics
Panics if called outside a loom runtime context (i.e., not within block_on,
a tokio worker thread, or a rayon worker thread managed by the runtime).
§Example
use loom_rs::{LoomBuilder, ComputeStreamExt};
use futures::stream::{self, StreamExt};
let runtime = LoomBuilder::new().build()?;
runtime.block_on(async {
let results: Vec<_> = stream::iter(vec!["hello", "world"])
.compute_map(|s| s.to_uppercase())
.collect()
.await;
assert_eq!(results, vec!["HELLO", "WORLD"]);
});Sourcefn adaptive_map<F, U>(self, f: F) -> AdaptiveMap<Self, F, U>
fn adaptive_map<F, U>(self, f: F) -> AdaptiveMap<Self, F, U>
Adaptively map items, choosing inline vs offload per item.
Each stream instance maintains its own MAB scheduler state for immediate feedback learning. The scheduler learns from execution times and adapts its decisions to minimize total cost.
If the input item implements ComputeHintProvider, the hint is used
to guide cold-start behavior before the scheduler has learned enough.
§When to Use
Use adaptive_map when:
- You’re unsure if work is cheap enough for inline execution
- Work complexity varies significantly across items
- You want automatic adaptation without manual tuning
Use compute_map when:
- Work is consistently expensive (> 250µs)
- You want guaranteed offload behavior
§Example
use loom_rs::{LoomBuilder, ComputeStreamExt};
use futures::stream::{self, StreamExt};
let runtime = LoomBuilder::new().build()?;
runtime.block_on(async {
// Scheduler learns that small items are fast (inline)
// and large items are slow (offload)
let results: Vec<_> = stream::iter(data)
.adaptive_map(|item| process(item))
.collect()
.await;
});