use crate::{
core::{raw::GetRawSocket, *},
error::{msg_from_errno, Error, ErrorKind},
msg::Msg,
};
use libzmq_sys as sys;
use sys::errno;
use std::{
os::raw::{c_int, c_void},
time::Duration,
};
fn recv(
socket_ptr: *mut c_void,
msg: &mut Msg,
no_block: bool,
) -> Result<(), Error> {
let rc = unsafe {
sys::zmq_msg_recv(msg.as_mut_ptr(), socket_ptr, no_block as c_int)
};
if rc == -1 {
let errno = unsafe { sys::zmq_errno() };
let err = match errno {
errno::EAGAIN => Error::new(ErrorKind::WouldBlock),
errno::ENOTSUP => panic!("recv not supported by socket type"),
errno::EFSM => {
panic!("operation cannot be completed in current socket state")
}
errno::ETERM => Error::new(ErrorKind::InvalidCtx),
errno::ENOTSOCK => panic!("invalid socket"),
errno::EINTR => Error::new(ErrorKind::Interrupted),
errno::EFAULT => panic!("invalid message"),
_ => panic!(msg_from_errno(errno)),
};
Err(err)
} else {
Ok(())
}
}
pub trait RecvMsg: GetRawSocket {
fn recv(&self, msg: &mut Msg) -> Result<(), Error> {
recv(self.raw_socket().as_mut_ptr(), msg, false)
}
fn try_recv(&self, msg: &mut Msg) -> Result<(), Error> {
recv(self.raw_socket().as_mut_ptr(), msg, true)
}
fn recv_msg(&self) -> Result<Msg, Error> {
let mut msg = Msg::new();
self.recv(&mut msg)?;
Ok(msg)
}
fn try_recv_msg(&self) -> Result<Msg, Error> {
let mut msg = Msg::new();
self.try_recv(&mut msg)?;
Ok(msg)
}
fn recv_hwm(&self) -> Result<i32, Error> {
self.raw_socket().recv_hwm()
}
fn set_recv_hwm(&self, hwm: i32) -> Result<(), Error> {
self.raw_socket().set_recv_hwm(hwm)
}
fn recv_timeout(&self) -> Result<Period, Error> {
self.raw_socket().recv_timeout()
}
fn set_recv_timeout<P>(&self, period: P) -> Result<(), Error>
where
P: Into<Period>,
{
self.raw_socket().set_recv_timeout(period.into())
}
}
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
#[doc(hidden)]
pub struct RecvConfig {
pub(crate) recv_hwm: HighWaterMark,
pub(crate) recv_timeout: Period,
}
impl RecvConfig {
pub(crate) fn apply<S: RecvMsg>(&self, socket: &S) -> Result<(), Error> {
socket.set_recv_hwm(self.recv_hwm.into())?;
socket.set_recv_timeout(self.recv_timeout)?;
Ok(())
}
}
#[doc(hidden)]
pub trait GetRecvConfig: private::Sealed {
fn recv_config(&self) -> &RecvConfig;
fn recv_config_mut(&mut self) -> &mut RecvConfig;
}
pub trait ConfigureRecv: GetRecvConfig {
fn recv_hwm(&self) -> i32 {
self.recv_config().recv_hwm.into()
}
fn set_recv_hwm(&mut self, hwm: i32) {
self.recv_config_mut().recv_hwm = HighWaterMark(hwm);
}
fn recv_timeout(&self) -> Period {
self.recv_config().recv_timeout
}
fn set_recv_timeout(&mut self, period: Period) {
self.recv_config_mut().recv_timeout = period;
}
}
pub trait BuildRecv: GetRecvConfig {
fn recv_hwm(&mut self, hwm: i32) -> &mut Self {
self.recv_config_mut().recv_hwm = HighWaterMark(hwm);
self
}
fn recv_timeout(&mut self, timeout: Duration) -> &mut Self {
self.recv_config_mut().recv_timeout = Finite(timeout);
self
}
}