forest/utils/
stream.rs

// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
3use futures::{Stream, StreamExt};
4
5/// Decouple stream generation and stream consumption into separate threads,
6/// keeping not-yet-consumed elements in a bounded queue. This is similar to
7/// [`stream::buffered`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.buffered)
8/// and
9/// [`sink::buffer`](https://docs.rs/futures/latest/futures/sink/trait.SinkExt.html#method.buffer).
10/// The key difference is that [`par_buffer`] is parallel rather than concurrent
11/// and will make use of multiple cores when both the stream and the stream
12/// consumer are CPU-bound. Because a new thread is spawned, the stream has to
13/// be [`Sync`], [`Send`] and `'static`.
14pub fn par_buffer<V: Send + Sync + 'static>(
15    cap: usize,
16    stream: impl Stream<Item = V> + Send + Sync + 'static,
17) -> impl Stream<Item = V> {
18    let (send, recv) = flume::bounded(cap);
19    tokio::task::spawn(stream.map(Ok).forward(send.into_sink()));
20    recv.into_stream()
21}