
Trait ComputeStreamExt
pub trait ComputeStreamExt: Stream {
    // Required methods
    fn compute_map<F, U>(self, f: F) -> ComputeMap<Self, F, U>
       where Self: Sized,
             F: Fn(Self::Item) -> U + Send + Sync + 'static,
             Self::Item: Send + 'static,
             U: Send + 'static;
    fn adaptive_map<F, U>(self, f: F) -> AdaptiveMap<Self, F, U>
       where Self: Sized,
             F: Fn(Self::Item) -> U + Send + Sync + 'static,
             Self::Item: ComputeHintProvider + Send + 'static,
             U: Send + 'static;
}

Extension trait for streams that adds compute-based processing methods.

This trait is automatically implemented for all types that implement Stream.

§Example

use loom_rs::{LoomBuilder, ComputeStreamExt};
use futures::stream::{self, StreamExt};

let runtime = LoomBuilder::new().build()?;

runtime.block_on(async {
    let numbers = stream::iter(0..10);

    // Each item is processed on rayon, results stream back in order
    let results: Vec<_> = numbers
        .compute_map(|n| n * 2)
        .collect()
        .await;

    assert_eq!(results, vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
});

Required Methods§


fn compute_map<F, U>(self, f: F) -> ComputeMap<Self, F, U>
where Self: Sized, F: Fn(Self::Item) -> U + Send + Sync + 'static, Self::Item: Send + 'static, U: Send + 'static,

Map each stream item through a compute-heavy closure on rayon.

Items are processed sequentially (one at a time) to preserve stream ordering and provide natural backpressure.
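The one-at-a-time ordering guarantee can be pictured with a plain-std sketch (no loom_rs involved): submit one item to a worker thread, wait for its result, then submit the next. The `sequential_map` helper below is hypothetical and invented for illustration, not part of the crate.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical sketch of compute_map's semantics: each item is sent
/// to a worker and its result is awaited before the next item is
/// submitted, so output order always matches input order and the
/// producer is naturally backpressured to the worker's pace.
fn sequential_map<T, U, F>(items: Vec<T>, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (in_tx, in_rx) = mpsc::channel::<T>();
    let (out_tx, out_rx) = mpsc::channel::<U>();
    // Stand-in for the compute pool: a single worker thread.
    let worker = thread::spawn(move || {
        for item in in_rx {
            out_tx.send(f(item)).unwrap();
        }
    });
    let mut results = Vec::new();
    for item in items {
        in_tx.send(item).unwrap(); // submit one item...
        results.push(out_rx.recv().unwrap()); // ...and wait before sending the next
    }
    drop(in_tx); // close the channel so the worker loop ends
    worker.join().unwrap();
    results
}
```

The real combinator does this asynchronously (the task yields instead of blocking on `recv`), but the ordering and backpressure behavior are the same.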

§Performance

Unlike calling spawn_compute in a loop, compute_map reuses the same internal TaskState for every item, avoiding per-item pool operations:

  • First poll: Gets TaskState from pool (or allocates new)
  • Each item: ~100-500ns overhead, 0 allocations
  • Stream drop: Returns TaskState to pool
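As a rough illustration of the checkout/reuse/check-in lifecycle above, here is a hypothetical free-list pool in plain Rust. The actual contents of loom_rs's `TaskState` are not documented here, so the `scratch` buffer is an invented stand-in; the point is that returning an object to the pool keeps its allocation alive for the next stream.

```rust
/// Invented stand-in for the crate's pooled per-stream state.
struct TaskState {
    scratch: Vec<u8>,
}

/// Minimal free-list pool: checkout pops a recycled state (or
/// allocates on a miss), checkin clears and stores it for reuse.
struct Pool {
    free: Vec<TaskState>,
}

impl Pool {
    fn new() -> Self {
        Pool { free: Vec::new() }
    }

    fn checkout(&mut self) -> TaskState {
        self.free.pop().unwrap_or_else(|| TaskState {
            scratch: Vec::with_capacity(64),
        })
    }

    fn checkin(&mut self, mut state: TaskState) {
        state.scratch.clear(); // drop contents, keep capacity
        self.free.push(state);
    }
}
```

A stream built by `compute_map` would call `checkout` once on first poll and `checkin` once on drop; every item in between reuses the same state, which is why the per-item cost stays allocation-free.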
§Panics

Panics if called outside a loom runtime context (i.e., not within block_on, a tokio worker thread, or a rayon worker thread managed by the runtime).

§Example
use loom_rs::{LoomBuilder, ComputeStreamExt};
use futures::stream::{self, StreamExt};

let runtime = LoomBuilder::new().build()?;

runtime.block_on(async {
    let results: Vec<_> = stream::iter(vec!["hello", "world"])
        .compute_map(|s| s.to_uppercase())
        .collect()
        .await;

    assert_eq!(results, vec!["HELLO", "WORLD"]);
});

fn adaptive_map<F, U>(self, f: F) -> AdaptiveMap<Self, F, U>
where Self: Sized, F: Fn(Self::Item) -> U + Send + Sync + 'static, Self::Item: ComputeHintProvider + Send + 'static, U: Send + 'static,

Adaptively map items, choosing inline vs offload per item.

Each stream instance maintains its own multi-armed bandit (MAB) scheduler state for immediate feedback learning. The scheduler learns from observed execution times and adapts its decisions to minimize total cost.

The item's ComputeHintProvider implementation (required by the trait bound on Self::Item) supplies a hint that guides cold-start behavior before the scheduler has learned enough.

§When to Use

Use adaptive_map when:

  • You’re unsure if work is cheap enough for inline execution
  • Work complexity varies significantly across items
  • You want automatic adaptation without manual tuning

Use compute_map when:

  • Work is consistently expensive (> 250µs)
  • You want guaranteed offload behavior
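One way the per-item decision could work, sketched with only the standard library: keep an exponential moving average of observed execution times and offload once it crosses the 250µs figure quoted above. `AdaptiveDecider`, its fields, and the EMA weight are invented for illustration; the crate's actual MAB scheduler is not specified here.

```rust
use std::time::Duration;

/// Assumed cutoff, taken from the "> 250µs" guidance above.
const OFFLOAD_THRESHOLD: Duration = Duration::from_micros(250);

/// Hypothetical per-stream decision state: a running estimate of
/// how expensive the closure has been on recent items.
struct AdaptiveDecider {
    avg_nanos: f64,
    samples: u64,
}

impl AdaptiveDecider {
    fn new() -> Self {
        AdaptiveDecider { avg_nanos: 0.0, samples: 0 }
    }

    /// Inline on the async thread while work looks cheap; offload to
    /// the compute pool once the estimate exceeds the threshold.
    fn should_offload(&self) -> bool {
        self.samples > 0 && self.avg_nanos > OFFLOAD_THRESHOLD.as_nanos() as f64
    }

    /// Feed back the measured execution time of the item just
    /// processed (exponential moving average, newest-sample weight 0.2).
    fn record(&mut self, elapsed: Duration) {
        let nanos = elapsed.as_nanos() as f64;
        self.samples += 1;
        if self.samples == 1 {
            self.avg_nanos = nanos;
        } else {
            self.avg_nanos = 0.2 * nanos + 0.8 * self.avg_nanos;
        }
    }
}
```

A ComputeHintProvider hint would slot in as the cold-start answer for `should_offload` while `samples` is still zero.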
§Example
use loom_rs::{LoomBuilder, ComputeStreamExt};
use futures::stream::{self, StreamExt};

let runtime = LoomBuilder::new().build()?;

runtime.block_on(async {
    // `data` is some stream source of items whose type implements
    // ComputeHintProvider, and `process` is the per-item work;
    // both are placeholders elided from this example.
    //
    // The scheduler learns that small items are fast (inline)
    // and large items are slow (offload).
    let results: Vec<_> = stream::iter(data)
        .adaptive_map(|item| process(item))
        .collect()
        .await;
});

Implementors§