#![allow(trivial_numeric_casts)]
use bitflags::bitflags;
use libc::{c_int, c_long, c_short};
use std::ffi;
use std::fmt;
use std::marker::PhantomData;
use std::os::raw::c_void;
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, RawFd as UnixRawFd};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, RawSocket};
use std::result;
use std::string::FromUtf8Error;
use std::sync::Arc;
use std::{mem, ptr, str};
use zmq_sys2::{errno, RawFd};
// Evaluates a libzmq call expression. When the call yields `-1`, the enclosing
// function returns early with `Err(crate::errno_to_error())`; otherwise the
// macro evaluates to the call's return code.
macro_rules! zmq_try {
    ($($call:tt)*) => {{
        let status = $($call)*;
        if status != -1 {
            status
        } else {
            return Err(crate::errno_to_error());
        }
    }}
}
mod message;
mod sockopt;
use crate::message::msg_ptr;
pub use crate::message::Message;
pub use crate::SocketType::*;
pub type Result<T> = result::Result<T, Error>;
/// Socket types accepted by [`Context::socket`].
///
/// Each variant converts to the `zmq_sys2::ZMQ_*` constant of the same name
/// (see `SocketType::to_raw`).
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SocketType {
/// Corresponds to `zmq_sys2::ZMQ_PAIR`.
PAIR,
/// Corresponds to `zmq_sys2::ZMQ_PUB`.
PUB,
/// Corresponds to `zmq_sys2::ZMQ_SUB`.
SUB,
/// Corresponds to `zmq_sys2::ZMQ_REQ`.
REQ,
/// Corresponds to `zmq_sys2::ZMQ_REP`.
REP,
/// Corresponds to `zmq_sys2::ZMQ_DEALER`.
DEALER,
/// Corresponds to `zmq_sys2::ZMQ_ROUTER`.
ROUTER,
/// Corresponds to `zmq_sys2::ZMQ_PULL`.
PULL,
/// Corresponds to `zmq_sys2::ZMQ_PUSH`.
PUSH,
/// Corresponds to `zmq_sys2::ZMQ_XPUB`.
XPUB,
/// Corresponds to `zmq_sys2::ZMQ_XSUB`.
XSUB,
/// Corresponds to `zmq_sys2::ZMQ_STREAM`.
STREAM,
}
impl SocketType {
fn to_raw(self) -> c_int {
let raw = match self {
PAIR => zmq_sys2::ZMQ_PAIR,
PUB => zmq_sys2::ZMQ_PUB,
SUB => zmq_sys2::ZMQ_SUB,
REQ => zmq_sys2::ZMQ_REQ,
REP => zmq_sys2::ZMQ_REP,
DEALER => zmq_sys2::ZMQ_DEALER,
ROUTER => zmq_sys2::ZMQ_ROUTER,
PULL => zmq_sys2::ZMQ_PULL,
PUSH => zmq_sys2::ZMQ_PUSH,
XPUB => zmq_sys2::ZMQ_XPUB,
XSUB => zmq_sys2::ZMQ_XSUB,
STREAM => zmq_sys2::ZMQ_STREAM,
};
raw as c_int
}
fn from_raw(raw: c_int) -> SocketType {
match raw as u32 {
zmq_sys2::ZMQ_PAIR => PAIR,
zmq_sys2::ZMQ_PUB => PUB,
zmq_sys2::ZMQ_SUB => SUB,
zmq_sys2::ZMQ_REQ => REQ,
zmq_sys2::ZMQ_REP => REP,
zmq_sys2::ZMQ_DEALER => DEALER,
zmq_sys2::ZMQ_ROUTER => ROUTER,
zmq_sys2::ZMQ_PULL => PULL,
zmq_sys2::ZMQ_PUSH => PUSH,
zmq_sys2::ZMQ_XPUB => XPUB,
zmq_sys2::ZMQ_XSUB => XSUB,
zmq_sys2::ZMQ_STREAM => STREAM,
_ => panic!("socket type is out of range!"),
}
}
}
/// Socket event identifiers.
///
/// Every variant's discriminant is the `zmq_sys2::ZMQ_EVENT_*` constant of
/// the same name, so `SocketEvent::to_raw` is a plain cast.
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SocketEvent {
CONNECTED = zmq_sys2::ZMQ_EVENT_CONNECTED as isize,
CONNECT_DELAYED = zmq_sys2::ZMQ_EVENT_CONNECT_DELAYED as isize,
CONNECT_RETRIED = zmq_sys2::ZMQ_EVENT_CONNECT_RETRIED as isize,
LISTENING = zmq_sys2::ZMQ_EVENT_LISTENING as isize,
BIND_FAILED = zmq_sys2::ZMQ_EVENT_BIND_FAILED as isize,
ACCEPTED = zmq_sys2::ZMQ_EVENT_ACCEPTED as isize,
ACCEPT_FAILED = zmq_sys2::ZMQ_EVENT_ACCEPT_FAILED as isize,
CLOSED = zmq_sys2::ZMQ_EVENT_CLOSED as isize,
CLOSE_FAILED = zmq_sys2::ZMQ_EVENT_CLOSE_FAILED as isize,
DISCONNECTED = zmq_sys2::ZMQ_EVENT_DISCONNECTED as isize,
MONITOR_STOPPED = zmq_sys2::ZMQ_EVENT_MONITOR_STOPPED as isize,
HANDSHAKE_FAILED_NO_DETAIL = zmq_sys2::ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL as isize,
HANDSHAKE_SUCCEEDED = zmq_sys2::ZMQ_EVENT_HANDSHAKE_SUCCEEDED as isize,
HANDSHAKE_FAILED_PROTOCOL = zmq_sys2::ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL as isize,
HANDSHAKE_FAILED_AUTH = zmq_sys2::ZMQ_EVENT_HANDSHAKE_FAILED_AUTH as isize,
/// Discriminant is `zmq_sys2::ZMQ_EVENT_ALL`.
// NOTE(review): presumably a mask covering every event — confirm against libzmq.
ALL = zmq_sys2::ZMQ_EVENT_ALL as isize,
}
impl SocketEvent {
pub fn to_raw(self) -> u16 {
self as u16
}
pub fn from_raw(raw: u16) -> SocketEvent {
use SocketEvent::*;
match u32::from(raw) {
zmq_sys2::ZMQ_EVENT_CONNECTED => CONNECTED,
zmq_sys2::ZMQ_EVENT_CONNECT_DELAYED => CONNECT_DELAYED,
zmq_sys2::ZMQ_EVENT_CONNECT_RETRIED => CONNECT_RETRIED,
zmq_sys2::ZMQ_EVENT_LISTENING => LISTENING,
zmq_sys2::ZMQ_EVENT_BIND_FAILED => BIND_FAILED,
zmq_sys2::ZMQ_EVENT_ACCEPTED => ACCEPTED,
zmq_sys2::ZMQ_EVENT_ACCEPT_FAILED => ACCEPT_FAILED,
zmq_sys2::ZMQ_EVENT_CLOSED => CLOSED,
zmq_sys2::ZMQ_EVENT_CLOSE_FAILED => CLOSE_FAILED,
zmq_sys2::ZMQ_EVENT_DISCONNECTED => DISCONNECTED,
zmq_sys2::ZMQ_EVENT_MONITOR_STOPPED => MONITOR_STOPPED,
zmq_sys2::ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL => HANDSHAKE_FAILED_NO_DETAIL,
zmq_sys2::ZMQ_EVENT_HANDSHAKE_SUCCEEDED => HANDSHAKE_SUCCEEDED,
zmq_sys2::ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL => HANDSHAKE_FAILED_PROTOCOL,
zmq_sys2::ZMQ_EVENT_HANDSHAKE_FAILED_AUTH => HANDSHAKE_FAILED_AUTH,
zmq_sys2::ZMQ_EVENT_ALL => ALL,
x => panic!("unknown event type {}", x),
}
}
}
/// Flag constant mirroring `zmq_sys2::ZMQ_DONTWAIT` as an `i32`.
pub static DONTWAIT: i32 = zmq_sys2::ZMQ_DONTWAIT as i32;
/// Flag constant mirroring `zmq_sys2::ZMQ_SNDMORE` as an `i32`.
///
/// `Socket::send_multipart` ORs this into the flags of every part except the last.
pub static SNDMORE: i32 = zmq_sys2::ZMQ_SNDMORE as i32;
/// Security mechanism of a socket, as returned by `Socket::get_mechanism`.
///
/// Each variant corresponds to the `zmq_sys2` constant of the same name.
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Mechanism {
ZMQ_NULL,
ZMQ_PLAIN,
ZMQ_CURVE,
ZMQ_GSSAPI,
}
/// Error codes reported by this crate.
///
/// Each variant corresponds to the `errno::*` constant of the same name
/// (see `Error::to_raw` / `Error::from_raw`). `Debug` and `Display` are
/// implemented by hand and both print `Error::message`.
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum Error {
EACCES,
EADDRINUSE,
EAGAIN,
EBUSY,
ECONNREFUSED,
EFAULT,
EINTR,
EHOSTUNREACH,
EINPROGRESS,
EINVAL,
EMFILE,
EMSGSIZE,
ENAMETOOLONG,
ENODEV,
ENOENT,
ENOMEM,
ENOTCONN,
ENOTSOCK,
EPROTO,
EPROTONOSUPPORT,
ENOTSUP,
ENOBUFS,
ENETDOWN,
EADDRNOTAVAIL,
EFSM,
ENOCOMPATPROTO,
ETERM,
EMTHREAD,
}
impl Error {
pub fn to_raw(self) -> i32 {
match self {
Error::EACCES => errno::EACCES,
Error::EADDRINUSE => errno::EADDRINUSE,
Error::EAGAIN => errno::EAGAIN,
Error::EBUSY => errno::EBUSY,
Error::ECONNREFUSED => errno::ECONNREFUSED,
Error::EFAULT => errno::EFAULT,
Error::EINTR => errno::EINTR,
Error::EHOSTUNREACH => errno::EHOSTUNREACH,
Error::EINPROGRESS => errno::EINPROGRESS,
Error::EINVAL => errno::EINVAL,
Error::EMFILE => errno::EMFILE,
Error::EMSGSIZE => errno::EMSGSIZE,
Error::ENAMETOOLONG => errno::ENAMETOOLONG,
Error::ENODEV => errno::ENODEV,
Error::ENOENT => errno::ENOENT,
Error::ENOMEM => errno::ENOMEM,
Error::ENOTCONN => errno::ENOTCONN,
Error::ENOTSOCK => errno::ENOTSOCK,
Error::EPROTO => errno::EPROTO,
Error::EPROTONOSUPPORT => errno::EPROTONOSUPPORT,
Error::ENOTSUP => errno::ENOTSUP,
Error::ENOBUFS => errno::ENOBUFS,
Error::ENETDOWN => errno::ENETDOWN,
Error::EADDRNOTAVAIL => errno::EADDRNOTAVAIL,
Error::EFSM => errno::EFSM,
Error::ENOCOMPATPROTO => errno::ENOCOMPATPROTO,
Error::ETERM => errno::ETERM,
Error::EMTHREAD => errno::EMTHREAD,
}
}
pub fn from_raw(raw: i32) -> Error {
match raw {
errno::EACCES => Error::EACCES,
errno::EADDRINUSE => Error::EADDRINUSE,
errno::EAGAIN => Error::EAGAIN,
errno::EBUSY => Error::EBUSY,
errno::ECONNREFUSED => Error::ECONNREFUSED,
errno::EFAULT => Error::EFAULT,
errno::EHOSTUNREACH => Error::EHOSTUNREACH,
errno::EINPROGRESS => Error::EINPROGRESS,
errno::EINVAL => Error::EINVAL,
errno::EMFILE => Error::EMFILE,
errno::EMSGSIZE => Error::EMSGSIZE,
errno::ENAMETOOLONG => Error::ENAMETOOLONG,
errno::ENODEV => Error::ENODEV,
errno::ENOENT => Error::ENOENT,
errno::ENOMEM => Error::ENOMEM,
errno::ENOTCONN => Error::ENOTCONN,
errno::ENOTSOCK => Error::ENOTSOCK,
errno::EPROTO => Error::EPROTO,
errno::EPROTONOSUPPORT => Error::EPROTONOSUPPORT,
errno::ENOTSUP => Error::ENOTSUP,
errno::ENOBUFS => Error::ENOBUFS,
errno::ENETDOWN => Error::ENETDOWN,
errno::EADDRNOTAVAIL => Error::EADDRNOTAVAIL,
errno::EINTR => Error::EINTR,
errno::ENOTSUP_ALT => Error::ENOTSUP,
errno::EPROTONOSUPPORT_ALT => Error::EPROTONOSUPPORT,
errno::ENOBUFS_ALT => Error::ENOBUFS,
errno::ENETDOWN_ALT => Error::ENETDOWN,
errno::EADDRINUSE_ALT => Error::EADDRINUSE,
errno::EADDRNOTAVAIL_ALT => Error::EADDRNOTAVAIL,
errno::ECONNREFUSED_ALT => Error::ECONNREFUSED,
errno::EINPROGRESS_ALT => Error::EINPROGRESS,
errno::ENOTSOCK_ALT => Error::ENOTSOCK,
errno::EMSGSIZE_ALT => Error::EMSGSIZE,
errno::EFSM => Error::EFSM,
errno::ENOCOMPATPROTO => Error::ENOCOMPATPROTO,
errno::ETERM => Error::ETERM,
errno::EMTHREAD => Error::EMTHREAD,
x => unsafe {
let s = zmq_sys2::zmq_strerror(x);
panic!(
"unknown error [{}]: {}",
x,
str::from_utf8(ffi::CStr::from_ptr(s).to_bytes()).unwrap()
)
},
}
}
pub fn message(self) -> &'static str {
unsafe {
let s = zmq_sys2::zmq_strerror(self.to_raw());
let v: &'static [u8] = mem::transmute(ffi::CStr::from_ptr(s).to_bytes());
str::from_utf8(v).unwrap()
}
}
}
impl std::error::Error for Error {
fn description(&self) -> &str {
self.message()
}
}
impl std::fmt::Display for Error {
    /// Writes the text of `Error::message`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(self.message())
    }
}
impl fmt::Debug for Error {
    /// Writes the text of `Error::message` (identical to the `Display` output).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(self.message())
    }
}
impl From<Error> for std::io::Error {
    /// Wraps the error in a `std::io::Error`, choosing the closest
    /// `ErrorKind`; errors without a dedicated kind become `ErrorKind::Other`.
    fn from(error: Error) -> Self {
        use std::io::ErrorKind as Kind;

        let kind = match error {
            Error::EACCES => Kind::PermissionDenied,
            Error::EADDRINUSE => Kind::AddrInUse,
            Error::EADDRNOTAVAIL => Kind::AddrNotAvailable,
            Error::EAGAIN => Kind::WouldBlock,
            Error::ECONNREFUSED => Kind::ConnectionRefused,
            Error::EINTR => Kind::Interrupted,
            Error::EINVAL => Kind::InvalidInput,
            Error::ENOENT => Kind::NotFound,
            Error::ENOTCONN => Kind::NotConnected,
            _ => Kind::Other,
        };
        std::io::Error::new(kind, error)
    }
}
fn errno_to_error() -> Error {
Error::from_raw(unsafe { zmq_sys2::zmq_errno() })
}
/// Returns the `(major, minor, patch)` triple filled in by `zmq_version`.
pub fn version() -> (i32, i32, i32) {
    let (mut major, mut minor, mut patch) = (0, 0, 0);
    unsafe { zmq_sys2::zmq_version(&mut major, &mut minor, &mut patch) };
    (major as i32, minor as i32, patch as i32)
}
/// Holder of the raw context pointer obtained from `zmq_ctx_new`.
///
/// The `Drop` impl calls `zmq_ctx_term` on the pointer.
struct RawContext {
// Raw pointer handed to `zmq_ctx_*` and `zmq_socket` calls.
ctx: *mut c_void,
}
impl RawContext {
fn term(&self) -> Result<()> {
zmq_try!(unsafe { zmq_sys2::zmq_ctx_term(self.ctx) });
Ok(())
}
}
// `RawContext` holds a raw pointer, so `Send`/`Sync` are not derived
// automatically and are asserted by hand here.
// NOTE(review): this relies on the libzmq context being safe to use from
// multiple threads — confirm against the libzmq documentation.
unsafe impl Send for RawContext {}
unsafe impl Sync for RawContext {}
impl Drop for RawContext {
fn drop(&mut self) {
let mut e = self.term();
while e == Err(Error::EINTR) {
e = self.term();
}
}
}
/// Handle to a libzmq context.
///
/// Cloning is cheap: clones share the same `RawContext` through an `Arc`, and
/// the underlying context is terminated when the last `RawContext` reference
/// is dropped.
#[derive(Clone)]
pub struct Context {
// Shared owner of the raw context pointer.
raw: Arc<RawContext>,
}
impl Context {
pub fn new() -> Context {
Context {
raw: Arc::new(RawContext {
ctx: unsafe { zmq_sys2::zmq_ctx_new() },
}),
}
}
pub fn get_io_threads(&self) -> Result<i32> {
let rc =
zmq_try!(unsafe { zmq_sys2::zmq_ctx_get(self.raw.ctx, zmq_sys2::ZMQ_IO_THREADS as _) });
Ok(rc as i32)
}
pub fn set_io_threads(&self, value: i32) -> Result<()> {
zmq_try!(unsafe {
zmq_sys2::zmq_ctx_set(self.raw.ctx, zmq_sys2::ZMQ_IO_THREADS as _, value as i32)
});
Ok(())
}
pub fn socket(&self, socket_type: SocketType) -> Result<Socket> {
let sock = unsafe { zmq_sys2::zmq_socket(self.raw.ctx, socket_type.to_raw()) };
if sock.is_null() {
return Err(errno_to_error());
}
Ok(Socket {
sock,
context: Some(self.clone()),
owned: true,
})
}
pub fn destroy(&mut self) -> Result<()> {
self.raw.term()
}
}
impl Default for Context {
fn default() -> Self {
Context::new()
}
}
/// A libzmq socket.
///
/// When `owned` is set, dropping the socket calls `zmq_close` on it.
pub struct Socket {
/// Raw socket pointer passed to the `zmq_*` calls.
sock: *mut c_void,
/// Clone of the creating context (set by `Context::socket`, `None` for
/// sockets built with `Socket::from_raw`). The field is never read.
// NOTE(review): presumably held only to keep the context alive for as long
// as the socket exists — confirm.
#[allow(dead_code)]
context: Option<Context>,
/// Whether `Drop` should close the socket; cleared by `Socket::into_raw`.
owned: bool,
}
// `Socket` holds a raw pointer, so `Send` is asserted by hand; `Sync` is
// deliberately not implemented here.
// NOTE(review): relies on libzmq allowing a socket to be moved between
// threads — confirm against the libzmq documentation.
unsafe impl Send for Socket {}
impl Drop for Socket {
    /// Closes the socket with `zmq_close` when it is owned.
    ///
    /// # Panics
    ///
    /// Panics with the libzmq error if `zmq_close` returns `-1`.
    fn drop(&mut self) {
        if !self.owned {
            return;
        }
        let rc = unsafe { zmq_sys2::zmq_close(self.sock) };
        if rc == -1 {
            panic!("{}", errno_to_error());
        }
    }
}
#[cfg(unix)]
impl AsRawFd for Socket {
    /// Returns the descriptor reported by `get_fd`.
    ///
    /// # Panics
    ///
    /// Panics if `get_fd` fails.
    fn as_raw_fd(&self) -> UnixRawFd {
        let fd = self.get_fd().unwrap();
        fd as UnixRawFd
    }
}
#[cfg(windows)]
impl AsRawSocket for Socket {
    /// Returns the descriptor reported by `get_fd`.
    ///
    /// # Panics
    ///
    /// Panics if `get_fd` fails.
    fn as_raw_socket(&self) -> RawSocket {
        let fd = self.get_fd().unwrap();
        fd as RawSocket
    }
}
// Generates a public getter method `$getter(&self) -> Result<$ty>` that reads
// socket option `zmq_sys2::$constant_name` through `<$ty as sockopt::Getter>::get`.
// Any attributes written before `pub` are copied onto the generated method.
macro_rules! sockopt_getter {
( $(#[$meta:meta])*
pub $getter:ident => $constant_name:ident as $ty:ty
) => {
$(#[$meta])*
pub fn $getter(&self) -> Result<$ty> {
<$ty as sockopt::Getter>::get(self.sock, zmq_sys2::$constant_name as c_int)
}
};
}
// Generates a public setter method `$setter(&self, value: $ty) -> Result<()>`
// that writes socket option `zmq_sys2::$constant_name` through
// `<$ty as sockopt::Setter>::set`. Any attributes written before `pub` are
// copied onto the generated method.
macro_rules! sockopt_setter {
( $(#[$meta:meta])*
pub $setter:ident => $constant_name:ident as $ty:ty
) => {
$(#[$meta])*
pub fn $setter(&self, value: $ty) -> Result<()> {
<$ty as sockopt::Setter>::set(self.sock, zmq_sys2::$constant_name as c_int, value)
}
};
}
// Recursively expands a comma-terminated list of socket option entries into
// getter/setter methods. `META { ... }` carries attributes that are applied
// to every generated method, in addition to each entry's own attributes.
macro_rules! sockopt_seq {
// Base case: nothing left to expand.
( META { $($meta:meta)* }, ) => ();
// `(_, setter)`: setter only.
( META { $($meta:meta)* }, $(#[$item_meta:meta])* (_, $setter:ident) => $constant_name:ident as $ty:ty,
$($rest:tt)*
) => {
sockopt_setter! {
$(#[$meta])* $(#[$item_meta])*
pub $setter => $constant_name as $ty
}
sockopt_seq!(META { $($meta)* }, $($rest)*);
};
// `(getter)`: getter only.
( META { $($meta:meta)* }, $(#[$item_meta:meta])* ($getter:ident) => $constant_name:ident as $ty:ty,
$($rest:tt)*
) => {
sockopt_getter! {
$(#[$meta])* $(#[$item_meta])*
pub $getter => $constant_name as $ty
}
sockopt_seq!(META { $($meta)* }, $($rest)*);
};
// `(getter, setter)`: both accessors for the same option and type.
( META { $($meta:meta)* }, $(#[$item_meta:meta])* ($getter:ident, $setter:ident) => $constant_name:ident as $ty:ty,
$($rest:tt)*
) => {
sockopt_getter! {
$(#[$meta])* $(#[$item_meta])*
pub $getter => $constant_name as $ty
}
sockopt_setter! {
$(#[$meta])* $(#[$item_meta])*
pub $setter => $constant_name as $ty
}
sockopt_seq!(META { $($meta)* }, $($rest)*);
};
}
// Entry point for declaring socket option accessors: forwards the entry list
// to `sockopt_seq!` with an empty set of shared attributes. An empty
// invocation expands to nothing.
macro_rules! sockopts {
() => ();
( $($rest:tt)* ) => {
sockopt_seq!(META {}, $($rest)*);
};
}
/// Values that can be sent over a [`Socket`].
///
/// `Socket::send` delegates to this trait; a blanket implementation covers
/// every type that converts into a `Message`.
pub trait Sendable {
/// Sends `self` on `socket` with the given flags.
fn send(self, socket: &Socket, flags: i32) -> Result<()>;
}
impl<T> Sendable for T
where
T: Into<Message>,
{
fn send(self, socket: &Socket, flags: i32) -> Result<()> {
let mut msg = self.into();
zmq_try!(unsafe { zmq_sys2::zmq_msg_send(msg_ptr(&mut msg), socket.sock, flags as c_int) });
Ok(())
}
}
impl Socket {
pub fn into_raw(mut self) -> *mut c_void {
self.owned = false;
self.sock
}
pub unsafe fn from_raw(sock: *mut c_void) -> Socket {
Socket {
sock,
context: None,
owned: true,
}
}
pub fn as_mut_ptr(&mut self) -> *mut c_void {
self.sock
}
pub fn bind(&self, endpoint: &str) -> Result<()> {
let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
zmq_try!(unsafe { zmq_sys2::zmq_bind(self.sock, c_str.as_ptr()) });
Ok(())
}
pub fn unbind(&self, endpoint: &str) -> Result<()> {
let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
zmq_try!(unsafe { zmq_sys2::zmq_unbind(self.sock, c_str.as_ptr()) });
Ok(())
}
pub fn connect(&self, endpoint: &str) -> Result<()> {
let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
zmq_try!(unsafe { zmq_sys2::zmq_connect(self.sock, c_str.as_ptr()) });
Ok(())
}
pub fn disconnect(&self, endpoint: &str) -> Result<()> {
let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
zmq_try!(unsafe { zmq_sys2::zmq_disconnect(self.sock, c_str.as_ptr()) });
Ok(())
}
pub fn monitor(&self, monitor_endpoint: &str, events: i32) -> Result<()> {
let c_str = ffi::CString::new(monitor_endpoint.as_bytes()).unwrap();
zmq_try!(unsafe {
zmq_sys2::zmq_socket_monitor(self.sock, c_str.as_ptr(), events as c_int)
});
Ok(())
}
pub fn send<T>(&self, data: T, flags: i32) -> Result<()>
where
T: Sendable,
{
data.send(self, flags)
}
#[deprecated(since = "0.9.0", note = "Use `send` instead")]
pub fn send_msg(&self, msg: Message, flags: i32) -> Result<()> {
self.send(msg, flags)
}
#[deprecated(since = "0.9.0", note = "Use `send` instead")]
pub fn send_str(&self, data: &str, flags: i32) -> Result<()> {
self.send(data, flags)
}
pub fn send_multipart<I, T>(&self, iter: I, flags: i32) -> Result<()>
where
I: IntoIterator<Item = T>,
T: Into<Message>,
{
let mut last_part: Option<T> = None;
for part in iter {
let maybe_last = last_part.take();
if let Some(last) = maybe_last {
self.send(last.into(), flags | SNDMORE)?;
}
last_part = Some(part);
}
if let Some(last) = last_part {
self.send(last.into(), flags)
} else {
Ok(())
}
}
pub fn recv(&self, msg: &mut Message, flags: i32) -> Result<()> {
zmq_try!(unsafe { zmq_sys2::zmq_msg_recv(msg_ptr(msg), self.sock, flags as c_int) });
Ok(())
}
pub fn recv_into(&self, bytes: &mut [u8], flags: i32) -> Result<usize> {
let bytes_ptr = bytes.as_mut_ptr() as *mut c_void;
let rc = zmq_try!(unsafe {
zmq_sys2::zmq_recv(self.sock, bytes_ptr, bytes.len(), flags as c_int)
});
Ok(rc as usize)
}
pub fn recv_msg(&self, flags: i32) -> Result<Message> {
let mut msg = Message::new();
self.recv(&mut msg, flags).map(|_| msg)
}
pub fn recv_bytes(&self, flags: i32) -> Result<Vec<u8>> {
self.recv_msg(flags).map(|msg| msg.to_vec())
}
pub fn recv_string(&self, flags: i32) -> Result<result::Result<String, Vec<u8>>> {
self.recv_bytes(flags)
.map(|bytes| String::from_utf8(bytes).map_err(FromUtf8Error::into_bytes))
}
pub fn recv_multipart(&self, flags: i32) -> Result<Vec<Vec<u8>>> {
let mut parts: Vec<Vec<u8>> = vec![];
loop {
let part = self.recv_bytes(flags)?;
parts.push(part);
let more_parts = self.get_rcvmore()?;
if !more_parts {
break;
}
}
Ok(parts)
}
sockopts! {
(is_ipv6, set_ipv6) => ZMQ_IPV6 as bool,
(is_immediate, set_immediate) => ZMQ_IMMEDIATE as bool,
(is_plain_server, set_plain_server) => ZMQ_PLAIN_SERVER as bool,
(is_conflate, set_conflate) => ZMQ_CONFLATE as bool,
(is_probe_router, set_probe_router) => ZMQ_PROBE_ROUTER as bool,
(is_router_mandatory, set_router_mandatory) => ZMQ_ROUTER_MANDATORY as bool,
(is_router_handover, set_router_handover) => ZMQ_ROUTER_HANDOVER as bool,
(is_curve_server, set_curve_server) => ZMQ_CURVE_SERVER as bool,
(is_gssapi_server, set_gssapi_server) => ZMQ_GSSAPI_SERVER as bool,
(is_gssapi_plaintext, set_gssapi_plaintext) => ZMQ_GSSAPI_PLAINTEXT as bool,
(_, set_req_relaxed) => ZMQ_REQ_RELAXED as bool,
(_, set_req_correlate) => ZMQ_REQ_CORRELATE as bool,
}
pub fn get_socket_type(&self) -> Result<SocketType> {
sockopt::get(self.sock, zmq_sys2::ZMQ_TYPE as c_int).map(SocketType::from_raw)
}
pub fn get_rcvmore(&self) -> Result<bool> {
sockopt::get(self.sock, zmq_sys2::ZMQ_RCVMORE as c_int).map(|o: i64| o == 1i64)
}
sockopts! {
(get_maxmsgsize, set_maxmsgsize) => ZMQ_MAXMSGSIZE as i64,
(get_sndhwm, set_sndhwm) => ZMQ_SNDHWM as i32,
(get_rcvhwm, set_rcvhwm) => ZMQ_RCVHWM as i32,
(get_affinity, set_affinity) => ZMQ_AFFINITY as u64,
(get_rate, set_rate) => ZMQ_RATE as i32,
(get_recovery_ivl, set_recovery_ivl) => ZMQ_RECOVERY_IVL as i32,
(get_sndbuf, set_sndbuf) => ZMQ_SNDBUF as i32,
(get_rcvbuf, set_rcvbuf) => ZMQ_RCVBUF as i32,
(get_tos, set_tos) => ZMQ_TOS as i32,
(get_linger, set_linger) => ZMQ_LINGER as i32,
(get_reconnect_ivl, set_reconnect_ivl) => ZMQ_RECONNECT_IVL as i32,
(get_reconnect_ivl_max, set_reconnect_ivl_max) => ZMQ_RECONNECT_IVL_MAX as i32,
(get_backlog, set_backlog) => ZMQ_BACKLOG as i32,
(get_fd) => ZMQ_FD as RawFd,
(get_events) => ZMQ_EVENTS as PollEvents,
(get_multicast_hops, set_multicast_hops) => ZMQ_MULTICAST_HOPS as i32,
(get_rcvtimeo, set_rcvtimeo) => ZMQ_RCVTIMEO as i32,
(get_sndtimeo, set_sndtimeo) => ZMQ_SNDTIMEO as i32,
(get_tcp_keepalive, set_tcp_keepalive) => ZMQ_TCP_KEEPALIVE as i32,
(get_tcp_keepalive_cnt, set_tcp_keepalive_cnt) => ZMQ_TCP_KEEPALIVE_CNT as i32,
(get_tcp_keepalive_idle, set_tcp_keepalive_idle) => ZMQ_TCP_KEEPALIVE_IDLE as i32,
(get_tcp_keepalive_intvl, set_tcp_keepalive_intvl) => ZMQ_TCP_KEEPALIVE_INTVL as i32,
(get_handshake_ivl, set_handshake_ivl) => ZMQ_HANDSHAKE_IVL as i32,
(_, set_identity) => ZMQ_ROUTING_ID as &[u8],
(_, set_subscribe) => ZMQ_SUBSCRIBE as &[u8],
(_, set_unsubscribe) => ZMQ_UNSUBSCRIBE as &[u8],
(get_heartbeat_ivl, set_heartbeat_ivl) => ZMQ_HEARTBEAT_IVL as i32,
(get_heartbeat_ttl, set_heartbeat_ttl) => ZMQ_HEARTBEAT_TTL as i32,
(get_heartbeat_timeout, set_heartbeat_timeout) => ZMQ_HEARTBEAT_TIMEOUT as i32,
(get_connect_timeout, set_connect_timeout) => ZMQ_CONNECT_TIMEOUT as i32,
}
pub fn get_identity(&self) -> Result<Vec<u8>> {
sockopt::get_bytes(self.sock, zmq_sys2::ZMQ_ROUTING_ID as c_int, 255)
}
pub fn get_socks_proxy(&self) -> Result<result::Result<String, Vec<u8>>> {
sockopt::get_string(self.sock, zmq_sys2::ZMQ_SOCKS_PROXY as c_int, 255, true)
}
pub fn get_mechanism(&self) -> Result<Mechanism> {
sockopt::get(self.sock, zmq_sys2::ZMQ_MECHANISM as c_int).map(|mech| match mech {
zmq_sys2::ZMQ_NULL => Mechanism::ZMQ_NULL,
zmq_sys2::ZMQ_PLAIN => Mechanism::ZMQ_PLAIN,
zmq_sys2::ZMQ_CURVE => Mechanism::ZMQ_CURVE,
zmq_sys2::ZMQ_GSSAPI => Mechanism::ZMQ_GSSAPI,
_ => panic!("Mechanism is out of range!"),
})
}
pub fn get_plain_username(&self) -> Result<result::Result<String, Vec<u8>>> {
sockopt::get_string(self.sock, zmq_sys2::ZMQ_PLAIN_USERNAME as c_int, 255, true)
}
pub fn get_plain_password(&self) -> Result<result::Result<String, Vec<u8>>> {
sockopt::get_string(self.sock, zmq_sys2::ZMQ_PLAIN_PASSWORD as c_int, 256, true)
}
pub fn get_zap_domain(&self) -> Result<result::Result<String, Vec<u8>>> {
sockopt::get_string(self.sock, zmq_sys2::ZMQ_ZAP_DOMAIN as c_int, 255, true)
}
pub fn get_last_endpoint(&self) -> Result<result::Result<String, Vec<u8>>> {
sockopt::get_string(
self.sock,
zmq_sys2::ZMQ_LAST_ENDPOINT as c_int,
256 + 9 + 1,
true,
)
}
pub fn get_curve_publickey(&self) -> Result<Vec<u8>> {
sockopt::get_bytes(self.sock, zmq_sys2::ZMQ_CURVE_PUBLICKEY as c_int, 32)
}
pub fn get_curve_secretkey(&self) -> Result<Vec<u8>> {
sockopt::get_bytes(self.sock, zmq_sys2::ZMQ_CURVE_SECRETKEY as c_int, 32)
}
pub fn get_curve_serverkey(&self) -> Result<Vec<u8>> {
sockopt::get_bytes(self.sock, zmq_sys2::ZMQ_CURVE_SERVERKEY as c_int, 32)
}
pub fn get_gssapi_principal(&self) -> Result<result::Result<String, Vec<u8>>> {
sockopt::get_string(
self.sock,
zmq_sys2::ZMQ_GSSAPI_PRINCIPAL as c_int,
260,
true,
)
}
pub fn get_gssapi_service_principal(&self) -> Result<result::Result<String, Vec<u8>>> {
sockopt::get_string(
self.sock,
zmq_sys2::ZMQ_GSSAPI_SERVICE_PRINCIPAL as c_int,
260,
true,
)
}
sockopts! {
(_, set_socks_proxy) => ZMQ_SOCKS_PROXY as Option<&str>,
(_, set_plain_username) => ZMQ_PLAIN_USERNAME as Option<&str>,
(_, set_plain_password) => ZMQ_PLAIN_PASSWORD as Option<&str>,
(_, set_zap_domain) => ZMQ_ZAP_DOMAIN as &str,
(_, set_xpub_welcome_msg) => ZMQ_XPUB_WELCOME_MSG as Option<&str>,
(_, set_xpub_verbose) => ZMQ_XPUB_VERBOSE as bool,
(_, set_curve_publickey) => ZMQ_CURVE_PUBLICKEY as &[u8],
(_, set_curve_secretkey) => ZMQ_CURVE_SECRETKEY as &[u8],
(_, set_curve_serverkey) => ZMQ_CURVE_SERVERKEY as &[u8],
(_, set_gssapi_principal) => ZMQ_GSSAPI_PRINCIPAL as &str,
(_, set_gssapi_service_principal) => ZMQ_GSSAPI_SERVICE_PRINCIPAL as &str,
}
pub fn as_poll_item(&self, events: PollEvents) -> PollItem {
PollItem {
socket: self.sock,
fd: 0,
events: events.bits(),
revents: 0,
marker: PhantomData,
}
}
pub fn poll(&self, events: PollEvents, timeout_ms: i64) -> Result<i32> {
poll(&mut [self.as_poll_item(events)], timeout_ms)
}
}
// Bit set of poll events, stored in an `i16`. The flag values are taken from
// the `zmq_sys2::ZMQ_POLL*` constants of the same name.
bitflags! {
pub struct PollEvents: i16 {
const POLLIN = zmq_sys2::ZMQ_POLLIN as i16;
const POLLOUT = zmq_sys2::ZMQ_POLLOUT as i16;
const POLLERR = zmq_sys2::ZMQ_POLLERR as i16;
}
}
/// Crate-level alias for `PollEvents::POLLIN`.
pub const POLLIN: PollEvents = PollEvents::POLLIN;
/// Crate-level alias for `PollEvents::POLLOUT`.
pub const POLLOUT: PollEvents = PollEvents::POLLOUT;
/// Crate-level alias for `PollEvents::POLLERR`.
pub const POLLERR: PollEvents = PollEvents::POLLERR;
/// One entry of the item list passed to [`poll`].
///
/// The struct is `#[repr(C)]` because `poll` casts a slice of `PollItem`
/// directly to `*mut zmq_sys2::zmq_pollitem_t`.
#[repr(C)]
pub struct PollItem<'a> {
// Socket pointer; null for items created with `PollItem::from_fd`.
socket: *mut c_void,
// Raw file descriptor; `0` for items created with `Socket::as_poll_item`.
fd: RawFd,
// Requested events (bits of `PollEvents`).
events: c_short,
// Returned events; initialised to `0` by the constructors.
revents: c_short,
// Ties the item to the lifetime of a borrowed `Socket` without storing one.
marker: PhantomData<&'a Socket>,
}
impl<'a> PollItem<'a> {
    /// Builds a poll item for a raw file descriptor (socket pointer is null).
    pub fn from_fd(fd: RawFd, events: PollEvents) -> PollItem<'a> {
        let requested = events.bits();
        PollItem {
            socket: ptr::null_mut(),
            fd,
            events: requested,
            revents: 0,
            marker: PhantomData,
        }
    }

    /// Replaces the set of requested events.
    pub fn set_events(&mut self, events: PollEvents) {
        let requested = events.bits();
        self.events = requested;
    }

    /// Returns the events stored in `revents`, dropping any unknown bits.
    pub fn get_revents(&self) -> PollEvents {
        let raw = self.revents;
        PollEvents::from_bits_truncate(raw)
    }

    /// `true` when the `POLLIN` bit is set in the returned events.
    pub fn is_readable(&self) -> bool {
        let mask = POLLIN.bits();
        self.revents & mask != 0
    }

    /// `true` when the `POLLOUT` bit is set in the returned events.
    pub fn is_writable(&self) -> bool {
        let mask = POLLOUT.bits();
        self.revents & mask != 0
    }

    /// `true` when the `POLLERR` bit is set in the returned events.
    pub fn is_error(&self) -> bool {
        let mask = POLLERR.bits();
        self.revents & mask != 0
    }

    /// `true` when this item refers to the same raw socket as `socket`.
    pub fn has_socket(&self, socket: &Socket) -> bool {
        socket.sock == self.socket
    }

    /// `true` when this item has no socket pointer and its descriptor equals `fd`.
    pub fn has_fd(&self, fd: RawFd) -> bool {
        if !self.socket.is_null() {
            return false;
        }
        self.fd == fd
    }
}
pub fn poll(items: &mut [PollItem], timeout: i64) -> Result<i32> {
let rc = zmq_try!(unsafe {
zmq_sys2::zmq_poll(
items.as_mut_ptr() as *mut zmq_sys2::zmq_pollitem_t,
items.len() as c_int,
timeout as c_long,
)
});
Ok(rc as i32)
}
pub fn proxy(frontend: &Socket, backend: &Socket) -> Result<()> {
zmq_try!(unsafe { zmq_sys2::zmq_proxy(frontend.sock, backend.sock, ptr::null_mut()) });
Ok(())
}
pub fn proxy_with_capture(
frontend: &mut Socket,
backend: &mut Socket,
capture: &mut Socket,
) -> Result<()> {
zmq_try!(unsafe { zmq_sys2::zmq_proxy(frontend.sock, backend.sock, capture.sock) });
Ok(())
}
pub fn proxy_steerable(
frontend: &mut Socket,
backend: &mut Socket,
control: &mut Socket,
) -> Result<()> {
zmq_try!(unsafe {
zmq_sys2::zmq_proxy_steerable(frontend.sock, backend.sock, ptr::null_mut(), control.sock)
});
Ok(())
}
pub fn proxy_steerable_with_capture(
frontend: &mut Socket,
backend: &mut Socket,
capture: &mut Socket,
control: &mut Socket,
) -> Result<()> {
zmq_try!(unsafe {
zmq_sys2::zmq_proxy_steerable(frontend.sock, backend.sock, capture.sock, control.sock)
});
Ok(())
}
/// Queries `zmq_has` for `capability`.
///
/// Always returns `Some`; the value is `true` when `zmq_has` returns `1`.
///
/// # Panics
///
/// Panics if `capability` contains a NUL byte.
pub fn has(capability: &str) -> Option<bool> {
    let name = ffi::CString::new(capability).unwrap();
    let supported = unsafe { zmq_sys2::zmq_has(name.as_ptr()) } == 1;
    Some(supported)
}
/// A CURVE key pair in binary form, as produced by `CurveKeyPair::new`.
#[derive(Debug)]
pub struct CurveKeyPair {
/// Public key: 32 raw bytes decoded from the Z85 text returned by libzmq.
pub public_key: [u8; 32],
/// Secret key: 32 raw bytes decoded from the Z85 text returned by libzmq.
pub secret_key: [u8; 32],
}
impl CurveKeyPair {
pub fn new() -> Result<CurveKeyPair> {
let mut ffi_public_key = [0u8; 41];
let mut ffi_secret_key = [0u8; 41];
zmq_try!(unsafe {
zmq_sys2::zmq_curve_keypair(
ffi_public_key.as_mut_ptr() as *mut libc::c_char,
ffi_secret_key.as_mut_ptr() as *mut libc::c_char,
)
});
let mut pair = CurveKeyPair {
public_key: [0; 32],
secret_key: [0; 32],
};
unsafe {
zmq_sys2::zmq_z85_decode(
pair.public_key.as_mut_ptr(),
ffi_public_key.as_ptr() as *mut libc::c_char,
);
zmq_sys2::zmq_z85_decode(
pair.secret_key.as_mut_ptr(),
ffi_secret_key.as_ptr() as *mut libc::c_char,
);
}
Ok(pair)
}
}
/// Errors returned by `z85_encode`.
#[derive(Debug)]
pub enum EncodeError {
    /// The input length is not a multiple of 4.
    BadLength,
    /// The encoded output could not be converted to a UTF-8 `String`.
    FromUtf8Error(FromUtf8Error),
}

impl From<FromUtf8Error> for EncodeError {
    /// Wraps a UTF-8 conversion failure.
    fn from(utf8_err: FromUtf8Error) -> Self {
        EncodeError::FromUtf8Error(utf8_err)
    }
}

impl fmt::Display for EncodeError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            EncodeError::FromUtf8Error(inner) => write!(f, "UTF8 conversion error: {}", inner),
            EncodeError::BadLength => f.write_str("Invalid data length. Should be multiple of 4."),
        }
    }
}

impl std::error::Error for EncodeError {}
/// Encodes binary data as Z85 text using `zmq_z85_encode`.
///
/// # Errors
///
/// Returns `EncodeError::BadLength` when `data.len()` is not a multiple of 4,
/// and `EncodeError::FromUtf8Error` when the encoded bytes are not valid UTF-8.
pub fn z85_encode(data: &[u8]) -> result::Result<String, EncodeError> {
    if data.len() % 4 != 0 {
        return Err(EncodeError::BadLength);
    }

    // Every 4 input bytes become 5 output characters; one extra byte is
    // reserved at the end and cut off again after encoding.
    let text_len = data.len() * 5 / 4;
    let mut buffer = vec![0u8; text_len + 1];

    unsafe {
        zmq_sys2::zmq_z85_encode(
            buffer.as_mut_ptr() as *mut libc::c_char,
            data.as_ptr(),
            data.len(),
        );
    }

    buffer.truncate(text_len);
    Ok(String::from_utf8(buffer)?)
}
/// Errors returned by `z85_decode`.
#[derive(Debug)]
pub enum DecodeError {
    /// The input length is not a multiple of 5.
    BadLength,
    /// The input contains a NUL byte and cannot be turned into a C string.
    NulError(ffi::NulError),
}

impl From<ffi::NulError> for DecodeError {
    /// Wraps a C-string conversion failure.
    fn from(nul_err: ffi::NulError) -> Self {
        DecodeError::NulError(nul_err)
    }
}

impl fmt::Display for DecodeError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            DecodeError::NulError(inner) => write!(f, "Nul byte error: {}", inner),
            DecodeError::BadLength => f.write_str("Invalid data length. Should be multiple of 5."),
        }
    }
}

impl std::error::Error for DecodeError {}
pub fn z85_decode(data: &str) -> result::Result<Vec<u8>, DecodeError> {
if data.len() % 5 != 0 {
return Err(DecodeError::BadLength);
}
let len = data.len() * 4 / 5;
let mut dest = vec![0u8; len];
let c_str = ffi::CString::new(data)?;
unsafe {
zmq_sys2::zmq_z85_decode(dest.as_mut_ptr(), c_str.into_raw());
}
Ok(dest)
}