Trait async_zmq_types::Controllable
source · pub trait Controllable: Stream<Item = Multipart> + Sized {
fn controlled<H, S>(
self,
control_stream: S,
handler: H
) -> ControlledStream<H, S, Self, Self::Error>
where
H: ControlHandler,
S: Stream<Item = Multipart>,
Self: Stream<Item = Multipart, Error = S::Error>;
}
Expand description
This trait is implemented by all Streams with Item = Multipart and Error = Error, it provides the ability to control when the stream stops based on the content of another stream.
Required Methods
sourcefn controlled<H, S>(
self,
control_stream: S,
handler: H
) -> ControlledStream<H, S, Self, Self::Error>where
H: ControlHandler,
S: Stream<Item = Multipart>,
Self: Stream<Item = Multipart, Error = S::Error>,
fn controlled<H, S>(
self,
control_stream: S,
handler: H
) -> ControlledStream<H, S, Self, Self::Error>where
H: ControlHandler,
S: Stream<Item = Multipart>,
Self: Stream<Item = Multipart, Error = S::Error>,
Add a controller stream to a given stream. This allows the controller stream to decide when the controlled stream should stop.
Example, using a controlled Pull wrapper type and a controller Sub wrapper type
extern crate futures;
extern crate tokio_zmq;
extern crate zmq;
use std::sync::Arc;
use futures::{Future, Stream};
use tokio_zmq::{prelude::*, Pull, Sub, Multipart};
struct End;
impl ControlHandler for End {
fn should_stop(&mut self, _: Multipart) -> bool {
true
}
}
fn main() {
let ctx = Arc::new(zmq::Context::new());
let init_pull = Pull::builder(Arc::clone(&ctx))
.bind("tcp://*:5572")
.build();
let init_sub = Sub::builder(ctx)
.bind("tcp://*:5573")
.filter(b"")
.build();
let fut = init_pull
.join(init_sub)
.and_then(|(pull, sub)| {
pull.stream()
.controlled(sub.stream(), End)
.for_each(|_| Ok(()))
});
// tokio::run(fut.map(|_| ()).or_else(|e| {
// println!("Error: {}", e);
// Ok(())
// }));
}