Trait ex_futures::stream::StreamExt
pub trait StreamExt: Stream {
    fn cloneable(self) -> Cloneable<Self>
    where
        Self: Sized,
        Self::Item: Clone,
        Self::Error: Clone,
    { ... }

    fn unsync_cloneable(self) -> UnsyncCloneable<Self>
    where
        Self: Sized,
        Self::Item: Clone,
        Self::Error: Clone,
    { ... }

    fn fork<F, T>(self, router: F) -> (LeftFork<Self, F>, RightFork<Self, F>)
    where
        Self: Sized,
        Self::Error: Clone,
        F: FnMut(&Self::Item) -> T,
        Side: From<T>,
    { ... }

    fn unsync_fork<F, T>(
        self,
        router: F
    ) -> (LeftUnsyncFork<Self, F>, RightUnsyncFork<Self, F>)
    where
        Self: Sized,
        Self::Error: Clone,
        F: FnMut(&Self::Item) -> T,
        Side: From<T>,
    { ... }

    fn as_err<E>(self) -> AsErr<Self, E>
    where
        Self: Sized,
        Self: Stream<Error = ()>,
    { ... }

    fn find_first<F>(self, f: F) -> FindFirst<Self, F>
    where
        F: FnMut(&Self::Item) -> bool,
        Self: Sized,
    { ... }

    fn find_first_map<F, B>(self, f: F) -> FindFirstMap<Self, F>
    where
        F: FnMut(Self::Item) -> Option<B>,
        Self: Sized,
    { ... }
}
An extension of the Stream trait provided by the futures crate.
Every Stream implements StreamExt automatically.
All you need to do is import StreamExt.
use ex_futures::StreamExt;
Provided Methods
fn cloneable(self) -> Cloneable<Self> where
Self: Sized,
Self::Item: Clone,
Self::Error: Clone,
Converts any kind of stream into a "cloneable" stream.
The Item and Error types must implement Clone. If they do not, consider wrapping them in Arc.
Notice
This method works, but it does not have high performance.
If you do not need Sync, use the unsync_cloneable function instead. That one is fast enough.
Examples
use ex_futures::StreamExt;

let (tx, rx) = ::futures::sync::mpsc::channel::<usize>(42);
let cloneable_rx = rx.cloneable(); // Convert "rx" into cloneable.
let cloneable_rx2 = cloneable_rx.clone(); // Now you can clone it.
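The advice about Arc can be illustrated without any stream at all. The following is a minimal sketch using only the standard library; Frame, share and hand_out are hypothetical names invented for this illustration and are not part of ex_futures. It shows that wrapping a non-Clone value in Arc gives a type that is cheap to clone, which is what the Clone bounds above ask for.

```rust
use std::sync::Arc;

// Hypothetical payload type that deliberately does not implement `Clone`.
#[derive(Debug)]
pub struct Frame {
    pub bytes: Vec<u8>,
}

// Wrap the value in `Arc`; cloning the `Arc` only bumps a reference count.
pub fn share(frame: Frame) -> Arc<Frame> {
    Arc::new(frame)
}

// Hand the same frame to two consumers without copying its bytes.
pub fn hand_out(frame: Frame) -> (Arc<Frame>, Arc<Frame>) {
    let shared = share(frame);
    let second = Arc::clone(&shared);
    (shared, second)
}
```

With a real stream, the same wrapping would presumably be applied to each item (for example with the stream's map combinator and Arc::new) before calling cloneable.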
fn unsync_cloneable(self) -> UnsyncCloneable<Self> where
Self: Sized,
Self::Item: Clone,
Self::Error: Clone,
Converts any kind of stream into a "cloneable" stream that is not Sync.
If your stream emits an item or error that is not Clone, consider wrapping it in Rc.
Each cloneable stream has its own queue. Every item of the original stream is cloned and pushed onto each of those queues.
Examples
use ex_futures::StreamExt;

let (tx, rx) = ::futures::sync::mpsc::channel::<usize>(42);
let cloneable_rx = rx.unsync_cloneable(); // Convert "rx" into cloneable.
let cloneable_rx2 = cloneable_rx.clone(); // Now you can clone it.
Notice
The value returned by this function is not Sync. A Sync version will be provided later.
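The "one queue per clone" behaviour described for unsync_cloneable can be modelled with plain iterators. This is only a sketch of the idea, not the crate's implementation: Fanout, duplicate and next_item are hypothetical names, the model is synchronous, and the choice that a new handle starts with a copy of the pending items is an assumption of this sketch.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

// Shared state: the original source plus one queue per handle.
struct Shared<I: Iterator> {
    source: I,
    queues: Vec<VecDeque<I::Item>>,
}

// Hypothetical stand-in for a cloneable stream handle, built on `Iterator`.
pub struct Fanout<I: Iterator> {
    shared: Rc<RefCell<Shared<I>>>,
    id: usize,
}

impl<I: Iterator> Fanout<I>
where
    I::Item: Clone,
{
    pub fn new(source: I) -> Self {
        let shared = Shared { source, queues: vec![VecDeque::new()] };
        Fanout { shared: Rc::new(RefCell::new(shared)), id: 0 }
    }

    // Create another handle. Assumption: it starts with a copy of this
    // handle's pending items. Queues of dropped handles are never removed.
    pub fn duplicate(&self) -> Self {
        let mut shared = self.shared.borrow_mut();
        let pending = shared.queues[self.id].clone();
        shared.queues.push(pending);
        Fanout { shared: Rc::clone(&self.shared), id: shared.queues.len() - 1 }
    }

    // Take the next item for this handle, pulling from the source only
    // when this handle's own queue is empty.
    pub fn next_item(&mut self) -> Option<I::Item> {
        let mut shared = self.shared.borrow_mut();
        if let Some(item) = shared.queues[self.id].pop_front() {
            return Some(item);
        }
        let item = shared.source.next()?;
        // Clone the fresh item into every other handle's queue.
        for (i, queue) in shared.queues.iter_mut().enumerate() {
            if i != self.id {
                queue.push_back(item.clone());
            }
        }
        Some(item)
    }
}
```

Because the state is shared through Rc and RefCell, a handle in this model is neither Send nor Sync, which mirrors the restriction noted for the unsync variant.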
fn fork<F, T>(self, router: F) -> (LeftFork<Self, F>, RightFork<Self, F>) where
Self: Sized,
Self::Error: Clone,
F: FnMut(&Self::Item) -> T,
Side: From<T>,
Forks any kind of stream into two streams, the way a river branches.
The closure passed to this function is called the "router". Each item of the original stream is
sent to the branch that the router chooses.
The router can return a Side, which is either Left or Right, or a bool (true is treated as Left).
Examples
use ex_futures::StreamExt;

let (tx, rx) = ::futures::sync::mpsc::channel::<usize>(42);
let (even, odd) = rx.fork(|i| i % 2 == 0);
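The routing rule described above (a router returning either a Side or a bool, with true meaning Left) can be sketched over an ordinary collection. The Side enum below is a local, hypothetical mirror of the type named in the signature, and route is an invented helper; neither is taken from the crate, and the real fork produces two streams rather than two vectors.

```rust
// Hypothetical local mirror of the `Side` type described above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Side {
    Left,
    Right,
}

// `true` routes to the left branch, matching the rule stated above.
impl From<bool> for Side {
    fn from(is_left: bool) -> Self {
        if is_left { Side::Left } else { Side::Right }
    }
}

// Send every item to one of two buffers according to the router's decision.
pub fn route<I, F, T>(items: I, mut router: F) -> (Vec<I::Item>, Vec<I::Item>)
where
    I: IntoIterator,
    F: FnMut(&I::Item) -> T,
    Side: From<T>,
{
    let mut left = Vec::new();
    let mut right = Vec::new();
    for item in items {
        match Side::from(router(&item)) {
            Side::Left => left.push(item),
            Side::Right => right.push(item),
        }
    }
    (left, right)
}
```

The `Side: From<T>` bound is what lets one function accept both kinds of router: a closure returning bool goes through the From<bool> impl, while a closure returning Side uses the standard reflexive From impl.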
fn unsync_fork<F, T>(
self,
router: F
) -> (LeftUnsyncFork<Self, F>, RightUnsyncFork<Self, F>) where
Self: Sized,
Self::Error: Clone,
F: FnMut(&Self::Item) -> T,
Side: From<T>,
Forks any kind of stream into two "unsync" streams.
The closure passed to this function is called the "router". Each item of the original stream is
sent to the branch that the router chooses.
The router can return a Side, which is either Left or Right, or a bool (true is treated as Left).
Examples
use ex_futures::StreamExt;

let (tx, rx) = ::futures::sync::mpsc::channel::<usize>(42);
let (even, odd) = rx.unsync_fork(|i| i % 2 == 0);
Notice
The value returned by this function is not Sync. A Sync version will be provided later.
fn as_err<E>(self) -> AsErr<Self, E> where
Self: Sized,
Self: Stream<Error = ()>,
Converts the Error associated type, which must be (), into any type you want.
Examples
use ex_futures::StreamExt;

let (tx, rx) = ::futures::sync::mpsc::channel::<usize>(42);

struct MyError();

let rx = rx.as_err::<MyError>(); // Accomplished by this function.
fn find_first<F>(self, f: F) -> FindFirst<Self, F> where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
Returns a Future that completes when the first item satisfying the predicate f is found.
Examples
use ex_futures::StreamExt;
use futures::{Sink, Future};

let (tx, rx) = ::futures::unsync::mpsc::channel::<usize>(42);

tx.send(42).wait();

let fut = rx.find_first(|i| *i == 42);
let (the_Answer_to_the_Ultimate_Question_of_Life_the_Universe_and_Everything, rx) = fut.wait().unwrap();
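The example above destructures the result into a found value and the stream itself, so the rest of the stream stays usable after the search. A rough synchronous analogue of that shape, written against Iterator rather than Stream, might look as follows. The free function find_first here is a hypothetical helper for illustration, and returning the item as an Option is an assumption of this sketch, not a statement about what FindFirst resolves to.

```rust
// Return the first item accepted by `pred`, together with the source
// positioned just after that item so the caller can keep consuming it.
pub fn find_first<I, F>(mut source: I, mut pred: F) -> (Option<I::Item>, I)
where
    I: Iterator,
    F: FnMut(&I::Item) -> bool,
{
    // `by_ref` borrows the iterator, so `source` can still be returned.
    let found = source.by_ref().find(|item| pred(item));
    (found, source)
}
```

Items that come before the match are consumed and dropped in this model; only the items after it remain in the returned iterator.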
fn find_first_map<F, B>(self, f: F) -> FindFirstMap<Self, F> where
F: FnMut(Self::Item) -> Option<B>,
Self: Sized,
Similar to StreamExt::find_first. The only difference is that this function "maps"
the result: the closure takes each item by value and returns Some for the first item to accept.
Examples
use ex_futures::StreamExt;
use futures::{Sink, Future};

let (tx, rx) = ::futures::unsync::mpsc::channel::<usize>(42);

tx.send(0).wait();

// The closure accepts even items, so the result is half of the first even item.
let first_even_half_fut = rx.find_first_map(|i| if i % 2 == 0 { Some(i / 2) } else { None });
let (first_even_half, continue_rx) = first_even_half_fut.wait().unwrap();
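The "find and map in one step" idea can also be sketched over Iterator using the standard find_map adapter. As before, this free function is a hypothetical synchronous analogue, and the Option in its return type is an assumption of the sketch rather than a description of what FindFirstMap resolves to.

```rust
// Apply `f` to each item in turn and stop at the first `Some`, returning
// the mapped value together with the remainder of the source.
pub fn find_first_map<I, F, B>(mut source: I, mut f: F) -> (Option<B>, I)
where
    I: Iterator,
    F: FnMut(I::Item) -> Option<B>,
{
    // Items are passed to `f` by value, matching the bound in the signature above.
    let found = source.by_ref().find_map(|item| f(item));
    (found, source)
}
```

Because the closure receives each item by value, rejected items cannot be recovered afterwards; that is the trade-off of mapping and searching in a single pass.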
Implementors
impl<S> StreamExt for S where
S: Stream,