async_zmq_types/lib.rs
1/*
2 * This file is part of Async ZMQ Types.
3 *
4 * Copyright © 2019 Riley Trautman
5 *
6 * Async ZMQ Types is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * Async ZMQ Types is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with Async ZMQ Types. If not, see <http://www.gnu.org/licenses/>.
18 */
19
//! Provides useful types and traits for working with ZMQ asynchronously.
21
22use std::sync::Arc;
23
24use futures::{Future, Sink, Stream};
25
26mod config;
27mod message;
28mod stream;
29
30pub use crate::{
31 config::{PairConfig, SockConfig, SocketBuilder, SubConfig},
32 message::Multipart,
33 stream::{ControlledStream, EndingStream},
34};
35
36/* ----------------------------------TYPES----------------------------------- */
37
38/* ----------------------------------TRAITS---------------------------------- */
39
/// Conversion of a value into a socket of type `T`.
///
/// `U` is a type that `T` can be constructed from through `From`. In this file the trait is
/// required of the `Stream`, `Sink` and `SinkStream` associated types of `InnerSocket`, with `T`
/// being the wrapper type and `U` the inner socket itself.
pub trait IntoSocket<T: From<U>, U>: Sized {
    /// Consume `self` and hand back the socket `T`.
    fn into_socket(self) -> T;
}
46
47/// Define all actions possible on a socket
48///
49/// This should be generic enough to implement over any executor. On Tokio, this might consist of
50/// a Socket with an EventedFd, on Futures, it might just be a Socket.
51pub trait InnerSocket<T>: Sized
52where
53 T: IntoInnerSocket + From<Self>,
54{
55 /// The future that sends a multipart to a ZMQ socket
56 type Request: Future<Item = T>;
57
58 /// The future that receives a multipart from a ZMQ socket
59 type Response: Future<Item = (Multipart, T)>;
60
61 /// A Stream of multiparts received from a ZMQ socket
62 type Stream: Stream<Item = Multipart> + IntoSocket<T, Self>;
63
64 /// A Sink that sends multiparts to a ZMQ socket
65 type Sink: Sink<SinkItem = Multipart> + IntoSocket<T, Self>;
66
67 /// A Sink and Stream that sends and receives multiparts from a ZMQ socket
68 type SinkStream: Stream<Item = Multipart> + Sink<SinkItem = Multipart> + IntoSocket<T, Self>;
69
70 fn send(self, multipart: Multipart) -> Self::Request;
71
72 fn recv(self) -> Self::Response;
73
74 fn stream(self) -> Self::Stream;
75
76 fn sink(self, buffer_size: usize) -> Self::Sink;
77
78 fn sink_stream(self, buffer_size: usize) -> Self::SinkStream;
79}
80
81/// The `IntoInnerSocket` trait is implemented for all wrapper types. This makes implementing other traits a
82/// matter of saying a given type implements them.
83pub trait IntoInnerSocket: Sized
84where
85 Self: From<<Self as IntoInnerSocket>::Socket>,
86{
87 type Socket: InnerSocket<Self>;
88
89 /// Any type implementing `IntoInnerSocket` must have a way of returning an InnerSocket.
90 fn socket(self) -> Self::Socket;
91
92 fn kind() -> zmq::SocketType;
93}
94
95/// The `ControlHandler` trait is used to impose stopping rules for streams that otherwise would
96/// continue to create multiparts.
97pub trait ControlHandler {
98 /// `should_stop` determines whether or not a `ControlledStream` should stop producing values.
99 ///
100 /// It accepts a Multipart as input. This Multipart comes from the ControlledStream's
101 /// associated control MultipartStream. If you want to have a socket that stops based on the
102 /// content of a message it receives, see the `EndHandler` trait.
103 fn should_stop(&mut self, multipart: Multipart) -> bool;
104}
105
106/// The `EndHandler` trait is used to impose stopping rules for streams that otherwise would
107/// continue to create multiparts.
108pub trait EndHandler {
109 /// `should_stop` determines whether or not a `StreamSocket` should stop producing values.
110 ///
111 /// This method should be used if the stop signal sent to a given socket will be in-line with
112 /// the rest of the messages that socket receives. If you want to have a socket controlled by
113 /// another socket, see the `ControlHandler` trait.
114 fn should_stop(&mut self, multipart: &Multipart) -> bool;
115}
116
117/// This trait provides the basic Stream support for ZeroMQ Sockets. It depends on `IntoInnerSocket`, but
118/// provides implementations for `sink` and `recv`.
119pub trait StreamSocket: IntoInnerSocket {
120 /// Receive a single multipart message from the socket.
121 ///
122 /// ### Example, using the Rep wrapper type
123 /// ```rust
124 /// extern crate futures;
125 /// extern crate tokio;
126 /// extern crate tokio_zmq;
127 /// extern crate zmq;
128 ///
129 /// use std::sync::Arc;
130 ///
131 /// use futures::Future;
132 /// use tokio_zmq::{prelude::*, async_types::MultipartStream, Error, Multipart, Rep};
133 ///
134 /// fn main() {
135 /// let context = Arc::new(zmq::Context::new());
136 ///
137 /// let fut = Rep::builder(context)
138 /// .connect("tcp://localhost:5568")
139 /// .build()
140 /// .and_then(|rep| {
141 /// rep.recv().and_then(|(multipart, _)| {
142 /// for msg in &multipart {
143 /// if let Some(msg) = msg.as_str() {
144 /// println!("Message: {}", msg);
145 /// }
146 /// }
147 /// Ok(multipart)
148 /// })
149 /// });
150 ///
151 /// // tokio::run(fut.map(|_| ()).or_else(|e| {
152 /// // println!("Error: {}", e);
153 /// // Ok(())
154 /// // }));
155 /// # let _ = fut;
156 /// }
157 /// ```
158 fn recv(self) -> <<Self as IntoInnerSocket>::Socket as InnerSocket<Self>>::Response {
159 self.socket().recv()
160 }
161
162 /// Receive a stream of multipart messages from the socket.
163 ///
164 /// ### Example, using a Sub wrapper type
165 /// ```rust
166 /// extern crate zmq;
167 /// extern crate futures;
168 /// extern crate tokio;
169 /// extern crate tokio_zmq;
170 ///
171 /// use std::sync::Arc;
172 ///
173 /// use futures::{Future, Stream};
174 /// use tokio_zmq::{prelude::*, async_types::MultipartStream, Error, Multipart, Sub};
175 ///
176 /// fn main() {
177 /// let context = Arc::new(zmq::Context::new());
178 /// let fut = Sub::builder(context)
179 /// .connect("tcp://localhost:5569")
180 /// .filter(b"")
181 /// .build()
182 /// .and_then(|sub| {
183 /// sub.stream().for_each(|multipart| {
184 /// for msg in multipart {
185 /// if let Some(msg) = msg.as_str() {
186 /// println!("Message: {}", msg);
187 /// }
188 /// }
189 /// Ok(())
190 /// })
191 /// });
192 ///
193 /// // tokio::run(fut.map(|_| ()).or_else(|e| {
194 /// // println!("Error: {}", e);
195 /// // Ok(())
196 /// // }));
197 /// }
198 /// ```
199 fn stream(self) -> <<Self as IntoInnerSocket>::Socket as InnerSocket<Self>>::Stream {
200 self.socket().stream()
201 }
202}
203
204/// This trait provides the basic Sink support for ZeroMQ Sockets. It depends on `IntoInnerSocket` and
205/// provides the `send` and `sink` methods.
206pub trait SinkSocket: IntoInnerSocket {
207 /// Send a single multipart message to the socket.
208 ///
209 /// ### Example, using a Pub wrapper type
210 /// ```rust
211 /// extern crate zmq;
212 /// extern crate futures;
213 /// extern crate tokio;
214 /// extern crate tokio_zmq;
215 ///
216 /// use std::sync::Arc;
217 ///
218 /// use futures::Future;
219 /// use tokio_zmq::{prelude::*, async_types::MultipartStream, Error, Pub};
220 ///
221 /// fn main() {
222 /// let context = Arc::new(zmq::Context::new());
223 /// let msg = zmq::Message::from_slice(b"Hello");
224 /// let fut = Pub::builder(context)
225 /// .connect("tcp://localhost:5569")
226 /// .build()
227 /// .and_then(|zpub| zpub.send(msg.into()));
228 ///
229 /// // tokio::run(fut.map(|_| ()).or_else(|e| {
230 /// // println!("Error: {}", e);
231 /// // Ok(())
232 /// // }));
233 /// }
234 /// ```
235 fn send(
236 self,
237 multipart: Multipart,
238 ) -> <<Self as IntoInnerSocket>::Socket as InnerSocket<Self>>::Request {
239 self.socket().send(multipart)
240 }
241
242 /// Send a stream of multipart messages to the socket.
243 ///
244 /// It takes a buffer_size argument, which will determine how many `Multipart`s can be
245 /// submitted into the send queue before the sink applies backpressure.
246 ///
247 /// ### Example, using a Pub wrapper type
248 /// ```rust
249 /// extern crate zmq;
250 /// extern crate futures;
251 /// extern crate tokio;
252 /// extern crate tokio_zmq;
253 ///
254 /// use std::sync::Arc;
255 ///
256 /// use futures::{Future, Stream, stream::iter_ok};
257 /// use tokio_zmq::{prelude::*, async_types::MultipartStream, Error, Multipart, Pub};
258 ///
259 /// fn main() {
260 /// let context = Arc::new(zmq::Context::new());
261 /// let fut = Pub::builder(context)
262 /// .connect("tcp://localhost:5570")
263 /// .build()
264 /// .and_then(|zpub| {
265 /// iter_ok(0..5)
266 /// .map(|i| {
267 /// zmq::Message::from_slice(format!("i: {}", i).as_bytes()).into()
268 /// })
269 /// .forward(zpub.sink(25))
270 /// });
271 ///
272 /// // tokio::run(fut.map(|_| ()).or_else(|e| {
273 /// // println!("Error: {}", e);
274 /// // Ok(())
275 /// // }));
276 /// }
277 /// ```
278 fn sink(
279 self,
280 buffer_size: usize,
281 ) -> <<Self as IntoInnerSocket>::Socket as InnerSocket<Self>>::Sink {
282 self.socket().sink(buffer_size)
283 }
284}
285
286/// This trait is provided for sockets that implement both Sync and Stream
287pub trait SinkStreamSocket: IntoInnerSocket {
288 /// Retrieve a structure that implements both Sync and Stream.
289 ///
290 /// It takes a buffer_size argument, which will determine how many `Multipart`s can be
291 /// submitted into the send queue before the sink applies backpressure.
292 ///
293 /// ### Example, using a Rep wrapper type
294 /// ```rust
295 /// extern crate futures;
296 /// extern crate tokio_zmq;
297 /// extern crate zmq;
298 ///
299 /// use std::sync::Arc;
300 ///
301 /// use futures::{Future, Stream};
302 /// use tokio_zmq::{prelude::*, Rep};
303 ///
304 /// fn main() {
305 /// let ctx = Arc::new(zmq::Context::new());
306 /// let fut = Rep::builder(ctx)
307 /// .bind("tcp://*:5571")
308 /// .build()
309 /// .and_then(|rep| {
310 /// let (sink, stream) = rep.sink_stream(25).split();
311 ///
312 /// stream.forward(sink)
313 /// });
314 ///
315 /// // tokio::run(fut.map(|_| ()).or_else(|e| {
316 /// // println!("Error: {}", e);
317 /// // Ok(())
318 /// // }));
319 /// }
320 /// ```
321 fn sink_stream(
322 self,
323 buffer_size: usize,
324 ) -> <<Self as IntoInnerSocket>::Socket as InnerSocket<Self>>::SinkStream;
325}
326
327/// This trait is provided to allow for ending a stream based on a Multipart message it receives.
328pub trait WithEndHandler: Stream<Item = Multipart> + Sized {
329 /// Add an EndHandler to a stream.
330 ///
331 /// ### Example, using a Sub wrapper type
332 /// ```rust
333 /// extern crate futures;
334 /// extern crate tokio_zmq;
335 /// extern crate zmq;
336 ///
337 /// use std::sync::Arc;
338 ///
339 /// use futures::{Future, Stream};
340 /// use tokio_zmq::{prelude::*, Sub, Multipart};
341 ///
342 /// struct End(u32);
343 ///
344 /// impl EndHandler for End {
345 /// fn should_stop(&mut self, multipart: &Multipart) -> bool {
346 /// self.0 += 1;
347 ///
348 /// self.0 > 30
349 /// }
350 /// }
351 ///
352 /// fn main() {
353 /// let ctx = Arc::new(zmq::Context::new());
354 /// let fut = Sub::builder(ctx)
355 /// .bind("tcp://*:5571")
356 /// .filter(b"")
357 /// .build()
358 /// .and_then(|sub| {
359 /// sub.stream()
360 /// .with_end_handler(End(0))
361 /// .for_each(|_| Ok(()))
362 /// });
363 ///
364 /// // tokio::run(fut.map(|_| ()).or_else(|e| {
365 /// // println!("Error: {}", e);
366 /// // Ok(())
367 /// // }));
368 /// }
369 /// ```
370 fn with_end_handler<E>(self, end_handler: E) -> EndingStream<E, Self, Self::Error>
371 where
372 E: EndHandler;
373}
374
375/// This trait is implemented by all Streams with Item = Multipart and Error = Error, it provides
376/// the ability to control when the stream stops based on the content of another stream.
377pub trait Controllable: Stream<Item = Multipart> + Sized {
378 /// Add a controller stream to a given stream. This allows the controller stream to decide when
379 /// the controlled stream should stop.
380 ///
381 /// ### Example, using a controlled Pull wrapper type and a controller Sub wrapper type
382 /// ```rust
383 /// extern crate futures;
384 /// extern crate tokio_zmq;
385 /// extern crate zmq;
386 ///
387 /// use std::sync::Arc;
388 ///
389 /// use futures::{Future, Stream};
390 /// use tokio_zmq::{prelude::*, Pull, Sub, Multipart};
391 ///
392 /// struct End;
393 ///
394 /// impl ControlHandler for End {
395 /// fn should_stop(&mut self, _: Multipart) -> bool {
396 /// true
397 /// }
398 /// }
399 ///
400 /// fn main() {
401 /// let ctx = Arc::new(zmq::Context::new());
402 /// let init_pull = Pull::builder(Arc::clone(&ctx))
403 /// .bind("tcp://*:5572")
404 /// .build();
405 ///
406 /// let init_sub = Sub::builder(ctx)
407 /// .bind("tcp://*:5573")
408 /// .filter(b"")
409 /// .build();
410 ///
411 /// let fut = init_pull
412 /// .join(init_sub)
413 /// .and_then(|(pull, sub)| {
414 /// pull.stream()
415 /// .controlled(sub.stream(), End)
416 /// .for_each(|_| Ok(()))
417 /// });
418 ///
419 /// // tokio::run(fut.map(|_| ()).or_else(|e| {
420 /// // println!("Error: {}", e);
421 /// // Ok(())
422 /// // }));
423 /// }
424 /// ```
425 fn controlled<H, S>(
426 self,
427 control_stream: S,
428 handler: H,
429 ) -> ControlledStream<H, S, Self, Self::Error>
430 where
431 H: ControlHandler,
432 S: Stream<Item = Multipart>,
433 Self: Stream<Item = Multipart, Error = S::Error>;
434}
435
/// Marker trait with no methods.
///
/// NOTE(review): presumably tags socket types that are not PAIR sockets; confirm against the
/// `config` module.
pub trait UnPair {
}

