1use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use crate::{
26 reactor::{AsRawSocket, ZmqSocket},
27 socket::{Broker, Multipart, MultipartIter, SocketBuilder},
28 RecvError, SendError, Sink, SocketError, Stream,
29};
30use zmq::{Message, SocketType};
31
32pub fn router<I: Iterator<Item = T> + Unpin, T: Into<Message>>(
34 endpoint: &str,
35) -> Result<SocketBuilder<'_, Router<I, T>>, SocketError> {
36 Ok(SocketBuilder::new(SocketType::ROUTER, endpoint))
37}
38
39pub struct Router<I: Iterator<Item = T> + Unpin, T: Into<Message>>(Broker<I, T>);
41
42impl<I: Iterator<Item = T> + Unpin, T: Into<Message>> Router<I, T> {
43 pub fn as_raw_socket(&self) -> &zmq::Socket {
45 self.0.socket.as_socket()
46 }
47}
48
49impl<I: Iterator<Item = T> + Unpin, T: Into<Message>> Sink<MultipartIter<I, T>> for Router<I, T> {
50 type Error = SendError;
51
52 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53 Sink::poll_ready(Pin::new(&mut self.get_mut().0), cx)
54 .map(|result| result.map_err(Into::into))
55 }
56
57 fn start_send(self: Pin<&mut Self>, item: MultipartIter<I, T>) -> Result<(), Self::Error> {
58 Pin::new(&mut self.get_mut().0)
59 .start_send(item)
60 .map_err(Into::into)
61 }
62
63 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64 Sink::poll_flush(Pin::new(&mut self.get_mut().0), cx)
65 .map(|result| result.map_err(Into::into))
66 }
67
68 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
69 Sink::poll_close(Pin::new(&mut self.get_mut().0), cx)
70 .map(|result| result.map_err(Into::into))
71 }
72}
73
74impl<I: Iterator<Item = T> + Unpin, T: Into<Message>> Stream for Router<I, T> {
75 type Item = Result<Multipart, RecvError>;
76
77 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78 Pin::new(&mut self.get_mut().0)
79 .poll_next(cx)
80 .map(|poll| poll.map(|result| result.map_err(Into::into)))
81 }
82}
83
84impl<I: Iterator<Item = T> + Unpin, T: Into<Message>> From<zmq::Socket> for Router<I, T> {
85 fn from(socket: zmq::Socket) -> Self {
86 Self(Broker {
87 socket: ZmqSocket::from(socket),
88 buffer: None,
89 })
90 }
91}