tokio_zmq/
prelude.rs

1/*
2 * This file is part of Tokio ZMQ.
3 *
4 * Copyright © 2018 Riley Trautman
5 *
6 * Tokio ZMQ is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * Tokio ZMQ is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with Tokio ZMQ.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20//! Provide useful types and traits for working with Tokio ZMQ.
21
22use std::time::Duration;
23
24use futures::{Future, Stream};
25
26pub use async_zmq_types::{
27    ControlHandler, Controllable, EndHandler, HasBuilder, IntoInnerSocket, SinkSocket,
28    SinkStreamSocket, StreamSocket, WithEndHandler,
29};
30
31use crate::{async_types::TimeoutStream, error::Error};
32
33/* ----------------------------------TYPES----------------------------------- */
34
35/* ----------------------------------TRAITS---------------------------------- */
36
37/// This trait allows adding a timeout to any stream with Error = Error.
38pub trait WithTimeout: Stream<Error = Error> + Sized {
39    /// Add a timeout to a given stream.
40    ///
41    /// ### Example, using a Pull wrapper type
42    /// ```rust
43    /// extern crate futures;
44    /// extern crate tokio_zmq;
45    /// extern crate zmq;
46    ///
47    /// use std::{sync::Arc, time::Duration};
48    ///
49    /// use futures::{Future, Stream};
50    /// use tokio_zmq::{prelude::*, Socket, Pull, Multipart};
51    ///
52    /// fn main() {
53    ///     let ctx = Arc::new(zmq::Context::new());
54    ///     let fut = Pull::builder(ctx)
55    ///         .bind("tcp://*:5574")
56    ///         .build()
57    ///         .and_then(|pull| {
58    ///             // Receive a Timeout after 30 seconds if the stream hasn't produced a value
59    ///             pull.stream()
60    ///                 .timeout(Duration::from_secs(30))
61    ///                 .for_each(|_| Ok(()))
62    ///         });
63    ///
64    ///     // tokio::run(fut.map(|_| ()).or_else(|e| {
65    ///     //     println!("Error: {}", e);
66    ///     //     Ok(())
67    ///     // }));
68    /// }
69    /// ```
70    fn timeout(self, duration: Duration) -> TimeoutStream<Self>;
71}
72
73pub trait Build<T>: Sized {
74    fn build(self) -> Box<dyn Future<Item = T, Error = Error> + Send>;
75}
76
77/* ----------------------------------impls----------------------------------- */
78
79impl<T> WithTimeout for T
80where
81    T: Stream<Error = Error>,
82{
83    fn timeout(self, duration: Duration) -> TimeoutStream<Self> {
84        TimeoutStream::new(self, duration)
85    }
86}