async_zmq_types/
lib.rs

/*
 * This file is part of Async ZMQ Types.
 *
 * Copyright © 2019 Riley Trautman
 *
 * Async ZMQ Types is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Async ZMQ Types is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Async ZMQ Types.  If not, see <http://www.gnu.org/licenses/>.
 */
19
//! Provide useful types and traits for working with ZMQ asynchronously.
21
22use std::sync::Arc;
23
24use futures::{Future, Sink, Stream};
25
26mod config;
27mod message;
28mod stream;
29
30pub use crate::{
31    config::{PairConfig, SockConfig, SocketBuilder, SubConfig},
32    message::Multipart,
33    stream::{ControlledStream, EndingStream},
34};
35
36/* ----------------------------------TYPES----------------------------------- */
37
38/* ----------------------------------TRAITS---------------------------------- */
39
/// Conversion of a value back into a socket of type `T`.
///
/// `U` is a type that `T` can be built from (`T: From<U>`). In this crate the trait is required
/// of the `Stream`, `Sink` and `SinkStream` associated types of `InnerSocket`, with `U` being
/// the inner socket itself.
pub trait IntoSocket<T, U>: Sized
where
    T: From<U>,
{
    /// Consume `self` and return the socket `T`.
    fn into_socket(self) -> T;
}
46
47/// Define all actions possible on a socket
48///
49/// This should be generic enough to implement over any executor. On Tokio, this might consist of
50/// a Socket with an EventedFd, on Futures, it might just be a Socket.
51pub trait InnerSocket<T>: Sized
52where
53    T: IntoInnerSocket + From<Self>,
54{
55    /// The future that sends a multipart to a ZMQ socket
56    type Request: Future<Item = T>;
57
58    /// The future that receives a multipart from a ZMQ socket
59    type Response: Future<Item = (Multipart, T)>;
60
61    /// A Stream of multiparts received from a ZMQ socket
62    type Stream: Stream<Item = Multipart> + IntoSocket<T, Self>;
63
64    /// A Sink that sends multiparts to a ZMQ socket
65    type Sink: Sink<SinkItem = Multipart> + IntoSocket<T, Self>;
66
67    /// A Sink and Stream that sends and receives multiparts from a ZMQ socket
68    type SinkStream: Stream<Item = Multipart> + Sink<SinkItem = Multipart> + IntoSocket<T, Self>;
69
70    fn send(self, multipart: Multipart) -> Self::Request;
71
72    fn recv(self) -> Self::Response;
73
74    fn stream(self) -> Self::Stream;
75
76    fn sink(self, buffer_size: usize) -> Self::Sink;
77
78    fn sink_stream(self, buffer_size: usize) -> Self::SinkStream;
79}
80
81/// The `IntoInnerSocket` trait is implemented for all wrapper types. This makes implementing other traits a
82/// matter of saying a given type implements them.
83pub trait IntoInnerSocket: Sized
84where
85    Self: From<<Self as IntoInnerSocket>::Socket>,
86{
87    type Socket: InnerSocket<Self>;
88
89    /// Any type implementing `IntoInnerSocket` must have a way of returning an InnerSocket.
90    fn socket(self) -> Self::Socket;
91
92    fn kind() -> zmq::SocketType;
93}
94
95/// The `ControlHandler` trait is used to impose stopping rules for streams that otherwise would
96/// continue to create multiparts.
97pub trait ControlHandler {
98    /// `should_stop` determines whether or not a `ControlledStream` should stop producing values.
99    ///
100    /// It accepts a Multipart as input. This Multipart comes from the ControlledStream's
101    /// associated control MultipartStream. If you want to have a socket that stops based on the
102    /// content of a message it receives, see the `EndHandler` trait.
103    fn should_stop(&mut self, multipart: Multipart) -> bool;
104}
105
106/// The `EndHandler` trait is used to impose stopping rules for streams that otherwise would
107/// continue to create multiparts.
108pub trait EndHandler {
109    /// `should_stop` determines whether or not a `StreamSocket` should stop producing values.
110    ///
111    /// This method should be used if the stop signal sent to a given socket will be in-line with
112    /// the rest of the messages that socket receives. If you want to have a socket controlled by
113    /// another socket, see the `ControlHandler` trait.
114    fn should_stop(&mut self, multipart: &Multipart) -> bool;
115}
116
117/// This trait provides the basic Stream support for ZeroMQ Sockets. It depends on `IntoInnerSocket`, but
118/// provides implementations for `sink` and `recv`.
119pub trait StreamSocket: IntoInnerSocket {
120    /// Receive a single multipart message from the socket.
121    ///
122    /// ### Example, using the Rep wrapper type
123    /// ```rust
124    /// extern crate futures;
125    /// extern crate tokio;
126    /// extern crate tokio_zmq;
127    /// extern crate zmq;
128    ///
129    /// use std::sync::Arc;
130    ///
131    /// use futures::Future;
132    /// use tokio_zmq::{prelude::*, async_types::MultipartStream, Error, Multipart, Rep};
133    ///
134    /// fn main() {
135    ///     let context = Arc::new(zmq::Context::new());
136    ///
137    ///     let fut = Rep::builder(context)
138    ///         .connect("tcp://localhost:5568")
139    ///         .build()
140    ///         .and_then(|rep| {
141    ///             rep.recv().and_then(|(multipart, _)| {
142    ///                 for msg in &multipart {
143    ///                     if let Some(msg) = msg.as_str() {
144    ///                         println!("Message: {}", msg);
145    ///                     }
146    ///                 }
147    ///                 Ok(multipart)
148    ///             })
149    ///         });
150    ///
151    ///     // tokio::run(fut.map(|_| ()).or_else(|e| {
152    ///     //     println!("Error: {}", e);
153    ///     //     Ok(())
154    ///     // }));
155    ///     # let _ = fut;
156    /// }
157    /// ```
158    fn recv(self) -> <<Self as IntoInnerSocket>::Socket as InnerSocket<Self>>::Response {
159        self.socket().recv()
160    }
161
162    /// Receive a stream of multipart messages from the socket.
163    ///
164    /// ### Example, using a Sub wrapper type
165    /// ```rust
166    /// extern crate zmq;
167    /// extern crate futures;
168    /// extern crate tokio;
169    /// extern crate tokio_zmq;
170    ///
171    /// use std::sync::Arc;
172    ///
173    /// use futures::{Future, Stream};
174    /// use tokio_zmq::{prelude::*, async_types::MultipartStream, Error, Multipart, Sub};
175    ///
176    /// fn main() {
177    ///     let context = Arc::new(zmq::Context::new());
178    ///     let fut = Sub::builder(context)
179    ///         .connect("tcp://localhost:5569")
180    ///         .filter(b"")
181    ///         .build()
182    ///         .and_then(|sub| {
183    ///             sub.stream().for_each(|multipart| {
184    ///                 for msg in multipart {
185    ///                     if let Some(msg) = msg.as_str() {
186    ///                         println!("Message: {}", msg);
187    ///                     }
188    ///                 }
189    ///                 Ok(())
190    ///             })
191    ///         });
192    ///
193    ///     // tokio::run(fut.map(|_| ()).or_else(|e| {
194    ///     //     println!("Error: {}", e);
195    ///     //     Ok(())
196    ///     // }));
197    /// }
198    /// ```
199    fn stream(self) -> <<Self as IntoInnerSocket>::Socket as InnerSocket<Self>>::Stream {
200        self.socket().stream()
201    }
202}
203
204/// This trait provides the basic Sink support for ZeroMQ Sockets. It depends on `IntoInnerSocket` and
205/// provides the `send` and `sink` methods.
206pub trait SinkSocket: IntoInnerSocket {
207    /// Send a single multipart message to the socket.
208    ///
209    /// ### Example, using a Pub wrapper type
210    /// ```rust
211    /// extern crate zmq;
212    /// extern crate futures;
213    /// extern crate tokio;
214    /// extern crate tokio_zmq;
215    ///
216    /// use std::sync::Arc;
217    ///
218    /// use futures::Future;
219    /// use tokio_zmq::{prelude::*, async_types::MultipartStream, Error, Pub};
220    ///
221    /// fn main() {
222    ///     let context = Arc::new(zmq::Context::new());
223    ///     let msg = zmq::Message::from_slice(b"Hello");
224    ///     let fut = Pub::builder(context)
225    ///         .connect("tcp://localhost:5569")
226    ///         .build()
227    ///         .and_then(|zpub| zpub.send(msg.into()));
228    ///
229    ///     // tokio::run(fut.map(|_| ()).or_else(|e| {
230    ///     //     println!("Error: {}", e);
231    ///     //     Ok(())
232    ///     // }));
233    /// }
234    /// ```
235    fn send(
236        self,
237        multipart: Multipart,
238    ) -> <<Self as IntoInnerSocket>::Socket as InnerSocket<Self>>::Request {
239        self.socket().send(multipart)
240    }
241
242    /// Send a stream of multipart messages to the socket.
243    ///
244    /// It takes a buffer_size argument, which will determine how many `Multipart`s can be
245    /// submitted into the send queue before the sink applies backpressure.
246    ///
247    /// ### Example, using a Pub wrapper type
248    /// ```rust
249    /// extern crate zmq;
250    /// extern crate futures;
251    /// extern crate tokio;
252    /// extern crate tokio_zmq;
253    ///
254    /// use std::sync::Arc;
255    ///
256    /// use futures::{Future, Stream, stream::iter_ok};
257    /// use tokio_zmq::{prelude::*, async_types::MultipartStream, Error, Multipart, Pub};
258    ///
259    /// fn main() {
260    ///     let context = Arc::new(zmq::Context::new());
261    ///     let fut = Pub::builder(context)
262    ///         .connect("tcp://localhost:5570")
263    ///         .build()
264    ///         .and_then(|zpub| {
265    ///             iter_ok(0..5)
266    ///                 .map(|i| {
267    ///                     zmq::Message::from_slice(format!("i: {}", i).as_bytes()).into()
268    ///                 })
269    ///                 .forward(zpub.sink(25))
270    ///         });
271    ///
272    ///     // tokio::run(fut.map(|_| ()).or_else(|e| {
273    ///     //     println!("Error: {}", e);
274    ///     //     Ok(())
275    ///     // }));
276    /// }
277    /// ```
278    fn sink(
279        self,
280        buffer_size: usize,
281    ) -> <<Self as IntoInnerSocket>::Socket as InnerSocket<Self>>::Sink {
282        self.socket().sink(buffer_size)
283    }
284}
285
286/// This trait is provided for sockets that implement both Sync and Stream
287pub trait SinkStreamSocket: IntoInnerSocket {
288    /// Retrieve a structure that implements both Sync and Stream.
289    ///
290    /// It takes a buffer_size argument, which will determine how many `Multipart`s can be
291    /// submitted into the send queue before the sink applies backpressure.
292    ///
293    /// ### Example, using a Rep wrapper type
294    /// ```rust
295    /// extern crate futures;
296    /// extern crate tokio_zmq;
297    /// extern crate zmq;
298    ///
299    /// use std::sync::Arc;
300    ///
301    /// use futures::{Future, Stream};
302    /// use tokio_zmq::{prelude::*, Rep};
303    ///
304    /// fn main() {
305    ///     let ctx = Arc::new(zmq::Context::new());
306    ///     let fut = Rep::builder(ctx)
307    ///         .bind("tcp://*:5571")
308    ///         .build()
309    ///         .and_then(|rep| {
310    ///             let (sink, stream) = rep.sink_stream(25).split();
311    ///
312    ///             stream.forward(sink)
313    ///         });
314    ///
315    ///     // tokio::run(fut.map(|_| ()).or_else(|e| {
316    ///     //     println!("Error: {}", e);
317    ///     //     Ok(())
318    ///     // }));
319    /// }
320    /// ```
321    fn sink_stream(
322        self,
323        buffer_size: usize,
324    ) -> <<Self as IntoInnerSocket>::Socket as InnerSocket<Self>>::SinkStream;
325}
326
327/// This trait is provided to allow for ending a stream based on a Multipart message it receives.
328pub trait WithEndHandler: Stream<Item = Multipart> + Sized {
329    /// Add an EndHandler to a stream.
330    ///
331    /// ### Example, using a Sub wrapper type
332    /// ```rust
333    /// extern crate futures;
334    /// extern crate tokio_zmq;
335    /// extern crate zmq;
336    ///
337    /// use std::sync::Arc;
338    ///
339    /// use futures::{Future, Stream};
340    /// use tokio_zmq::{prelude::*, Sub, Multipart};
341    ///
342    /// struct End(u32);
343    ///
344    /// impl EndHandler for End {
345    ///     fn should_stop(&mut self, multipart: &Multipart) -> bool {
346    ///         self.0 += 1;
347    ///
348    ///         self.0 > 30
349    ///     }
350    /// }
351    ///
352    /// fn main() {
353    ///     let ctx = Arc::new(zmq::Context::new());
354    ///     let fut = Sub::builder(ctx)
355    ///         .bind("tcp://*:5571")
356    ///         .filter(b"")
357    ///         .build()
358    ///         .and_then(|sub| {
359    ///             sub.stream()
360    ///                 .with_end_handler(End(0))
361    ///                 .for_each(|_| Ok(()))
362    ///         });
363    ///
364    ///     // tokio::run(fut.map(|_| ()).or_else(|e| {
365    ///     //     println!("Error: {}", e);
366    ///     //     Ok(())
367    ///     // }));
368    /// }
369    /// ```
370    fn with_end_handler<E>(self, end_handler: E) -> EndingStream<E, Self, Self::Error>
371    where
372        E: EndHandler;
373}
374
375/// This trait is implemented by all Streams with Item = Multipart and Error = Error, it provides
376/// the ability to control when the stream stops based on the content of another stream.
377pub trait Controllable: Stream<Item = Multipart> + Sized {
378    /// Add a controller stream to a given stream. This allows the controller stream to decide when
379    /// the controlled stream should stop.
380    ///
381    /// ### Example, using a controlled Pull wrapper type and a controller Sub wrapper type
382    /// ```rust
383    /// extern crate futures;
384    /// extern crate tokio_zmq;
385    /// extern crate zmq;
386    ///
387    /// use std::sync::Arc;
388    ///
389    /// use futures::{Future, Stream};
390    /// use tokio_zmq::{prelude::*, Pull, Sub, Multipart};
391    ///
392    /// struct End;
393    ///
394    /// impl ControlHandler for End {
395    ///     fn should_stop(&mut self, _: Multipart) -> bool {
396    ///         true
397    ///     }
398    /// }
399    ///
400    /// fn main() {
401    ///     let ctx = Arc::new(zmq::Context::new());
402    ///     let init_pull = Pull::builder(Arc::clone(&ctx))
403    ///         .bind("tcp://*:5572")
404    ///         .build();
405    ///
406    ///     let init_sub = Sub::builder(ctx)
407    ///         .bind("tcp://*:5573")
408    ///         .filter(b"")
409    ///         .build();
410    ///
411    ///     let fut = init_pull
412    ///         .join(init_sub)
413    ///         .and_then(|(pull, sub)| {
414    ///             pull.stream()
415    ///                 .controlled(sub.stream(), End)
416    ///                 .for_each(|_| Ok(()))
417    ///         });
418    ///
419    ///     // tokio::run(fut.map(|_| ()).or_else(|e| {
420    ///     //     println!("Error: {}", e);
421    ///     //     Ok(())
422    ///     // }));
423    /// }
424    /// ```
425    fn controlled<H, S>(
426        self,
427        control_stream: S,
428        handler: H,
429    ) -> ControlledStream<H, S, Self, Self::Error>
430    where
431        H: ControlHandler,
432        S: Stream<Item = Multipart>,
433        Self: Stream<Item = Multipart, Error = S::Error>;
434}
435
// The four traits below declare no methods; they only tag types.
// NOTE(review): presumably they are used as type-level markers by the builder types in the
// `config` module — confirm against that module.

