Skip to main content

ComputeStreamExt

Trait ComputeStreamExt 

Source
pub trait ComputeStreamExt: Stream {
    // Required method
    fn compute_map<F, U>(self, f: F) -> ComputeMap<Self, F, U>
       where Self: Sized,
             F: Fn(Self::Item) -> U + Send + Sync + 'static,
             Self::Item: Send + 'static,
             U: Send + 'static;
}
Expand description

Extension trait for streams that adds compute-based processing methods.

This trait is automatically implemented for all types that implement Stream.

§Example

use loom_rs::{LoomBuilder, ComputeStreamExt};
use futures::stream::{self, StreamExt};

let runtime = LoomBuilder::new().build()?;

runtime.block_on(async {
    let numbers = stream::iter(0..10);

    // Each item is processed on rayon, results stream back in order
    let results: Vec<_> = numbers
        .compute_map(|n| n * 2)
        .collect()
        .await;

    assert_eq!(results, vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
});

Required Methods§

Source

fn compute_map<F, U>(self, f: F) -> ComputeMap<Self, F, U>
where Self: Sized, F: Fn(Self::Item) -> U + Send + Sync + 'static, Self::Item: Send + 'static, U: Send + 'static,

Map each stream item through a compute-heavy closure on rayon.

Items are processed sequentially (one at a time) to preserve stream ordering and provide natural backpressure.

§Performance

Unlike calling spawn_compute in a loop, compute_map reuses the same internal TaskState for every item, avoiding per-item pool operations:

  • First poll: Gets TaskState from pool (or allocates new)
  • Each item: ~100-500ns overhead, 0 allocations
  • Stream drop: Returns TaskState to pool
§Panics

Panics if called outside a loom runtime context (i.e., not within block_on, a tokio worker thread, or a rayon worker thread managed by the runtime).

§Example
use loom_rs::{LoomBuilder, ComputeStreamExt};
use futures::stream::{self, StreamExt};

let runtime = LoomBuilder::new().build()?;

runtime.block_on(async {
    let results: Vec<_> = stream::iter(vec!["hello", "world"])
        .compute_map(|s| s.to_uppercase())
        .collect()
        .await;

    assert_eq!(results, vec!["HELLO", "WORLD"]);
});

Implementors§