tokio_zmq/async_types/
stream.rs

1/*
2 * This file is part of Tokio ZMQ.
3 *
4 * Copyright © 2018 Riley Trautman
5 *
6 * Tokio ZMQ is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * Tokio ZMQ is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with Tokio ZMQ.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20use std::{
21    fmt,
22    marker::PhantomData,
23    time::{Duration, Instant},
24};
25
26use async_zmq_types::{IntoSocket, Multipart};
27use futures::{future::Either, Async, Future, Stream};
28use tokio_timer::Delay;
29
30pub use async_zmq_types::{ControlledStream, EndingStream};
31
32use crate::{
33    async_types::stream_type::StreamType,
34    error::Error,
35    socket::Socket,
36};
37
38/// The `MultipartStream` Sink handles receiving streams of data from ZeroMQ Sockets.
39///
40/// You shouldn't ever need to manually create one. Here's how to get one from a 'raw' `Socket`'
41/// type.
42///
43/// ### Example
44/// ```rust
45/// extern crate zmq;
46/// extern crate futures;
47/// extern crate tokio_zmq;
48///
49/// use std::sync::Arc;
50///
51/// use futures::{Future, Stream};
52/// use tokio_zmq::{async_types::MultipartStream, prelude::*, Error, Multipart, Socket, Sub};
53///
54/// fn main() {
55///     let context = Arc::new(zmq::Context::new());
56///     let fut = Sub::builder(context)
57///         .connect("tcp://localhost:5568")
58///         .filter(b"")
59///         .build()
60///         .and_then(|sub| {
61///             sub.stream()
62///                 .and_then(|multipart| {
63///                     // handle multipart
64///                     Ok(multipart)
65///                 })
66///                 .for_each(|_| Ok(()))
67///         });
68/// }
69/// ```
70pub struct MultipartStream<T>
71where
72    T: From<Socket>,
73{
74    sock: Socket,
75    inner: StreamType,
76    phantom: PhantomData<T>,
77}
78
79impl<T> MultipartStream<T>
80where
81    T: From<Socket>,
82{
83    pub fn new(sock: Socket) -> Self {
84        MultipartStream {
85            sock,
86            inner: StreamType::new(),
87            phantom: PhantomData,
88        }
89    }
90}
91
92impl<T> IntoSocket<T, Socket> for MultipartStream<T>
93where
94    T: From<Socket>,
95{
96    fn into_socket(self) -> T {
97        T::from(self.sock)
98    }
99}
100
101impl<T> Stream for MultipartStream<T>
102where
103    T: From<Socket>,
104{
105    type Item = Multipart;
106    type Error = Error;
107
108    fn poll(&mut self) -> Result<Async<Option<Multipart>>, Self::Error> {
109        self.inner.poll(&self.sock, None)
110    }
111}
112
113impl<T> fmt::Debug for MultipartStream<T>
114where
115    T: From<Socket>,
116{
117    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
118        write!(f, "MultipartStream")
119    }
120}
121
122impl<T> fmt::Display for MultipartStream<T>
123where
124    T: From<Socket>,
125{
126    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
127        write!(f, "MultipartStream")
128    }
129}
130
131/// An empty type to represent a timeout event
132pub struct Timeout;
133
134/// A stream that provides either an `Item` or a `Timeout`
135///
136/// This is different from `tokio_timer::TimeoutStream<T>`, since that stream errors on timeout.
137pub struct TimeoutStream<S>
138where
139    S: Stream,
140{
141    stream: S,
142    duration: Duration,
143    timeout: Delay,
144}
145
146impl<S> TimeoutStream<S>
147where
148    S: Stream<Error = Error>,
149{
150    /// Add a timeout to a stream
151    pub fn new(stream: S, duration: Duration) -> Self {
152        let timeout = Delay::new(Instant::now() + duration);
153
154        TimeoutStream {
155            stream,
156            duration,
157            timeout,
158        }
159    }
160}
161
162impl<S> Stream for TimeoutStream<S>
163where
164    S: Stream<Error = Error>,
165{
166    type Item = Either<S::Item, Timeout>;
167    type Error = Error;
168
169    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
170        if let Async::Ready(_) = self.timeout.poll()? {
171            self.timeout = Delay::new(Instant::now() + self.duration);
172
173            return Ok(Async::Ready(Some(Either::B(Timeout))));
174        }
175
176        let res = match self.stream.poll()? {
177            Async::Ready(Some(item)) => Async::Ready(Some(Either::A(item))),
178            Async::Ready(None) => Async::Ready(None),
179            Async::NotReady => Async::NotReady,
180        };
181
182        Ok(res)
183    }
184}