Trait par_stream::prelude::StreamExt
source · [−]pub trait StreamExt where
Self: Stream, {
fn shared(self) -> Shared<Self>;
fn with_state<B>(self, init: B) -> WithState<Self, B>
where
Self: Sized;
fn wait_until<Fut>(self, fut: Fut) -> WaitUntil<Self, Fut>
where
Fut: Future<Output = bool>;
fn reduce<F, Fut>(self, f: F) -> Reduce<Self, F, Fut>;
fn batching<T, F, Fut>(self, f: F) -> Batching<Self, T, F, Fut>
where
Self: Sized,
F: 'static + Send + FnMut(Self) -> Fut,
Fut: 'static + Future<Output = Option<(T, Self)>> + Send,
T: 'static + Send;
fn stateful_batching<T, B, F, Fut>(
self,
init: B,
f: F
) -> StatefulBatching<Self, B, T, F, Fut>
where
Self: Sized + Stream,
F: FnMut(B, Self) -> Fut,
Fut: Future<Output = Option<(T, B, Self)>>;
fn stateful_then<T, B, F, Fut>(
self,
init: B,
f: F
) -> StatefulThen<Self, B, T, F, Fut>
where
Self: Stream,
F: FnMut(B, Self::Item) -> Fut,
Fut: Future<Output = Option<(B, T)>>;
fn stateful_map<T, B, F>(self, init: B, f: F) -> StatefulMap<Self, B, T, F>
where
Self: Stream,
F: FnMut(B, Self::Item) -> Option<(B, T)>;
}
Expand description
This trait extends `Stream` types with extra combinators.
Required methods
Creates a shareable stream that can clone the ownership of the stream.
use futures::prelude::*;
use par_stream::prelude::*;
use std::mem;
// Creates two sharing handles to the stream
let stream = stream::iter(0..100);
let shared1 = stream.shared();
let shared2 = shared1.clone();
// Consumes the shared streams individually
let collect1 = par_stream::rt::spawn(shared1.collect());
let collect2 = par_stream::rt::spawn(shared2.collect());
let (vec1, vec2): (Vec<_>, Vec<_>) = futures::join!(collect1, collect2);
// Checks that the combined values of two vecs are equal to original values
let mut all_vec: Vec<_> = vec1.into_iter().chain(vec2).collect();
all_vec.sort();
itertools::assert_equal(all_vec, 0..100);
fn with_state<B>(self, init: B) -> WithState<Self, B> where
Self: Sized,
fn with_state<B>(self, init: B) -> WithState<Self, B> where
Self: Sized,
Binds the stream with a state value.
It turns each stream item into `(item, state)`. The `state` is a mutable
handle to the state value, which is initialized to `init`. The `state` can
be modified as the user desires. The state must be given back by calling
`state.send()`, or be dropped, so that the stream can proceed to the next
iteration. If `state.close()` is called, the state is discarded and the
stream terminates.
fn wait_until<Fut>(self, fut: Fut) -> WaitUntil<Self, Fut> where
Fut: Future<Output = bool>,
fn wait_until<Fut>(self, fut: Fut) -> WaitUntil<Self, Fut> where
Fut: Future<Output = bool>,
Takes elements after the provided future resolves to `true`; otherwise fuses the stream.
The stream waits for the future `fut` to resolve. Once the future becomes ready,
the stream starts taking elements if it resolved to `true`. If the future returns
`false`, this stream combinator will always report that the stream is done.
Examples
use futures::{prelude::*, stream, stream::StreamExt as _};
use par_stream::prelude::*;
use std::{
sync::{
atomic::{AtomicBool, Ordering::*},
Arc,
},
time::Duration,
};
let is_ready = Arc::new(AtomicBool::new(false));
stream::iter(0..10)
.wait_until(async {
par_stream::rt::sleep(Duration::from_millis(200)).await;
is_ready.store(true, SeqCst);
true
})
.for_each(|_| async {
assert!(is_ready.load(SeqCst));
})
.await;
Reduces the stream items into a single value.
The function `f(item, item) -> item` returns a future that reduces two stream
items into one value. If the stream is empty, this stream combinator resolves
to `None`. Otherwise, it resolves to the reduced value `Some(value)`.
The combinator consumes as many items from the stream as it likes for each output item.
The function `f(stream) -> Option<(output, stream)>` returns a future that takes values
from the stream, and returns the combined output together with the stream. If it
returns `None`, this stream combinator stops producing further values.
use futures::prelude::*;
use par_stream::prelude::*;
use std::mem;
let data = vec![1, 2, -3, 4, 5, -6, 7, 8];
let mut stream = stream::iter(data)
.batching(|mut stream| async move {
let mut buffer = vec![];
while let Some(value) = stream.next().await {
buffer.push(value);
if value < 0 {
break;
}
}
(!buffer.is_empty()).then(|| (buffer, stream))
})
.boxed();
assert_eq!(stream.next().await, Some(vec![1, 2, -3]));
assert_eq!(stream.next().await, Some(vec![4, 5, -6]));
assert_eq!(stream.next().await, Some(vec![7, 8]));
assert!(stream.next().await.is_none());
fn stateful_batching<T, B, F, Fut>(
self,
init: B,
f: F
) -> StatefulBatching<Self, B, T, F, Fut> where
Self: Sized + Stream,
F: FnMut(B, Self) -> Fut,
Fut: Future<Output = Option<(T, B, Self)>>,
fn stateful_batching<T, B, F, Fut>(
self,
init: B,
f: F
) -> StatefulBatching<Self, B, T, F, Fut> where
Self: Sized + Stream,
F: FnMut(B, Self) -> Fut,
Fut: Future<Output = Option<(T, B, Self)>>,
Similar to batching but with a state.
The batching function `f(state, stream) -> Option<(output, state, stream)>` returns a
future that takes as many items from the stream as it wants, then returns the output
and gives the state and the stream back.
use futures::{stream, stream::StreamExt as _};
use par_stream::StreamExt as _;
let vec: Vec<_> = stream::iter([1i32, 1, 1, -1, -1, 1])
.stateful_batching(None, |mut sum: Option<i32>, mut stream| async move {
while let Some(val) = stream.next().await {
match &mut sum {
Some(sum) => {
if sum.signum() == val.signum() {
*sum += val;
} else {
return Some((*sum, Some(val), stream));
}
}
sum => *sum = Some(val),
}
}
match sum {
Some(sum) => Some((sum, None, stream)),
None => None,
}
})
.collect()
.await;
assert_eq!(vec, [3, -2, 1]);
fn stateful_then<T, B, F, Fut>(
self,
init: B,
f: F
) -> StatefulThen<Self, B, T, F, Fut> where
Self: Stream,
F: FnMut(B, Self::Item) -> Fut,
Fut: Future<Output = Option<(B, T)>>,
fn stateful_then<T, B, F, Fut>(
self,
init: B,
f: F
) -> StatefulThen<Self, B, T, F, Fut> where
Self: Stream,
F: FnMut(B, Self::Item) -> Fut,
Fut: Future<Output = Option<(B, T)>>,
Similar to then but with a state.
fn stateful_map<T, B, F>(self, init: B, f: F) -> StatefulMap<Self, B, T, F> where
Self: Stream,
F: FnMut(B, Self::Item) -> Option<(B, T)>,
fn stateful_map<T, B, F>(self, init: B, f: F) -> StatefulMap<Self, B, T, F> where
Self: Stream,
F: FnMut(B, Self::Item) -> Option<(B, T)>,
Similar to map but with a state.