async_zmq/
router.rs

1//! ROUTER socket module of Request-reply pattern in ZMQ
2//!
3//! Use the [`router`] function to instantiate a router socket and use methods
4//! from the [`Sink`]/[`SinkExt`] and [`Stream`]/[`StreamExt`] traits.
5//!
6//! A router socket must be paired with a [`dealer`], [`request`] or another
7//! router socket.
8//!
9//! # Example
10//!
11//! ```no_run
12//! ```
13//!
14//! [`dealer`]: ../dealer/index.html
15//! [`request`]: ../request/index.html
16//! [`router`]: fn.router.html
17//! [`Sink`]: ../trait.Sink.html
18//! [`SinkExt`]: ../trait.SinkExt.html
19//! [`Stream`]: ../trait.Stream.html
20//! [`StreamExt`]: ../trait.StreamExt.html
21
22use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use crate::{
26    reactor::{AsRawSocket, ZmqSocket},
27    socket::{Broker, Multipart, MultipartIter, SocketBuilder},
28    RecvError, SendError, Sink, SocketError, Stream,
29};
30use zmq::{Message, SocketType};
31
32/// Create a ZMQ socket with ROUTER type
33pub fn router<I: Iterator<Item = T> + Unpin, T: Into<Message>>(
34    endpoint: &str,
35) -> Result<SocketBuilder<'_, Router<I, T>>, SocketError> {
36    Ok(SocketBuilder::new(SocketType::ROUTER, endpoint))
37}
38
39/// The async wrapper of ZMQ socket with ROUTER type
40pub struct Router<I: Iterator<Item = T> + Unpin, T: Into<Message>>(Broker<I, T>);
41
42impl<I: Iterator<Item = T> + Unpin, T: Into<Message>> Router<I, T> {
43    /// Represent as `Socket` from zmq crate in case you want to call its methods.
44    pub fn as_raw_socket(&self) -> &zmq::Socket {
45        self.0.socket.as_socket()
46    }
47}
48
49impl<I: Iterator<Item = T> + Unpin, T: Into<Message>> Sink<MultipartIter<I, T>> for Router<I, T> {
50    type Error = SendError;
51
52    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53        Sink::poll_ready(Pin::new(&mut self.get_mut().0), cx)
54            .map(|result| result.map_err(Into::into))
55    }
56
57    fn start_send(self: Pin<&mut Self>, item: MultipartIter<I, T>) -> Result<(), Self::Error> {
58        Pin::new(&mut self.get_mut().0)
59            .start_send(item)
60            .map_err(Into::into)
61    }
62
63    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64        Sink::poll_flush(Pin::new(&mut self.get_mut().0), cx)
65            .map(|result| result.map_err(Into::into))
66    }
67
68    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
69        Sink::poll_close(Pin::new(&mut self.get_mut().0), cx)
70            .map(|result| result.map_err(Into::into))
71    }
72}
73
74impl<I: Iterator<Item = T> + Unpin, T: Into<Message>> Stream for Router<I, T> {
75    type Item = Result<Multipart, RecvError>;
76
77    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78        Pin::new(&mut self.get_mut().0)
79            .poll_next(cx)
80            .map(|poll| poll.map(|result| result.map_err(Into::into)))
81    }
82}
83
84impl<I: Iterator<Item = T> + Unpin, T: Into<Message>> From<zmq::Socket> for Router<I, T> {
85    fn from(socket: zmq::Socket) -> Self {
86        Self(Broker {
87            socket: ZmqSocket::from(socket),
88            buffer: None,
89        })
90    }
91}