use crate::{
reactor::{AsRawSocket, ZmqSocket},
socket::{Multipart, MultipartIter, Sender, SocketBuilder},
RequestReplyError, SocketError,
};
use futures::future::poll_fn;
use std::sync::atomic::{AtomicBool, Ordering};
use zmq::{Message, SocketType};
pub fn request<I: Iterator<Item = T> + Unpin, T: Into<Message>>(
endpoint: &str,
) -> Result<SocketBuilder<'_, Request<I, T>>, SocketError> {
Ok(SocketBuilder::new(SocketType::REQ, endpoint))
}
pub struct Request<I: Iterator<Item = T> + Unpin, T: Into<Message>> {
inner: Sender<I, T>,
received: AtomicBool,
}
impl<I: Iterator<Item = T> + Unpin, T: Into<Message>> From<zmq::Socket> for Request<I, T> {
fn from(socket: zmq::Socket) -> Self {
Self {
inner: Sender {
socket: ZmqSocket::from(socket),
buffer: None,
},
received: AtomicBool::new(false),
}
}
}
impl<I: Iterator<Item = T> + Unpin, T: Into<Message>> Request<I, T> {
pub async fn send<S: Into<MultipartIter<I, T>>>(
&self,
msg: S,
) -> Result<(), RequestReplyError> {
let mut msg = msg.into();
poll_fn(move |cx| self.inner.socket.send(cx, &mut msg)).await?;
self.received.store(false, Ordering::Relaxed);
Ok(())
}
pub async fn recv(&self) -> Result<Multipart, RequestReplyError> {
let msg = poll_fn(|cx| self.inner.socket.recv(cx)).await?;
self.received.store(true, Ordering::Relaxed);
Ok(msg)
}
pub async fn as_raw_socket(&self) -> &zmq::Socket {
self.inner.socket.as_socket()
}
}