Crate stream_cancel
source ·Expand description
This crate provides multiple mechanisms for interrupting a Stream
.
Stream combinator
The extension trait StreamExt
provides a single new Stream
combinator: take_until
.
StreamExt::take_until
continues yielding elements from the underlying Stream
until a
Future
resolves, and at that moment immediately yields None
and stops producing further
elements.
For convenience, the crate also includes the Tripwire
type, which produces a cloneable
Future
that can then be passed to take_until
. When a new Tripwire
is created, an
associated Trigger
is also returned, which interrupts the Stream
when it is dropped.
extern crate tokio;
use stream_cancel::{StreamExt, Tripwire};
use tokio::prelude::*;
let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
let (trigger, tripwire) = Tripwire::new();
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.spawn(
listener
.incoming()
.take_until(tripwire)
.map_err(|e| eprintln!("accept failed = {:?}", e))
.for_each(|sock| {
let (reader, writer) = sock.split();
tokio::spawn(
tokio::io::copy(reader, writer)
.map(|amt| println!("wrote {:?} bytes", amt))
.map_err(|err| eprintln!("IO error {:?}", err)),
)
}),
);
// tell the listener to stop accepting new connections
drop(trigger);
rt.shutdown_on_idle().wait().unwrap();
Stream wrapper
Any stream can be wrapped in a Valved
, which enables it to be remotely terminated through
an associated Trigger
. This can be useful to implement graceful shutdown on “infinite”
streams like a TcpListener
. Once [Trigger::close
] is called on the handle for a given
stream’s Valved
, the stream will yield None
to indicate that it has terminated.
extern crate tokio;
use stream_cancel::Valved;
use tokio::prelude::*;
use std::thread;
let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
let (exit, incoming) = Valved::new(listener.incoming());
let server = thread::spawn(move || {
// start a tokio echo server
tokio::run(
incoming
.map_err(|e| eprintln!("accept failed = {:?}", e))
.for_each(|sock| {
let (reader, writer) = sock.split();
tokio::spawn(
tokio::io::copy(reader, writer)
.map(|amt| println!("wrote {:?} bytes", amt))
.map_err(|err| eprintln!("IO error {:?}", err)),
)
}),
)
});
// the server thread will normally never exit, since more connections
// can always arrive. however, with a Valved, we can turn off the
// stream of incoming connections to initiate a graceful shutdown
drop(exit);
server.join().unwrap();
You can share the same Trigger
between multiple streams by first creating a Valve
,
and then wrapping multiple streams using [Valve::Wrap
]:
extern crate tokio;
use stream_cancel::Valve;
use tokio::prelude::*;
let (exit, valve) = Valve::new();
let listener1 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
let listener2 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
let incoming1 = valve.wrap(listener1.incoming());
let incoming2 = valve.wrap(listener2.incoming());
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.spawn(
incoming1
.select(incoming2)
.map_err(|e| eprintln!("accept failed = {:?}", e))
.for_each(|sock| {
let (reader, writer) = sock.split();
tokio::spawn(
tokio::io::copy(reader, writer)
.map(|amt| println!("wrote {:?} bytes", amt))
.map_err(|err| eprintln!("IO error {:?}", err)),
)
}),
);
// the runtime will not become idle until both incoming1 and incoming2 have stopped
// (due to the select). this checks that they are indeed both interrupted when the
// valve is closed.
drop(exit);
rt.shutdown_on_idle().wait().unwrap();
Structs
Tripwire
is a convenient mechanism for implementing graceful shutdown over many
asynchronous streams. A Tripwire
is a Future
that is Clone
, and that can be passed to
StreamExt::take_until
. All Tripwire
clones are associated with a single Trigger
,
which is then used to signal that all the associated streams should be terminated.Valve
is associated with a Trigger
, and can be used to wrap one or more
asynchronous streams. All streams wrapped by a given Valve
(or its clones) will be
interrupted when [Trigger::close
] is called on the valve’s associated handle.Valved
is wrapper around a Stream
that enables the stream to be turned off remotely to
initiate a graceful shutdown. When a new Valved
is created with Valved::new
, a handle to
that Valved
is also produced; when [Trigger::close
] is called on that handle, the
wrapped stream will immediately yield None
to indicate that it has completed.Traits
Stream
extension trait provides a take_until
method that terminates the stream once
the given future resolves.