Trait ex_futures::stream::StreamExt [] [src]

pub trait StreamExt: Stream {
    fn cloneable(self) -> Cloneable<Self>
        Self: Sized,
        Self::Item: Clone,
        Self::Error: Clone
, { ... }
fn unsync_cloneable(self) -> UnsyncCloneable<Self>
        Self: Sized,
        Self::Item: Clone,
        Self::Error: Clone
, { ... }
fn fork<F, T>(self, router: F) -> (LeftFork<Self, F>, RightFork<Self, F>)
        Self: Sized,
        Self::Error: Clone,
        F: FnMut(&Self::Item) -> T,
        Side: From<T>
, { ... }
fn unsync_fork<F, T>(
        router: F
    ) -> (LeftUnsyncFork<Self, F>, RightUnsyncFork<Self, F>)
        Self: Sized,
        Self::Error: Clone,
        F: FnMut(&Self::Item) -> T,
        Side: From<T>
, { ... }
fn as_err<E>(self) -> AsErr<Self, E>
        Self: Sized,
        Self: Stream<Error = ()>
, { ... }
fn find_first<F>(self, f: F) -> FindFirst<Self, F>
        F: FnMut(&Self::Item) -> bool,
        Self: Sized
, { ... }
fn find_first_map<F, B>(self, f: F) -> FindFirstMap<Self, F>
        F: FnMut(Self::Item) -> Option<B>,
        Self: Sized
, { ... } }

An extention of Stream provided by futures crate. Any Stream implements StreamExt automatically. All you is to import StreamExt.

use ex_futures::StreamExt;

Provided Methods

Convert any kind of stream into "cloneable" stream. The Item and Error need to implement Clone. If not, consider wrap it by Arc.


This feature is work. But does not have high performance. If you need not to use Sync, please use unsync_cloneable function. That is fast enough.


use ex_futures::StreamExt;

let (tx, rx) = ::futures::sync::mpsc::channel::<usize>(42);

let cloneable_rx = rx.cloneable(); // Convert "rx" into cloneable.
let cloneable_rx2 = cloneable_rx.clone(); // Now you can clone it.

Convert any kind of stream into "cloneable" stream but unsync. If your stream emits non Clone item or error, consider wrap it by Rc.

Each cloneable stream has its own queue. And each item of original stream is cloned and queued to there.


use ex_futures::StreamExt;

let (tx, rx) = ::futures::sync::mpsc::channel::<usize>(42);

let cloneable_rx = rx.unsync_cloneable(); // Convert "rx" into cloneable.
let cloneable_rx2 = cloneable_rx.clone(); // Now you can clone it.


The value being returned by this function is not Sync. We will provide Sync version later.

Fork any kind of stream into two stream like that the river branches. The closure being passed this function is called "router". Each item of original stream is passed to branch following to "router" decision. "Router" can return not only Side which is Left or Right but also bool (true is considered as Left).


use ex_futures::StreamExt;

let (tx, rx) = ::futures::sync::mpsc::channel::<usize>(42);

let (even, odd) = rx.fork(|i| i % 2 == 0);

Fork any kind of stream into two "unsync" stream. The closure being passed this function is called "router". Each item of original stream is passed to branch following to "router" decision. "Router" can return not only Side which is Left or Right but also bool (true is considered as Left).


use ex_futures::StreamExt;

let (tx, rx) = ::futures::sync::mpsc::channel::<usize>(42);

let (even, odd) = rx.unsync_fork(|i| i % 2 == 0);


The value being returned by this function is not Sync. We will provide Sync version later.

Converts Error association type which is () into any kind of type you want.


use ex_futures::StreamExt;

let (tx, rx) = ::futures::sync::mpsc::channel::<usize>(42);

struct MyError();

let rx = rx.as_err::<MyError>(); // Accomplished by this function.

Returns Future which will be completed when find first item you want.


use ex_futures::StreamExt;
use futures::{Sink, Future};

let (tx, rx) = ::futures::unsync::mpsc::channel::<usize>(42);


let fut = rx.find_first(|i| *i == 42);
let (the_Answer_to_the_Ultimate_Question_of_Life_the_Universe_and_Everything, rx) = fut.wait().unwrap();

Similar function to StreamExt::find_first. The only deference is that this function "maps" the result.


use ex_futures::StreamExt;
use futures::{Sink, Future};

let (tx, rx) = ::futures::unsync::mpsc::channel::<usize>(42);


let first_odd_half_fut = rx.find_first_map(|i| if i % 2 == 0 { Some(i / 2) } else { None });
let (first_odd_half, continue_rx) = first_odd_half_fut.wait().unwrap();
