[−][src]Trait futures_zmq::prelude::Controllable
This trait is implemented by all Streams with Item = Multipart and Error = Error. It provides the ability to control when the stream stops based on the content of another stream.
Required methods
fn controlled<H, S>(
    self,
    control_stream: S,
    handler: H
) -> ControlledStream<H, S, Self, Self::Error> where
    H: ControlHandler,
    S: Stream<Item = Multipart>,
    Self: Stream<Item = Multipart, Error = <S as Stream>::Error>,
Add a controller stream to a given stream. This allows the controller stream to decide when the controlled stream should stop.
Example, using a controlled Pull wrapper type and a controller Sub wrapper type
extern crate futures;
extern crate tokio_zmq;
extern crate zmq;

use std::sync::Arc;

use futures::{Future, Stream};
use tokio_zmq::{prelude::*, Pull, Sub, Multipart};

struct End;

impl ControlHandler for End {
    fn should_stop(&mut self, _: Multipart) -> bool {
        true
    }
}

fn main() {
    let ctx = Arc::new(zmq::Context::new());
    let init_pull = Pull::builder(Arc::clone(&ctx))
        .bind("tcp://*:5572")
        .build();
    let init_sub = Sub::builder(ctx)
        .bind("tcp://*:5573")
        .filter(b"")
        .build();

    let fut = init_pull
        .join(init_sub)
        .and_then(|(pull, sub)| {
            pull.stream()
                .controlled(sub.stream(), End)
                .for_each(|_| Ok(()))
        });

    // tokio::run(fut.map(|_| ()).or_else(|e| {
    //     println!("Error: {}", e);
    //     Ok(())
    // }));
}
Implementors
impl<T> Controllable for T where
    T: Stream<Item = Multipart>,
[src]
fn controlled<H, S>(
    self,
    control_stream: S,
    handler: H
) -> ControlledStream<H, S, T, <T as Stream>::Error> where
    H: ControlHandler,
    S: Stream<Item = Multipart, Error = <T as Stream>::Error>,
[src]