pub trait StreamExt
where
    Self: Stream,
{
    fn shared(self) -> Shared<Self>;

    fn with_state<B>(self, init: B) -> WithState<Self, B>
    where
        Self: Sized;

    fn wait_until<Fut>(self, fut: Fut) -> WaitUntil<Self, Fut>
    where
        Fut: Future<Output = bool>;

    fn reduce<F, Fut>(self, f: F) -> Reduce<Self, F, Fut>;

    fn batching<T, F, Fut>(self, f: F) -> Batching<Self, T, F, Fut>
    where
        Self: Sized,
        F: 'static + Send + FnMut(Self) -> Fut,
        Fut: 'static + Future<Output = Option<(T, Self)>> + Send,
        T: 'static + Send;

    fn stateful_batching<T, B, F, Fut>(
        self,
        init: B,
        f: F,
    ) -> StatefulBatching<Self, B, T, F, Fut>
    where
        Self: Sized + Stream,
        F: FnMut(B, Self) -> Fut,
        Fut: Future<Output = Option<(T, B, Self)>>;

    fn stateful_then<T, B, F, Fut>(
        self,
        init: B,
        f: F,
    ) -> StatefulThen<Self, B, T, F, Fut>
    where
        Self: Stream,
        F: FnMut(B, Self::Item) -> Fut,
        Fut: Future<Output = Option<(B, T)>>;

    fn stateful_map<T, B, F>(self, init: B, f: F) -> StatefulMap<Self, B, T, F>
    where
        Self: Stream,
        F: FnMut(B, Self::Item) -> Option<(B, T)>;
}

This trait extends Stream types with extra combinators.

Required methods

Creates a shareable stream. The returned handle can be cloned, and all clones share ownership of the underlying stream.

use futures::prelude::*;
use par_stream::prelude::*;

// Creates two sharing handles to the stream
let stream = stream::iter(0..100);
let shared1 = stream.shared();
let shared2 = shared1.clone();

// Consumes the shared streams individually
let collect1 = par_stream::rt::spawn(shared1.collect());
let collect2 = par_stream::rt::spawn(shared2.collect());
let (vec1, vec2): (Vec<_>, Vec<_>) = futures::join!(collect1, collect2);

// Checks that the combined values of two vecs are equal to original values
let mut all_vec: Vec<_> = vec1.into_iter().chain(vec2).collect();
all_vec.sort();
itertools::assert_equal(all_vec, 0..100);

Binds the stream with a state value.

It turns each stream item into (item, state), where state is a mutable handle to the state value, which is initialized to init. The state can be modified freely through the handle. The handle must be given back with state.send(), or dropped, before the stream can proceed to the next iteration. Calling state.close() discards the state and terminates the stream.

Takes elements once the provided future resolves to true; otherwise fuses the stream.

The stream waits for the future fut to resolve. If the future resolves to true, the stream starts yielding elements. If it resolves to false, this combinator always reports that the stream is done.

Examples
use futures::{prelude::*, stream, stream::StreamExt as _};
use par_stream::prelude::*;

use std::{
    sync::{
        atomic::{AtomicBool, Ordering::*},
        Arc,
    },
    time::Duration,
};

let is_ready = Arc::new(AtomicBool::new(false));

stream::iter(0..10)
    .wait_until(async {
        par_stream::rt::sleep(Duration::from_millis(200)).await;
        is_ready.store(true, SeqCst);
        true
    })
    .for_each(|_| async {
        assert!(is_ready.load(SeqCst));
    })
    .await;

Reduces the stream items into a single value.

The function f(item, item) -> item returns a future that reduces two stream items into one value. If the stream is empty, this combinator resolves to None. Otherwise it resolves to Some(value), where value is the reduced result.
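The None/Some behavior described above mirrors that of Iterator::reduce in the standard library. The following is a synchronous sketch using only std iterators; it is an analogue for illustration, not the par_stream API, and sum_all is a hypothetical helper name.

```rust
// Synchronous analogue of `reduce`, written with std iterators only.
// `sum_all` is a hypothetical helper, not part of par_stream.
fn sum_all(items: Vec<i32>) -> Option<i32> {
    // The closure plays the role of f(item, item) -> item.
    items.into_iter().reduce(|a, b| a + b)
}

fn main() {
    // A non-empty input reduces to Some(value).
    assert_eq!(sum_all(vec![1, 2, 3, 4]), Some(10));
    // An empty input reduces to None.
    assert_eq!(sum_all(Vec::new()), None);
}
```

The stream version differs in that f returns a future, so each reduction step can be asynchronous.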

The combinator consumes as many items from the stream as it likes for each output item.

The function f(stream) -> Option<(output, stream)> returns a future that takes items from the stream and gives back the combined output together with the stream. If it returns None, this combinator stops producing further values.

use futures::prelude::*;
use par_stream::prelude::*;

let data = vec![1, 2, -3, 4, 5, -6, 7, 8];
let mut stream = stream::iter(data)
    .batching(|mut stream| async move {
        let mut buffer = vec![];
        while let Some(value) = stream.next().await {
            buffer.push(value);
            if value < 0 {
                break;
            }
        }

        (!buffer.is_empty()).then(|| (buffer, stream))
    })
    .boxed();

assert_eq!(stream.next().await, Some(vec![1, 2, -3]));
assert_eq!(stream.next().await, Some(vec![4, 5, -6]));
assert_eq!(stream.next().await, Some(vec![7, 8]));
assert!(stream.next().await.is_none());

Similar to batching but with a state.

The batching function f(state, stream) -> Option<(output, state, stream)> returns a future that takes as many items from the stream as it wants, then returns the output and gives the state and the stream back.

use futures::{stream, stream::StreamExt as _};
use par_stream::StreamExt as _;

let vec: Vec<_> = stream::iter([1i32, 1, 1, -1, -1, 1])
    .stateful_batching(None, |mut sum: Option<i32>, mut stream| async move {
        while let Some(val) = stream.next().await {
            match &mut sum {
                Some(sum) => {
                    if sum.signum() == val.signum() {
                        *sum += val;
                    } else {
                        return Some((*sum, Some(val), stream));
                    }
                }
                sum => *sum = Some(val),
            }
        }

        match sum {
            Some(sum) => Some((sum, None, stream)),
            None => None,
        }
    })
    .collect()
    .await;

assert_eq!(vec, [3, -2, 1]);

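Similar to then but with a state. The function f(state, item) -> Option<(state, output)> receives the current state and the next item, and returns a future that resolves to the updated state and the output.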

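Similar to map but with a state. The function f(state, item) -> Option<(state, output)> receives the current state and the next item, and returns the updated state together with the output.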
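The state-threading shape of the stateful_map signature can be modeled synchronously with std iterators. This is a minimal sketch, not the par_stream implementation: stateful_map_sync and running_sums are hypothetical helpers, and ending the output when f returns None is an assumption drawn from the Option return type rather than documented behavior.

```rust
// Hypothetical synchronous model of the `stateful_map` signature.
// Not part of par_stream; it only illustrates how the state is threaded.
fn stateful_map_sync<I, B, T, F>(iter: I, init: B, mut f: F) -> Vec<T>
where
    I: IntoIterator,
    F: FnMut(B, I::Item) -> Option<(B, T)>,
{
    let mut state = init;
    let mut out = Vec::new();
    for item in iter {
        // The state is passed by value and handed back with each output.
        match f(state, item) {
            Some((next, value)) => {
                out.push(value);
                state = next;
            }
            // Assumption: returning None ends the output.
            None => break,
        }
    }
    out
}

// Running sums that stop once the total would exceed 10.
fn running_sums(items: Vec<i32>) -> Vec<i32> {
    stateful_map_sync(items, 0, |sum, x| {
        let sum = sum + x;
        if sum > 10 {
            None
        } else {
            Some((sum, sum))
        }
    })
}

fn main() {
    assert_eq!(running_sums(vec![1, 2, 3, 4, 5]), vec![1, 3, 6, 10]);
}
```

Passing the state by value, instead of capturing a mutable variable in the closure, keeps the closure free of shared mutable borrows. stateful_then has the same shape, except that f returns a future.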

Implementors