use std::{
pin::Pin,
sync::atomic::{AtomicBool, Ordering},
task::{Context, Poll},
};
use zmq::{Message, SocketType};
use crate::{
reactor::{AsRawSocket, ZmqSocket},
socket::{Multipart, MultipartIter, Sender, SocketBuilder},
RecvError, RequestReplyError, SocketError,
};
use futures::{future::poll_fn, Stream};
pub fn reply<I: Iterator<Item = T> + Unpin, T: Into<Message>>(
endpoint: &str,
) -> Result<SocketBuilder<'_, Reply<I, T>>, SocketError> {
Ok(SocketBuilder::new(SocketType::REP, endpoint))
}
pub struct Reply<I: Iterator<Item = T> + Unpin, T: Into<Message>> {
inner: Sender<I, T>,
received: AtomicBool,
}
impl<I: Iterator<Item = T> + Unpin, T: Into<Message>> From<zmq::Socket> for Reply<I, T> {
fn from(socket: zmq::Socket) -> Self {
Self {
inner: Sender {
socket: ZmqSocket::from(socket),
buffer: None,
},
received: AtomicBool::new(false),
}
}
}
impl<I: Iterator<Item = T> + Unpin, T: Into<Message>> Reply<I, T> {
pub async fn recv(&self) -> Result<Multipart, RequestReplyError> {
let msg = poll_fn(|cx| self.inner.socket.recv(cx)).await?;
self.received.store(true, Ordering::Relaxed);
Ok(msg)
}
pub async fn send<S: Into<MultipartIter<I, T>>>(
&self,
msg: S,
) -> Result<(), RequestReplyError> {
let mut msg = msg.into();
poll_fn(move |cx| self.inner.socket.send(cx, &mut msg)).await?;
self.received.store(false, Ordering::Relaxed);
Ok(())
}
pub fn as_raw_socket(&self) -> &zmq::Socket {
self.inner.socket.as_socket()
}
}
impl<I: Iterator<Item = T> + Unpin, T: Into<Message>> Stream for Reply<I, T> {
type Item = Result<Multipart, RecvError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(Some(Ok(futures::ready!(self.inner.socket.recv(cx))?)))
}
}