/// Marker trait with no methods (counterpart of `Pair`).
pub trait UnPair {}
/// Marker trait with no methods (counterpart of `UnPair`).
pub trait Pair {}
/// Marker trait with no methods (counterpart of `UnSub`).
pub trait Sub {}
/// Marker trait with no methods (counterpart of `Sub`).
pub trait UnSub {}
440
441pub trait Build<T, E> {
442    type Result: Future<Item = T, Error = E>;
443
444    fn build(self) -> Self::Result;
445}
446
447/// This trait is implemented by all socket types to allow custom builders to be created
448pub trait HasBuilder: IntoInnerSocket {
449    fn builder(ctx: Arc<zmq::Context>) -> SocketBuilder<'static, Self>
450    where
451        Self: Sized,
452    {
453        SocketBuilder::new(ctx)
454    }
455}
456
457/* ----------------------------------impls----------------------------------- */
458
459impl<T> HasBuilder for T where T: IntoInnerSocket {}
460
461impl<T> SinkStreamSocket for T
462where
463    T: StreamSocket + SinkSocket,
464{
465    fn sink_stream(
466        self,
467        buffer_size: usize,
468    ) -> <<Self as IntoInnerSocket>::Socket as InnerSocket<Self>>::SinkStream {
469        self.socket().sink_stream(buffer_size)
470    }
471}
472
473impl<T> WithEndHandler for T
474where
475    T: Stream<Item = Multipart>,
476{
477    fn with_end_handler<E>(self, end_handler: E) -> EndingStream<E, Self, Self::Error>
478    where
479        E: EndHandler,
480    {
481        EndingStream::new(self, end_handler)
482    }
483}
484
485impl<T> Controllable for T
486where
487    T: Stream<Item = Multipart>,
488{
489    fn controlled<H, S>(
490        self,
491        control_stream: S,
492        handler: H,
493    ) -> ControlledStream<H, S, Self, Self::Error>
494    where
495        H: ControlHandler,
496        S: Stream<Item = Multipart, Error = T::Error>,
497    {
498        ControlledStream::new(self, control_stream, handler)
499    }
500}