tokio_zmq/async_types/
future.rs

1/*
2 * This file is part of Tokio ZMQ.
3 *
4 * Copyright © 2018 Riley Trautman
5 *
6 * Tokio ZMQ is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * Tokio ZMQ is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with Tokio ZMQ.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20//! This module contains definitions for `MultipartRequest` and `MultipartResponse`, the two types that
21//! implement `futures::Future`.
22
23use std::{fmt, marker::PhantomData};
24
25use async_zmq_types::Multipart;
26use futures::{Async, Future};
27
28use crate::{
29    async_types::{
30        future_types::{request, response},
31    },
32    error::Error,
33    socket::Socket,
34};
35
36/// The `MultipartRequest` Future handles asynchronously sending data to a socket.
37///
38/// ### Example
39/// ```rust
40/// # extern crate zmq;
41/// # extern crate futures;
42/// # extern crate tokio_zmq;
43/// #
44/// # use std::sync::Arc;
45/// #
46/// # use futures::Future;
47/// # use tokio_zmq::{prelude::*, async_types::MultipartRequest, Error, Rep};
48/// #
49/// # fn main() {
50/// #     get_sock();
51/// # }
52/// # fn get_sock() -> impl Future<Item = (), Error = Error> {
53/// #     let ctx = Arc::new(zmq::Context::new());
54/// #     let rep = Rep::builder(ctx)
55/// #         .bind("tcp://*:5567")
56/// #         .build();
57/// #
58/// #     rep.and_then(|rep| {
59/// #       let msg = zmq::Message::from(&format!("Hey"));
60/// MultipartRequest::new(rep.socket(), msg.into()).and_then(|_: Rep| {
61///     // succesfull request
62/// #       Ok(())
63/// })
64/// # })
65/// # }
66/// ```
67pub struct MultipartRequest<T>
68where
69    T: From<Socket>,
70{
71    socks: Option<Socket>,
72    multipart: Multipart,
73    phantom: PhantomData<T>,
74}
75
76impl<T> MultipartRequest<T>
77where
78    T: From<Socket>,
79{
80    pub fn new(sock: Socket, multipart: Multipart) -> Self {
81        MultipartRequest {
82            socks: Some(sock),
83            multipart: multipart,
84            phantom: PhantomData,
85        }
86    }
87}
88
89impl<T> Future for MultipartRequest<T>
90where
91    T: From<Socket>,
92{
93    type Item = T;
94    type Error = Error;
95
96    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
97        let sock = self.socks.take().ok_or(Error::Reused)?;
98
99        match request::poll(&sock, &mut self.multipart, None)? {
100            Async::Ready(()) => Ok(Async::Ready(sock.into())),
101            Async::NotReady => {
102                self.socks = Some(sock);
103
104                Ok(Async::NotReady)
105            }
106        }
107    }
108}
109
110impl<T> fmt::Debug for MultipartRequest<T>
111where
112    T: From<Socket>,
113{
114    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
115        write!(f, "SendFuture")
116    }
117}
118
119impl<T> fmt::Display for MultipartRequest<T>
120where
121    T: From<Socket>,
122{
123    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
124        write!(f, "SendFuture")
125    }
126}
127
128/// The `MultipartResponse` Future handles asynchronously getting data from a socket.
129///
130/// ### Example
131/// ```rust
132/// # extern crate zmq;
133/// # extern crate futures;
134/// # extern crate tokio_zmq;
135/// #
136/// # use std::sync::Arc;
137/// #
138/// # use futures::Future;
139/// # use tokio_zmq::{prelude::*, async_types::MultipartResponse, Error, Multipart, Rep};
140/// #
141/// # fn main() {
142/// #     get_sock();
143/// # }
144/// # fn get_sock() -> impl Future<Item = Multipart, Error = Error> {
145/// #     let ctx = Arc::new(zmq::Context::new());
146/// #     let rep = Rep::builder(ctx)
147/// #         .bind("tcp://*:5567")
148/// #         .build();
149/// #     rep.and_then(|rep| {
150/// MultipartResponse::new(rep.socket()).and_then(|(multipart, _): (_, Rep)| {
151///     // handle multipart response
152///     # Ok(multipart)
153/// })
154/// # })
155/// # }
156/// ```
157pub struct MultipartResponse<T>
158where
159    T: From<Socket>,
160{
161    socks: Option<Socket>,
162    multipart: Multipart,
163    phantom: PhantomData<T>,
164}
165
166impl<T> MultipartResponse<T>
167where
168    T: From<Socket>,
169{
170    pub fn new(sock: Socket) -> Self {
171        MultipartResponse {
172            socks: Some(sock),
173            multipart: Multipart::new(),
174            phantom: PhantomData,
175        }
176    }
177}
178
179impl<T> Future for MultipartResponse<T>
180where
181    T: From<Socket>,
182{
183    type Item = (Multipart, T);
184    type Error = Error;
185
186    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
187        let sock = self.socks.take().ok_or(Error::Reused)?;
188
189        match response::poll(&sock, &mut self.multipart, None)? {
190            Async::Ready(multipart) => Ok(Async::Ready((
191                multipart,
192                sock.into(),
193            ))),
194            Async::NotReady => {
195                self.socks = Some(sock);
196
197                Ok(Async::NotReady)
198            }
199        }
200    }
201}
202
203impl<T> fmt::Debug for MultipartResponse<T>
204where
205    T: From<Socket>,
206{
207    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
208        write!(f, "RecvFuture")
209    }
210}
211
212impl<T> fmt::Display for MultipartResponse<T>
213where
214    T: From<Socket>,
215{
216    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
217        write!(f, "RecvFuture")
218    }
219}