use crate::error::{msg_from_errno, Error, ErrorKind};
use libzmq_sys as sys;
use sys::errno;
use libc::{c_int, size_t};
use std::{
ffi::CString,
os::raw::c_void,
time::Duration,
{mem, ptr, str},
};
// Capacity, in bytes, of the scratch buffer allocated when reading a
// variable-length option value (see `getsockopt_bytes`).
const MAX_OPTION_SIZE: size_t = 255;
/// Identifiers of the socket options this crate reads or writes.
///
/// Each variant's discriminant is the corresponding `sys::ZMQ_*` constant,
/// so casting a variant to an integer yields the raw option name expected by
/// `zmq_getsockopt` / `zmq_setsockopt`.
#[derive(Copy, Clone, Debug)]
// NOTE(review): presumably some variants are not referenced in every build
// configuration, hence the blanket `dead_code` allowance; confirm.
#[allow(dead_code)]
pub(crate) enum SocketOption {
Backlog = sys::ZMQ_BACKLOG as isize,
ConnectTimeout = sys::ZMQ_CONNECT_TIMEOUT as isize,
FileDescriptor = sys::ZMQ_FD as isize,
HeartbeatInterval = sys::ZMQ_HEARTBEAT_IVL as isize,
HeartbeatTimeout = sys::ZMQ_HEARTBEAT_TIMEOUT as isize,
HeartbeatTtl = sys::ZMQ_HEARTBEAT_TTL as isize,
SendHighWaterMark = sys::ZMQ_SNDHWM as isize,
SendTimeout = sys::ZMQ_SNDTIMEO as isize,
RecvHighWaterMark = sys::ZMQ_RCVHWM as isize,
RecvTimeout = sys::ZMQ_RCVTIMEO as isize,
NoDrop = sys::ZMQ_XPUB_NODROP as isize,
Linger = sys::ZMQ_LINGER as isize,
LastEndpoint = sys::ZMQ_LAST_ENDPOINT as isize,
PlainPassword = sys::ZMQ_PLAIN_PASSWORD as isize,
PlainUsername = sys::ZMQ_PLAIN_USERNAME as isize,
PlainServer = sys::ZMQ_PLAIN_SERVER as isize,
EnforceDomain = sys::ZMQ_ZAP_ENFORCE_DOMAIN as isize,
ZapDomain = sys::ZMQ_ZAP_DOMAIN as isize,
Subscribe = sys::ZMQ_SUBSCRIBE as isize,
Unsubscribe = sys::ZMQ_UNSUBSCRIBE as isize,
CurvePublicKey = sys::ZMQ_CURVE_PUBLICKEY as isize,
CurveSecretKey = sys::ZMQ_CURVE_SECRETKEY as isize,
CurveServer = sys::ZMQ_CURVE_SERVER as isize,
CurveServerKey = sys::ZMQ_CURVE_SERVERKEY as isize,
InBatchSize = sys::ZMQ_IN_BATCH_SIZE as isize,
OutBatchSize = sys::ZMQ_OUT_BATCH_SIZE as isize,
}
impl From<SocketOption> for c_int {
fn from(s: SocketOption) -> c_int {
match s {
SocketOption::Backlog => SocketOption::Backlog as c_int,
SocketOption::ConnectTimeout => {
SocketOption::ConnectTimeout as c_int
}
SocketOption::FileDescriptor => {
SocketOption::FileDescriptor as c_int
}
SocketOption::HeartbeatInterval => {
SocketOption::HeartbeatInterval as c_int
}
SocketOption::HeartbeatTimeout => {
SocketOption::HeartbeatTimeout as c_int
}
SocketOption::HeartbeatTtl => SocketOption::HeartbeatTtl as c_int,
SocketOption::SendHighWaterMark => {
SocketOption::SendHighWaterMark as c_int
}
SocketOption::SendTimeout => SocketOption::SendTimeout as c_int,
SocketOption::RecvHighWaterMark => {
SocketOption::RecvHighWaterMark as c_int
}
SocketOption::RecvTimeout => SocketOption::RecvTimeout as c_int,
SocketOption::NoDrop => SocketOption::NoDrop as c_int,
SocketOption::Linger => SocketOption::Linger as c_int,
SocketOption::LastEndpoint => SocketOption::LastEndpoint as c_int,
SocketOption::PlainPassword => SocketOption::PlainPassword as c_int,
SocketOption::PlainUsername => SocketOption::PlainUsername as c_int,
SocketOption::PlainServer => SocketOption::PlainServer as c_int,
SocketOption::EnforceDomain => SocketOption::EnforceDomain as c_int,
SocketOption::ZapDomain => SocketOption::ZapDomain as c_int,
SocketOption::Subscribe => SocketOption::Subscribe as c_int,
SocketOption::Unsubscribe => SocketOption::Unsubscribe as c_int,
SocketOption::CurvePublicKey => {
SocketOption::CurvePublicKey as c_int
}
SocketOption::CurveSecretKey => {
SocketOption::CurveSecretKey as c_int
}
SocketOption::CurveServer => SocketOption::CurveServer as c_int,
SocketOption::CurveServerKey => {
SocketOption::CurveServerKey as c_int
}
SocketOption::InBatchSize => SocketOption::InBatchSize as c_int,
SocketOption::OutBatchSize => SocketOption::OutBatchSize as c_int,
}
}
}
fn getsockopt(
mut_sock_ptr: *mut c_void,
option: SocketOption,
mut_value_ptr: *mut c_void,
size: &mut size_t,
) -> Result<(), Error> {
let rc = unsafe {
sys::zmq_getsockopt(mut_sock_ptr, option.into(), mut_value_ptr, size)
};
if rc == -1 {
let errno = unsafe { sys::zmq_errno() };
let err = match errno {
errno::EINVAL => panic!("invalid option"),
errno::ETERM => Error::new(ErrorKind::InvalidCtx),
errno::ENOTSOCK => panic!("invalid socket"),
errno::EINTR => Error::new(ErrorKind::Interrupted),
_ => panic!(msg_from_errno(errno)),
};
Err(err)
} else {
Ok(())
}
}
/// Reads an integer-valued socket option and interprets it as a flag.
///
/// The option is fetched as a `c_int`; any non-zero value maps to `true`.
/// Errors from `getsockopt` are propagated unchanged.
pub(crate) fn getsockopt_bool(
    mut_sock_ptr: *mut c_void,
    option: SocketOption,
) -> Result<bool, Error> {
    let mut raw: c_int = 0;
    let mut len = mem::size_of::<c_int>();
    getsockopt(
        mut_sock_ptr,
        option,
        &mut raw as *mut c_int as *mut c_void,
        &mut len,
    )?;
    Ok(raw != 0)
}
/// Reads a fixed-size socket option directly into a value of type `T`.
///
/// A `T::default()` is created as the destination, its address and
/// `size_of::<T>()` are handed to `getsockopt`, and the (possibly
/// overwritten) value is returned. Errors from `getsockopt` are propagated.
pub(crate) fn getsockopt_scalar<T>(
    mut_sock_ptr: *mut c_void,
    option: SocketOption,
) -> Result<T, Error>
where
    T: Default,
{
    let mut out = T::default();
    let mut len = mem::size_of::<T>();
    let out_ptr = &mut out as *mut T as *mut c_void;
    getsockopt(mut_sock_ptr, option, out_ptr, &mut len).map(|()| out)
}
/// Reads a variable-length socket option as raw bytes.
///
/// A zeroed buffer of `MAX_OPTION_SIZE` bytes is offered to `getsockopt`.
/// Returns `Ok(None)` when the reported length is zero, otherwise the buffer
/// truncated to the reported length. Errors from `getsockopt` are propagated.
pub(crate) fn getsockopt_bytes(
    mut_sock_ptr: *mut c_void,
    option: SocketOption,
) -> Result<Option<Vec<u8>>, Error> {
    let mut buf = vec![0u8; MAX_OPTION_SIZE];
    let mut len = buf.len();
    getsockopt(
        mut_sock_ptr,
        option,
        buf.as_mut_ptr() as *mut c_void,
        &mut len,
    )?;

    if len == 0 {
        return Ok(None);
    }
    buf.truncate(len);
    Ok(Some(buf))
}
/// Reads a socket option as an owned `String`.
///
/// The raw bytes come from `getsockopt_bytes`; the final byte is discarded
/// (presumably the trailing NUL of a C string; confirm against libzmq) and
/// the remainder is converted to a `String`. Returns `Ok(None)` when there
/// are no bytes at all or nothing is left after dropping the final byte.
///
/// # Panics
///
/// Panics if the remaining bytes are not valid UTF-8.
pub(crate) fn getsockopt_string(
    mut_sock_ptr: *mut c_void,
    option: SocketOption,
) -> Result<Option<String>, Error> {
    let mut raw = match getsockopt_bytes(mut_sock_ptr, option)? {
        Some(bytes) => bytes,
        None => return Ok(None),
    };

    // Drop the last byte before converting.
    raw.pop();
    if raw.is_empty() {
        return Ok(None);
    }

    // SAFETY: `from_vec_unchecked` requires that the vector holds no interior
    // NUL byte. NOTE(review): this is assumed of the value libzmq returned,
    // not checked here.
    let c_string = unsafe { CString::from_vec_unchecked(raw) };
    Ok(Some(c_string.into_string().unwrap()))
}
/// Reads a millisecond-valued socket option as an optional `Duration`.
///
/// The option is fetched as an `i32`. When it equals `none_value` the result
/// is `Ok(None)`; otherwise the value is cast to `u64` and treated as a
/// number of milliseconds. Errors from `getsockopt_scalar` are propagated.
pub(crate) fn getsockopt_option_duration(
    mut_sock_ptr: *mut c_void,
    option: SocketOption,
    none_value: i32,
) -> Result<Option<Duration>, Error> {
    let millis = getsockopt_scalar::<i32>(mut_sock_ptr, option)?;
    if millis == none_value {
        return Ok(None);
    }
    Ok(Some(Duration::from_millis(millis as u64)))
}
fn setsockopt(
mut_sock_ptr: *mut c_void,
option: SocketOption,
value_ptr: *const c_void,
size: size_t,
) -> Result<(), Error> {
let rc = unsafe {
sys::zmq_setsockopt(mut_sock_ptr, option.into(), value_ptr, size)
};
if rc == -1 {
let errno = unsafe { sys::zmq_errno() };
let err = match errno {
errno::EINVAL => panic!("invalid option"),
errno::ETERM => Error::new(ErrorKind::InvalidCtx),
errno::ENOTSOCK => panic!("invalid socket"),
errno::EINTR => Error::new(ErrorKind::Interrupted),
_ => panic!(msg_from_errno(errno)),
};
Err(err)
} else {
Ok(())
}
}
/// Sets a flag-style socket option.
///
/// The boolean is sent to `setsockopt` as a `c_int`: `1` for `true`, `0` for
/// `false`. Errors from `setsockopt` are propagated unchanged.
pub(crate) fn setsockopt_bool(
    mut_sock_ptr: *mut c_void,
    option: SocketOption,
    value: bool,
) -> Result<(), Error> {
    let flag: c_int = if value { 1 } else { 0 };
    setsockopt(
        mut_sock_ptr,
        option,
        &flag as *const c_int as *const c_void,
        mem::size_of::<c_int>(),
    )
}
/// Sets a fixed-size socket option from a value of type `T`.
///
/// The address of `value` and `size_of::<T>()` are handed to `setsockopt`
/// as-is, so `T` must have the in-memory layout libzmq expects for `option`.
pub(crate) fn setsockopt_scalar<T>(
    mut_sock_ptr: *mut c_void,
    option: SocketOption,
    value: T,
) -> Result<(), Error> {
    let value_ref: &T = &value;
    setsockopt(
        mut_sock_ptr,
        option,
        value_ref as *const T as *const c_void,
        mem::size_of::<T>(),
    )
}
/// Sets a fixed-size socket option from an optional value.
///
/// `Some(value)` sends `value`; `None` sends `none_value` instead. In both
/// cases the chosen value's address and `size_of::<T>()` are handed to
/// `setsockopt`.
pub(crate) fn setsockopt_option_scalar<T>(
    mut_sock_ptr: *mut c_void,
    option: SocketOption,
    maybe: Option<T>,
    none_value: T,
) -> Result<(), Error>
where
    T: Eq,
{
    // Select by reference so both inputs stay alive until the call returns.
    let chosen: &T = maybe.as_ref().unwrap_or(&none_value);
    setsockopt(
        mut_sock_ptr,
        option,
        chosen as *const T as *const c_void,
        mem::size_of::<T>(),
    )
}
/// Sets a variable-length socket option from an optional byte slice.
///
/// `Some(bytes)` sends the slice's pointer and length to `setsockopt`;
/// `None` delegates to `setsockopt_null`.
pub(crate) fn setsockopt_bytes(
    mut_sock_ptr: *mut c_void,
    option: SocketOption,
    maybe: Option<&[u8]>,
) -> Result<(), Error> {
    let payload = match maybe {
        Some(bytes) => bytes,
        None => return setsockopt_null(mut_sock_ptr, option),
    };
    setsockopt(
        mut_sock_ptr,
        option,
        payload.as_ptr() as *const c_void,
        payload.len(),
    )
}
/// Sets a variable-length socket option from an optional string slice.
///
/// The string's UTF-8 bytes are forwarded to `setsockopt_bytes`; `None` is
/// forwarded as `None`.
pub(crate) fn setsockopt_str(
    mut_sock_ptr: *mut c_void,
    option: SocketOption,
    maybe: Option<&str>,
) -> Result<(), Error> {
    let maybe_bytes = maybe.map(|text| text.as_bytes());
    setsockopt_bytes(mut_sock_ptr, option, maybe_bytes)
}
/// Sets a socket option to "no value".
///
/// Calls `setsockopt` with a null value pointer and a size of zero.
pub(crate) fn setsockopt_null(
    mut_sock_ptr: *mut c_void,
    option: SocketOption,
) -> Result<(), Error> {
    let no_value: *const c_void = ptr::null();
    setsockopt(mut_sock_ptr, option, no_value, 0)
}
/// Sets a millisecond-valued socket option from an optional `Duration`.
///
/// `Some(duration)` is validated with `check_duration` and sent as its
/// whole-millisecond count in an `i32`; `None` sends `none_value`.
///
/// # Errors
///
/// Returns the `check_duration` error when the duration does not fit in an
/// `i32` number of milliseconds, or whatever `setsockopt_option_scalar`
/// reports.
pub(crate) fn setsockopt_option_duration(
    mut_sock_ptr: *mut c_void,
    option: SocketOption,
    maybe: Option<Duration>,
    none_value: i32,
) -> Result<(), Error> {
    let maybe_ms = match maybe {
        Some(duration) => {
            check_duration(duration)?;
            // Cannot truncate: the check above bounds the value by i32::MAX.
            Some(duration.as_millis() as i32)
        }
        None => None,
    };
    setsockopt_option_scalar(mut_sock_ptr, option, maybe_ms, none_value)
}
/// Sets a millisecond-valued socket option from a `Duration`.
///
/// The duration is validated with `check_duration` and then sent as its
/// whole-millisecond count in an `i32` via `setsockopt_scalar`.
///
/// # Errors
///
/// Returns the `check_duration` error when the duration does not fit in an
/// `i32` number of milliseconds, or whatever `setsockopt_scalar` reports.
pub(crate) fn setsockopt_duration(
    mut_sock_ptr: *mut c_void,
    option: SocketOption,
    duration: Duration,
) -> Result<(), Error> {
    check_duration(duration).and_then(|()| {
        // Cannot truncate: the check above bounds the value by i32::MAX.
        let millis = duration.as_millis() as i32;
        setsockopt_scalar(mut_sock_ptr, option, millis)
    })
}
/// Verifies that a duration can be expressed as an `i32` number of
/// milliseconds.
///
/// # Errors
///
/// Returns `ErrorKind::InvalidInput` when the duration's whole-millisecond
/// count exceeds `i32::max_value()`.
fn check_duration(duration: Duration) -> Result<(), Error> {
    let limit = i32::max_value() as u128;
    if duration.as_millis() <= limit {
        return Ok(());
    }
    Err(Error::new(ErrorKind::InvalidInput(
        "ms in duration cannot be greater than i32::MAX",
    )))
}