Trait stream_cancel::StreamExt
pub trait StreamExt: Stream {
fn take_until<U>(self, until: U) -> TakeUntil<Self, U::Future>
where
U: IntoFuture<Item = (), Error = ()>,
Self: Sized,
{ ... }
}
Expand description
This `Stream` extension trait provides a `take_until` method that terminates the stream once the given future resolves.
Provided Methods§
fn take_until<U>(self, until: U) -> TakeUntil<Self, U::Future>
where
    U: IntoFuture<Item = (), Error = ()>,
    Self: Sized,
Take elements from this stream until the given future resolves.
This function will take elements from this stream until the given future resolves. Once it resolves, the stream will yield `None` and produce no further elements.
If the future produces an error, the stream will be allowed to continue indefinitely.
// Example: accept TCP connections and echo each one back, until a one-shot
// signal tells the listener stream to stop.
extern crate tokio;
extern crate futures;
// Brings `take_until` into scope as a method on streams.
use stream_cancel::StreamExt;
use tokio::prelude::*;
// Bind to 0.0.0.0 with port 0 (conventionally: let the OS choose a free port).
let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
// `rx` is the future handed to `take_until`; sending on `tx` resolves it.
let (tx, rx) = futures::sync::oneshot::channel();
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.spawn(
listener
.incoming()
// `take_until` requires `Error = ()`, so the receiver's error is mapped to `()`.
.take_until(rx.map_err(|_| ()))
// Log accept errors and convert them to `()`.
.map_err(|e| eprintln!("accept failed = {:?}", e))
.for_each(|sock| {
// Echo: copy everything read from the socket back to the same socket.
let (reader, writer) = sock.split();
tokio::spawn(
tokio::io::copy(reader, writer)
.map(|amt| println!("wrote {:?} bytes", amt))
.map_err(|err| eprintln!("IO error {:?}", err)),
)
}),
);
// tell the listener to stop accepting new connections
tx.send(()).unwrap();
// Block until the runtime reports it has shut down on idle.
rt.shutdown_on_idle().wait().unwrap();