tokio_zmq/async_types/
stream.rs1use std::{
21 fmt,
22 marker::PhantomData,
23 time::{Duration, Instant},
24};
25
26use async_zmq_types::{IntoSocket, Multipart};
27use futures::{future::Either, Async, Future, Stream};
28use tokio_timer::Delay;
29
30pub use async_zmq_types::{ControlledStream, EndingStream};
31
32use crate::{
33 async_types::stream_type::StreamType,
34 error::Error,
35 socket::Socket,
36};
37
38pub struct MultipartStream<T>
71where
72 T: From<Socket>,
73{
74 sock: Socket,
75 inner: StreamType,
76 phantom: PhantomData<T>,
77}
78
79impl<T> MultipartStream<T>
80where
81 T: From<Socket>,
82{
83 pub fn new(sock: Socket) -> Self {
84 MultipartStream {
85 sock,
86 inner: StreamType::new(),
87 phantom: PhantomData,
88 }
89 }
90}
91
92impl<T> IntoSocket<T, Socket> for MultipartStream<T>
93where
94 T: From<Socket>,
95{
96 fn into_socket(self) -> T {
97 T::from(self.sock)
98 }
99}
100
101impl<T> Stream for MultipartStream<T>
102where
103 T: From<Socket>,
104{
105 type Item = Multipart;
106 type Error = Error;
107
108 fn poll(&mut self) -> Result<Async<Option<Multipart>>, Self::Error> {
109 self.inner.poll(&self.sock, None)
110 }
111}
112
113impl<T> fmt::Debug for MultipartStream<T>
114where
115 T: From<Socket>,
116{
117 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
118 write!(f, "MultipartStream")
119 }
120}
121
122impl<T> fmt::Display for MultipartStream<T>
123where
124 T: From<Socket>,
125{
126 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
127 write!(f, "MultipartStream")
128 }
129}
130
131pub struct Timeout;
133
134pub struct TimeoutStream<S>
138where
139 S: Stream,
140{
141 stream: S,
142 duration: Duration,
143 timeout: Delay,
144}
145
146impl<S> TimeoutStream<S>
147where
148 S: Stream<Error = Error>,
149{
150 pub fn new(stream: S, duration: Duration) -> Self {
152 let timeout = Delay::new(Instant::now() + duration);
153
154 TimeoutStream {
155 stream,
156 duration,
157 timeout,
158 }
159 }
160}
161
162impl<S> Stream for TimeoutStream<S>
163where
164 S: Stream<Error = Error>,
165{
166 type Item = Either<S::Item, Timeout>;
167 type Error = Error;
168
169 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
170 if let Async::Ready(_) = self.timeout.poll()? {
171 self.timeout = Delay::new(Instant::now() + self.duration);
172
173 return Ok(Async::Ready(Some(Either::B(Timeout))));
174 }
175
176 let res = match self.stream.poll()? {
177 Async::Ready(Some(item)) => Async::Ready(Some(Either::A(item))),
178 Async::Ready(None) => Async::Ready(None),
179 Async::NotReady => Async::NotReady,
180 };
181
182 Ok(res)
183 }
184}