use crate::{
core::*,
error::{msg_from_errno, Error, ErrorKind},
msg::Msg,
};
use libzmq_sys as sys;
use sys::errno;
use std::{
os::raw::{c_int, c_void},
time::Duration,
};
fn send(
socket_ptr: *mut c_void,
mut msg: Msg,
no_block: bool,
) -> Result<(), Error<Msg>> {
let rc = unsafe {
sys::zmq_msg_send(msg.as_mut_ptr(), socket_ptr, no_block as c_int)
};
if rc == -1 {
let errno = unsafe { sys::zmq_errno() };
let err = match errno {
errno::EAGAIN => Error::with_content(ErrorKind::WouldBlock, msg),
errno::ENOTSUP => panic!("send is not supported by socket type"),
errno::EINVAL => {
panic!("multipart messages are not supported by socket type")
}
errno::EFSM => {
panic!("operation cannot be completed in current socket state")
}
errno::ETERM => Error::with_content(ErrorKind::InvalidCtx, msg),
errno::ENOTSOCK => panic!("invalid socket"),
errno::EINTR => Error::with_content(ErrorKind::Interrupted, msg),
errno::EFAULT => panic!("invalid message"),
errno::EHOSTUNREACH => {
Error::with_content(ErrorKind::HostUnreachable, msg)
}
_ => panic!(msg_from_errno(errno)),
};
Err(err)
} else {
Ok(())
}
}
pub trait SendMsg: GetRawSocket {
fn send<M>(&self, msg: M) -> Result<(), Error<Msg>>
where
M: Into<Msg>,
{
send(self.raw_socket().as_mut_ptr(), msg.into(), false)
}
fn try_send<M>(&self, msg: M) -> Result<(), Error<Msg>>
where
M: Into<Msg>,
{
send(self.raw_socket().as_mut_ptr(), msg.into(), true)
}
fn send_hwm(&self) -> Result<i32, Error> {
self.raw_socket().send_hwm()
}
fn set_send_hwm(&self, hwm: i32) -> Result<(), Error> {
self.raw_socket().set_send_hwm(hwm)
}
fn send_timeout(&self) -> Result<Period, Error> {
self.raw_socket().send_timeout()
}
fn set_send_timeout<P>(&self, period: P) -> Result<(), Error>
where
P: Into<Period>,
{
self.raw_socket().set_send_timeout(period.into())
}
}
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
#[doc(hidden)]
pub struct SendConfig {
pub(crate) send_hwm: HighWaterMark,
pub(crate) send_timeout: Period,
}
impl SendConfig {
pub(crate) fn apply<S: SendMsg>(&self, socket: &S) -> Result<(), Error> {
socket.set_send_hwm(self.send_hwm.into())?;
socket.set_send_timeout(self.send_timeout)?;
Ok(())
}
}
#[doc(hidden)]
pub trait GetSendConfig: private::Sealed {
fn send_config(&self) -> &SendConfig;
fn send_config_mut(&mut self) -> &mut SendConfig;
}
pub trait ConfigureSend: GetSendConfig {
fn send_hwm(&self) -> i32 {
self.send_config().send_hwm.into()
}
fn set_send_hwm(&mut self, hwm: i32) {
self.send_config_mut().send_hwm = HighWaterMark(hwm);
}
fn send_timeout(&self) -> Period {
self.send_config().send_timeout
}
fn set_send_timeout(&mut self, period: Period) {
self.send_config_mut().send_timeout = period;
}
}
pub trait BuildSend: GetSendConfig {
fn send_hwm(&mut self, hwm: i32) -> &mut Self {
self.send_config_mut().send_hwm = HighWaterMark(hwm);
self
}
fn send_timeout(&mut self, timeout: Duration) -> &mut Self {
self.send_config_mut().send_timeout = Finite(timeout);
self
}
}