/// Marker trait with no methods.
///
/// NOTE(review): presumably tags PAIR socket types; confirm against the `config` module.
pub trait Pair {
}

/// Marker trait with no methods.
///
/// NOTE(review): presumably tags SUB socket types; confirm against the `config` module.
pub trait Sub {
}

/// Marker trait with no methods.
///
/// NOTE(review): presumably tags socket types that are not SUB sockets; confirm against the
/// `config` module.
pub trait UnSub {
}
440
441pub trait Build<T, E> {
442 type Result: Future<Item = T, Error = E>;
443
444 fn build(self) -> Self::Result;
445}
446
447/// This trait is implemented by all socket types to allow custom builders to be created
448pub trait HasBuilder: IntoInnerSocket {
449 fn builder(ctx: Arc<zmq::Context>) -> SocketBuilder<'static, Self>
450 where
451 Self: Sized,
452 {
453 SocketBuilder::new(ctx)
454 }
455}
456
457/* ----------------------------------impls----------------------------------- */
458
459impl<T> HasBuilder for T where T: IntoInnerSocket {}
460
461impl<T> SinkStreamSocket for T
462where
463 T: StreamSocket + SinkSocket,
464{
465 fn sink_stream(
466 self,
467 buffer_size: usize,
468 ) -> <<Self as IntoInnerSocket>::Socket as InnerSocket<Self>>::SinkStream {
469 self.socket().sink_stream(buffer_size)
470 }
471}
472
473impl<T> WithEndHandler for T
474where
475 T: Stream<Item = Multipart>,
476{
477 fn with_end_handler<E>(self, end_handler: E) -> EndingStream<E, Self, Self::Error>
478 where
479 E: EndHandler,
480 {
481 EndingStream::new(self, end_handler)
482 }
483}
484
485impl<T> Controllable for T
486where
487 T: Stream<Item = Multipart>,
488{
489 fn controlled<H, S>(
490 self,
491 control_stream: S,
492 handler: H,
493 ) -> ControlledStream<H, S, Self, Self::Error>
494 where
495 H: ControlHandler,
496 S: Stream<Item = Multipart, Error = T::Error>,
497 {
498 ControlledStream::new(self, control_stream, handler)
499 }
500}