tokio_zmq/async_types/future.rs
1/*
2 * This file is part of Tokio ZMQ.
3 *
4 * Copyright © 2018 Riley Trautman
5 *
6 * Tokio ZMQ is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * Tokio ZMQ is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with Tokio ZMQ. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20//! This module contains definitions for `MultipartRequest` and `MultipartResponse`, the two types that
21//! implement `futures::Future`.
22
23use std::{fmt, marker::PhantomData};
24
25use async_zmq_types::Multipart;
26use futures::{Async, Future};
27
28use crate::{
29 async_types::{
30 future_types::{request, response},
31 },
32 error::Error,
33 socket::Socket,
34};
35
36/// The `MultipartRequest` Future handles asynchronously sending data to a socket.
37///
38/// ### Example
39/// ```rust
40/// # extern crate zmq;
41/// # extern crate futures;
42/// # extern crate tokio_zmq;
43/// #
44/// # use std::sync::Arc;
45/// #
46/// # use futures::Future;
47/// # use tokio_zmq::{prelude::*, async_types::MultipartRequest, Error, Rep};
48/// #
49/// # fn main() {
50/// # get_sock();
51/// # }
52/// # fn get_sock() -> impl Future<Item = (), Error = Error> {
53/// # let ctx = Arc::new(zmq::Context::new());
54/// # let rep = Rep::builder(ctx)
55/// # .bind("tcp://*:5567")
56/// # .build();
57/// #
58/// # rep.and_then(|rep| {
59/// # let msg = zmq::Message::from(&format!("Hey"));
60/// MultipartRequest::new(rep.socket(), msg.into()).and_then(|_: Rep| {
61/// // succesfull request
62/// # Ok(())
63/// })
64/// # })
65/// # }
66/// ```
67pub struct MultipartRequest<T>
68where
69 T: From<Socket>,
70{
71 socks: Option<Socket>,
72 multipart: Multipart,
73 phantom: PhantomData<T>,
74}
75
76impl<T> MultipartRequest<T>
77where
78 T: From<Socket>,
79{
80 pub fn new(sock: Socket, multipart: Multipart) -> Self {
81 MultipartRequest {
82 socks: Some(sock),
83 multipart: multipart,
84 phantom: PhantomData,
85 }
86 }
87}
88
89impl<T> Future for MultipartRequest<T>
90where
91 T: From<Socket>,
92{
93 type Item = T;
94 type Error = Error;
95
96 fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
97 let sock = self.socks.take().ok_or(Error::Reused)?;
98
99 match request::poll(&sock, &mut self.multipart, None)? {
100 Async::Ready(()) => Ok(Async::Ready(sock.into())),
101 Async::NotReady => {
102 self.socks = Some(sock);
103
104 Ok(Async::NotReady)
105 }
106 }
107 }
108}
109
110impl<T> fmt::Debug for MultipartRequest<T>
111where
112 T: From<Socket>,
113{
114 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
115 write!(f, "SendFuture")
116 }
117}
118
119impl<T> fmt::Display for MultipartRequest<T>
120where
121 T: From<Socket>,
122{
123 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
124 write!(f, "SendFuture")
125 }
126}
127
128/// The `MultipartResponse` Future handles asynchronously getting data from a socket.
129///
130/// ### Example
131/// ```rust
132/// # extern crate zmq;
133/// # extern crate futures;
134/// # extern crate tokio_zmq;
135/// #
136/// # use std::sync::Arc;
137/// #
138/// # use futures::Future;
139/// # use tokio_zmq::{prelude::*, async_types::MultipartResponse, Error, Multipart, Rep};
140/// #
141/// # fn main() {
142/// # get_sock();
143/// # }
144/// # fn get_sock() -> impl Future<Item = Multipart, Error = Error> {
145/// # let ctx = Arc::new(zmq::Context::new());
146/// # let rep = Rep::builder(ctx)
147/// # .bind("tcp://*:5567")
148/// # .build();
149/// # rep.and_then(|rep| {
150/// MultipartResponse::new(rep.socket()).and_then(|(multipart, _): (_, Rep)| {
151/// // handle multipart response
152/// # Ok(multipart)
153/// })
154/// # })
155/// # }
156/// ```
157pub struct MultipartResponse<T>
158where
159 T: From<Socket>,
160{
161 socks: Option<Socket>,
162 multipart: Multipart,
163 phantom: PhantomData<T>,
164}
165
166impl<T> MultipartResponse<T>
167where
168 T: From<Socket>,
169{
170 pub fn new(sock: Socket) -> Self {
171 MultipartResponse {
172 socks: Some(sock),
173 multipart: Multipart::new(),
174 phantom: PhantomData,
175 }
176 }
177}
178
179impl<T> Future for MultipartResponse<T>
180where
181 T: From<Socket>,
182{
183 type Item = (Multipart, T);
184 type Error = Error;
185
186 fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
187 let sock = self.socks.take().ok_or(Error::Reused)?;
188
189 match response::poll(&sock, &mut self.multipart, None)? {
190 Async::Ready(multipart) => Ok(Async::Ready((
191 multipart,
192 sock.into(),
193 ))),
194 Async::NotReady => {
195 self.socks = Some(sock);
196
197 Ok(Async::NotReady)
198 }
199 }
200 }
201}
202
203impl<T> fmt::Debug for MultipartResponse<T>
204where
205 T: From<Socket>,
206{
207 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
208 write!(f, "RecvFuture")
209 }
210}
211
212impl<T> fmt::Display for MultipartResponse<T>
213where
214 T: From<Socket>,
215{
216 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
217 write!(f, "RecvFuture")
218 }
219}