pub trait ComputeStreamExt: Stream {
    // Required method
    fn compute_map<F, U>(self, f: F) -> ComputeMap<Self, F, U>
       where Self: Sized,
             F: Fn(Self::Item) -> U + Send + Sync + 'static,
             Self::Item: Send + 'static,
             U: Send + 'static;
}
Extension trait for streams that adds compute-based processing methods.
This trait is automatically implemented for all types that implement Stream.
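The "automatically implemented" wording describes the usual extension-trait pattern: a trait with a supertrait bound plus a blanket `impl` over every type that satisfies it. The sketch below is a std-only analogue over `Iterator` rather than `Stream`; `DoubleExt` and `doubled` are hypothetical names chosen for illustration and are not part of loom_rs.

```rust
// Hypothetical analogue of the pattern; not the loom_rs implementation.
// `DoubleExt` plays the role of `ComputeStreamExt`, `Iterator` the role of `Stream`.
pub trait DoubleExt: Iterator {
    /// Doubles every item of an `i32` iterator.
    fn doubled(self) -> std::iter::Map<Self, fn(i32) -> i32>
    where
        Self: Sized + Iterator<Item = i32>,
    {
        fn double(n: i32) -> i32 {
            n * 2
        }
        self.map(double as fn(i32) -> i32)
    }
}

// Blanket impl: every type that implements `Iterator` gets `DoubleExt`,
// so callers only need the trait in scope to use the new method.
impl<I: Iterator> DoubleExt for I {}
```

With the trait in scope, `(0..5).doubled().collect::<Vec<_>>()` yields `[0, 2, 4, 6, 8]`. This is also why the examples on this page import `ComputeStreamExt` explicitly before calling `compute_map`.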
§Example
use loom_rs::{LoomBuilder, ComputeStreamExt};
use futures::stream::{self, StreamExt};

let runtime = LoomBuilder::new().build()?;

runtime.block_on(async {
    let numbers = stream::iter(0..10);

    // Each item is processed on rayon, results stream back in order
    let results: Vec<_> = numbers
        .compute_map(|n| n * 2)
        .collect()
        .await;

    assert_eq!(results, vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
});
Required Methods§
fn compute_map<F, U>(self, f: F) -> ComputeMap<Self, F, U>
Map each stream item through a compute-heavy closure on rayon.
Items are processed sequentially (one at a time) to preserve stream ordering and provide natural backpressure.
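One way to picture the one-at-a-time behavior is the std-only model below. It is a sketch under stated assumptions, not how loom_rs is implemented: a plain worker thread stands in for the rayon pool, a blocking loop stands in for the async stream, and `sequential_offload` is a hypothetical helper name.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical model: runs `f` on a worker thread, one item at a time.
/// The next item is pulled from `items` only after the previous result
/// has come back, which gives ordering and backpressure for free.
pub fn sequential_offload<T, U, F>(items: impl IntoIterator<Item = T>, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (job_tx, job_rx) = mpsc::channel::<T>();
    let (res_tx, res_rx) = mpsc::channel::<U>();

    // Worker thread (stand-in for the compute pool).
    let worker = thread::spawn(move || {
        for item in job_rx {
            if res_tx.send(f(item)).is_err() {
                break; // Consumer went away; stop working.
            }
        }
    });

    let mut out = Vec::new();
    for item in items {
        job_tx.send(item).expect("worker thread exited early");
        // Wait for this result before touching the source again.
        out.push(res_rx.recv().expect("worker thread exited early"));
    }

    drop(job_tx); // Closing the channel lets the worker loop end.
    worker.join().expect("worker thread panicked");
    out
}
```

Because at most one item is in flight, results cannot be reordered, and a slow closure slows down consumption of the source instead of letting pending work pile up.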
§Performance
Unlike calling spawn_compute in a loop, compute_map reuses the same
internal TaskState for every item, avoiding per-item pool operations:
- First poll: Gets TaskState from pool (or allocates new)
- Each item: ~100-500ns overhead, 0 allocations
- Stream drop: Returns TaskState to pool
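The lifecycle in the list above (acquire on first poll, reuse per item, return on drop) can be sketched with a std-only iterator adapter. Everything here is an assumption made for illustration: `Scratch`, `Pool`, and `PooledMap` are hypothetical names, and the real `TaskState` layout and pool are not shown in this documentation.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for the reusable per-stream state.
pub struct Scratch {
    pub uses: usize,
}

/// Hypothetical pool that counts how many fresh allocations it made.
pub struct Pool {
    free: Mutex<Vec<Scratch>>,
    allocations: AtomicUsize,
}

impl Pool {
    pub fn new() -> Self {
        Pool { free: Mutex::new(Vec::new()), allocations: AtomicUsize::new(0) }
    }

    fn acquire(&self) -> Scratch {
        if let Some(state) = self.free.lock().unwrap().pop() {
            return state; // Reuse a pooled state.
        }
        self.allocations.fetch_add(1, Ordering::Relaxed);
        Scratch { uses: 0 } // Pool was empty: allocate new.
    }

    fn release(&self, state: Scratch) {
        self.free.lock().unwrap().push(state);
    }

    pub fn allocations(&self) -> usize {
        self.allocations.load(Ordering::Relaxed)
    }
}

/// Adapter that holds one `Scratch` for its whole lifetime.
pub struct PooledMap<'p, I, F> {
    pool: &'p Pool,
    inner: I,
    f: F,
    state: Option<Scratch>,
}

impl<'p, I, F> PooledMap<'p, I, F> {
    pub fn new(pool: &'p Pool, inner: I, f: F) -> Self {
        PooledMap { pool, inner, f, state: None }
    }
}

impl<'p, I: Iterator, U, F: Fn(I::Item) -> U> Iterator for PooledMap<'p, I, F> {
    type Item = U;

    fn next(&mut self) -> Option<U> {
        let item = self.inner.next()?;
        let pool = self.pool;
        // First call: take state from the pool. Later calls: reuse it.
        let state = self.state.get_or_insert_with(|| pool.acquire());
        state.uses += 1;
        Some((self.f)(item))
    }
}

impl<'p, I, F> Drop for PooledMap<'p, I, F> {
    fn drop(&mut self) {
        // Adapter drop: hand the state back for the next adapter.
        if let Some(state) = self.state.take() {
            self.pool.release(state);
        }
    }
}
```

Running two adapters back to back against the same pool triggers a single allocation in this model: the second adapter picks up the state the first one returned on drop.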
§Panics
Panics if called outside a loom runtime context (i.e., not within block_on,
a tokio worker thread, or a rayon worker thread managed by the runtime).
§Example
use loom_rs::{LoomBuilder, ComputeStreamExt};
use futures::stream::{self, StreamExt};

let runtime = LoomBuilder::new().build()?;

runtime.block_on(async {
    let results: Vec<_> = stream::iter(vec!["hello", "world"])
        .compute_map(|s| s.to_uppercase())
        .collect()
        .await;

    assert_eq!(results, vec!["HELLO", "WORLD"]);
});