tokio_zmq/prelude.rs
1/*
2 * This file is part of Tokio ZMQ.
3 *
4 * Copyright © 2018 Riley Trautman
5 *
6 * Tokio ZMQ is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * Tokio ZMQ is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with Tokio ZMQ. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20//! Provide useful types and traits for working with Tokio ZMQ.
21
22use std::time::Duration;
23
24use futures::{Future, Stream};
25
26pub use async_zmq_types::{
27 ControlHandler, Controllable, EndHandler, HasBuilder, IntoInnerSocket, SinkSocket,
28 SinkStreamSocket, StreamSocket, WithEndHandler,
29};
30
31use crate::{async_types::TimeoutStream, error::Error};
32
33/* ----------------------------------TYPES----------------------------------- */
34
35/* ----------------------------------TRAITS---------------------------------- */
36
37/// This trait allows adding a timeout to any stream with Error = Error.
38pub trait WithTimeout: Stream<Error = Error> + Sized {
39 /// Add a timeout to a given stream.
40 ///
41 /// ### Example, using a Pull wrapper type
42 /// ```rust
43 /// extern crate futures;
44 /// extern crate tokio_zmq;
45 /// extern crate zmq;
46 ///
47 /// use std::{sync::Arc, time::Duration};
48 ///
49 /// use futures::{Future, Stream};
50 /// use tokio_zmq::{prelude::*, Socket, Pull, Multipart};
51 ///
52 /// fn main() {
53 /// let ctx = Arc::new(zmq::Context::new());
54 /// let fut = Pull::builder(ctx)
55 /// .bind("tcp://*:5574")
56 /// .build()
57 /// .and_then(|pull| {
58 /// // Receive a Timeout after 30 seconds if the stream hasn't produced a value
59 /// pull.stream()
60 /// .timeout(Duration::from_secs(30))
61 /// .for_each(|_| Ok(()))
62 /// });
63 ///
64 /// // tokio::run(fut.map(|_| ()).or_else(|e| {
65 /// // println!("Error: {}", e);
66 /// // Ok(())
67 /// // }));
68 /// }
69 /// ```
70 fn timeout(self, duration: Duration) -> TimeoutStream<Self>;
71}
72
73pub trait Build<T>: Sized {
74 fn build(self) -> Box<dyn Future<Item = T, Error = Error> + Send>;
75}
76
77/* ----------------------------------impls----------------------------------- */
78
79impl<T> WithTimeout for T
80where
81 T: Stream<Error = Error>,
82{
83 fn timeout(self, duration: Duration) -> TimeoutStream<Self> {
84 TimeoutStream::new(self, duration)
85 }
86}