#![allow(trivial_numeric_casts)]
use bitflags::bitflags;
use libc::{c_int, c_long, c_short};
use log::debug;
use std::ffi;
use std::fmt;
use std::marker::PhantomData;
use std::os::raw::c_void;
use std::result;
use std::string::FromUtf8Error;
use std::sync::Arc;
use std::{mem, ptr, str};
use zmq_sys::{errno, RawFd};
macro_rules! zmq_try {
($($tt:tt)*) => {{
let rc = $($tt)*;
if rc == -1 {
return Err(crate::errno_to_error());
}
rc
}}
}
mod message;
mod sockopt;
use crate::message::msg_ptr;
pub use crate::message::Message;
pub use crate::SocketType::*;
pub type Result<T> = result::Result<T, Error>;
#[allow(non_camel_case_types)]
#[derive(Clone, Debug, PartialEq)]
pub enum SocketType {
PAIR = 0,
PUB = 1,
SUB = 2,
REQ = 3,
REP = 4,
DEALER = 5,
ROUTER = 6,
PULL = 7,
PUSH = 8,
XPUB = 9,
XSUB = 10,
STREAM = 11,
}
impl Copy for SocketType {}
#[allow(non_camel_case_types)]
#[derive(Clone, Debug, PartialEq)]
pub enum SocketEvent {
CONNECTED = 0x0001,
CONNECT_DELAYED = 0x0002,
CONNECT_RETRIED = 0x0004,
LISTENING = 0x0008,
BIND_FAILED = 0x0010,
ACCEPTED = 0x0020,
ACCEPT_FAILED = 0x0040,
CLOSED = 0x0080,
CLOSE_FAILED = 0x0100,
DISCONNECTED = 0x0200,
MONITOR_STOPPED = 0x0400,
HANDSHAKE_FAILED_NO_DETAIL = 0x0800,
HANDSHAKE_SUCCEEDED = 0x1000,
HANDSHAKE_FAILED_PROTOCOL = 0x2000,
HANDSHAKE_FAILED_AUTH = 0x4000,
ALL = 0xFFFF,
}
impl Copy for SocketEvent {}
impl SocketEvent {
pub fn to_raw(self) -> u16 {
self as u16
}
pub fn from_raw(raw: u16) -> SocketEvent {
match raw {
0x0001 => SocketEvent::CONNECTED,
0x0002 => SocketEvent::CONNECT_DELAYED,
0x0004 => SocketEvent::CONNECT_RETRIED,
0x0008 => SocketEvent::LISTENING,
0x0010 => SocketEvent::BIND_FAILED,
0x0020 => SocketEvent::ACCEPTED,
0x0040 => SocketEvent::ACCEPT_FAILED,
0x0080 => SocketEvent::CLOSED,
0x0100 => SocketEvent::CLOSE_FAILED,
0x0200 => SocketEvent::DISCONNECTED,
0x0400 => SocketEvent::MONITOR_STOPPED,
0x0800 => SocketEvent::HANDSHAKE_FAILED_NO_DETAIL,
0x1000 => SocketEvent::HANDSHAKE_SUCCEEDED,
0x2000 => SocketEvent::HANDSHAKE_FAILED_PROTOCOL,
0x4000 => SocketEvent::HANDSHAKE_FAILED_AUTH,
0xFFFF => SocketEvent::ALL,
x => panic!("unknown event type {}", x),
}
}
}
pub static DONTWAIT: i32 = 1;
pub static SNDMORE: i32 = 2;
#[allow(non_camel_case_types, dead_code)]
#[derive(Clone, Debug, PartialEq)]
enum Constants {
ZMQ_AFFINITY = 4,
ZMQ_IDENTITY = 5,
ZMQ_SUBSCRIBE = 6,
ZMQ_UNSUBSCRIBE = 7,
ZMQ_RATE = 8,
ZMQ_RECOVERY_IVL = 9,
ZMQ_SNDBUF = 11,
ZMQ_RCVBUF = 12,
ZMQ_RCVMORE = 13,
ZMQ_FD = 14,
ZMQ_EVENTS = 15,
ZMQ_TYPE = 16,
ZMQ_LINGER = 17,
ZMQ_RECONNECT_IVL = 18,
ZMQ_BACKLOG = 19,
ZMQ_RECONNECT_IVL_MAX = 21,
ZMQ_MAXMSGSIZE = 22,
ZMQ_SNDHWM = 23,
ZMQ_RCVHWM = 24,
ZMQ_MULTICAST_HOPS = 25,
ZMQ_RCVTIMEO = 27,
ZMQ_SNDTIMEO = 28,
ZMQ_LAST_ENDPOINT = 32,
ZMQ_ROUTER_MANDATORY = 33,
ZMQ_TCP_KEEPALIVE = 34,
ZMQ_TCP_KEEPALIVE_CNT = 35,
ZMQ_TCP_KEEPALIVE_IDLE = 36,
ZMQ_TCP_KEEPALIVE_INTVL = 37,
ZMQ_IMMEDIATE = 39,
ZMQ_XPUB_VERBOSE = 40,
ZMQ_ROUTER_RAW = 41,
ZMQ_IPV6 = 42,
ZMQ_MECHANISM = 43,
ZMQ_PLAIN_SERVER = 44,
ZMQ_PLAIN_USERNAME = 45,
ZMQ_PLAIN_PASSWORD = 46,
ZMQ_CURVE_SERVER = 47,
ZMQ_CURVE_PUBLICKEY = 48,
ZMQ_CURVE_SECRETKEY = 49,
ZMQ_CURVE_SERVERKEY = 50,
ZMQ_PROBE_ROUTER = 51,
ZMQ_REQ_CORRELATE = 52,
ZMQ_REQ_RELAXED = 53,
ZMQ_CONFLATE = 54,
ZMQ_ZAP_DOMAIN = 55,
ZMQ_ROUTER_HANDOVER = 56,
ZMQ_TOS = 57,
ZMQ_CONNECT_RID = 61,
ZMQ_GSSAPI_SERVER = 62,
ZMQ_GSSAPI_PRINCIPAL = 63,
ZMQ_GSSAPI_SERVICE_PRINCIPAL = 64,
ZMQ_GSSAPI_PLAINTEXT = 65,
ZMQ_HANDSHAKE_IVL = 66,
ZMQ_SOCKS_PROXY = 68,
ZMQ_XPUB_NODROP = 69,
ZMQ_BLOCKY = 70,
ZMQ_XPUB_MANUAL = 71,
ZMQ_XPUB_WELCOME_MSG = 72,
ZMQ_STREAM_NOTIFY = 73,
ZMQ_INVERT_MATCHING = 74,
ZMQ_HEARTBEAT_IVL = 75,
ZMQ_HEARTBEAT_TTL = 76,
ZMQ_HEARTBEAT_TIMEOUT = 77,
ZMQ_XPUB_VERBOSER = 78,
ZMQ_CONNECT_TIMEOUT = 79,
ZMQ_TCP_MAXRT = 80,
ZMQ_THREAD_SAFE = 81,
ZMQ_MULTICAST_MAXTPDU = 84,
ZMQ_VMCI_BUFFER_SIZE = 85,
ZMQ_VMCI_BUFFER_MIN_SIZE = 86,
ZMQ_VMCI_BUFFER_MAX_SIZE = 87,
ZMQ_VMCI_CONNECT_TIMEOUT = 88,
ZMQ_USE_FD = 89,
ZMQ_MSG_MORE = 1,
ZMQ_MSG_SHARED = 128,
ZMQ_MSG_MASK = 129,
}
impl Copy for Constants {}
impl Constants {
fn to_raw(self) -> i32 {
self as i32
}
}
#[allow(non_camel_case_types)]
#[derive(Clone, Debug, PartialEq)]
pub enum Mechanism {
ZMQ_NULL = 0,
ZMQ_PLAIN = 1,
ZMQ_CURVE = 2,
ZMQ_GSSAPI = 3,
}
impl Copy for Mechanism {}
#[derive(Clone, Eq, PartialEq)]
pub enum Error {
EACCES = errno::EACCES as isize,
EADDRINUSE = errno::EADDRINUSE as isize,
EAGAIN = errno::EAGAIN as isize,
EBUSY = errno::EBUSY as isize,
ECONNREFUSED = errno::ECONNREFUSED as isize,
EFAULT = errno::EFAULT as isize,
EINTR = errno::EINTR as isize,
EHOSTUNREACH = errno::EHOSTUNREACH as isize,
EINPROGRESS = errno::EINPROGRESS as isize,
EINVAL = errno::EINVAL as isize,
EMFILE = errno::EMFILE as isize,
EMSGSIZE = errno::EMSGSIZE as isize,
ENAMETOOLONG = errno::ENAMETOOLONG as isize,
ENODEV = errno::ENODEV as isize,
ENOENT = errno::ENOENT as isize,
ENOMEM = errno::ENOMEM as isize,
ENOTCONN = errno::ENOTCONN as isize,
ENOTSOCK = errno::ENOTSOCK as isize,
EPROTO = errno::EPROTO as isize,
EPROTONOSUPPORT = errno::EPROTONOSUPPORT as isize,
ENOTSUP = errno::ENOTSUP as isize,
ENOBUFS = errno::ENOBUFS as isize,
ENETDOWN = errno::ENETDOWN as isize,
EADDRNOTAVAIL = errno::EADDRNOTAVAIL as isize,
EFSM = errno::EFSM as isize,
ENOCOMPATPROTO = errno::ENOCOMPATPROTO as isize,
ETERM = errno::ETERM as isize,
EMTHREAD = errno::EMTHREAD as isize,
}
impl Copy for Error {}
impl Error {
pub fn to_raw(self) -> i32 {
self as i32
}
pub fn from_raw(raw: i32) -> Error {
match raw {
errno::EACCES => Error::EACCES,
errno::EADDRINUSE => Error::EADDRINUSE,
errno::EAGAIN => Error::EAGAIN,
errno::EBUSY => Error::EBUSY,
errno::ECONNREFUSED => Error::ECONNREFUSED,
errno::EFAULT => Error::EFAULT,
errno::EHOSTUNREACH => Error::EHOSTUNREACH,
errno::EINPROGRESS => Error::EINPROGRESS,
errno::EINVAL => Error::EINVAL,
errno::EMFILE => Error::EMFILE,
errno::EMSGSIZE => Error::EMSGSIZE,
errno::ENAMETOOLONG => Error::ENAMETOOLONG,
errno::ENODEV => Error::ENODEV,
errno::ENOENT => Error::ENOENT,
errno::ENOMEM => Error::ENOMEM,
errno::ENOTCONN => Error::ENOTCONN,
errno::ENOTSOCK => Error::ENOTSOCK,
errno::EPROTO => Error::EPROTO,
errno::EPROTONOSUPPORT => Error::EPROTONOSUPPORT,
errno::ENOTSUP => Error::ENOTSUP,
errno::ENOBUFS => Error::ENOBUFS,
errno::ENETDOWN => Error::ENETDOWN,
errno::EADDRNOTAVAIL => Error::EADDRNOTAVAIL,
errno::EINTR => Error::EINTR,
156_384_714 => Error::EPROTONOSUPPORT,
156_384_715 => Error::ENOBUFS,
156_384_716 => Error::ENETDOWN,
156_384_717 => Error::EADDRINUSE,
156_384_718 => Error::EADDRNOTAVAIL,
156_384_719 => Error::ECONNREFUSED,
156_384_720 => Error::EINPROGRESS,
156_384_721 => Error::ENOTSOCK,
156_384_763 => Error::EFSM,
156_384_764 => Error::ENOCOMPATPROTO,
156_384_765 => Error::ETERM,
156_384_766 => Error::EMTHREAD,
x => unsafe {
let s = zmq_sys::zmq_strerror(x);
panic!(
"unknown error [{}]: {}",
x,
str::from_utf8(ffi::CStr::from_ptr(s).to_bytes()).unwrap()
)
},
}
}
}
impl std::error::Error for Error {
fn description(&self) -> &str {
unsafe {
let s = zmq_sys::zmq_strerror(*self as c_int);
let v: &'static [u8] = mem::transmute(ffi::CStr::from_ptr(s).to_bytes());
str::from_utf8(v).unwrap()
}
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
unsafe {
let s = zmq_sys::zmq_strerror(*self as c_int);
let v: &'static [u8] = mem::transmute(ffi::CStr::from_ptr(s).to_bytes());
write!(f, "{}", str::from_utf8(v).unwrap())
}
}
}
impl fmt::Debug for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
unsafe {
let s = zmq_sys::zmq_strerror(*self as c_int);
write!(
f,
"{}",
str::from_utf8(ffi::CStr::from_ptr(s).to_bytes()).unwrap()
)
}
}
}
impl From<Error> for std::io::Error {
fn from(error: Error) -> Self {
use std::io::ErrorKind;
let kind = match error {
Error::ENOENT => ErrorKind::NotFound,
Error::EACCES => ErrorKind::PermissionDenied,
Error::ECONNREFUSED => ErrorKind::ConnectionRefused,
Error::ENOTCONN => ErrorKind::NotConnected,
Error::EADDRINUSE => ErrorKind::AddrInUse,
Error::EADDRNOTAVAIL => ErrorKind::AddrNotAvailable,
Error::EAGAIN => ErrorKind::WouldBlock,
Error::EINVAL => ErrorKind::InvalidInput,
Error::EINTR => ErrorKind::Interrupted,
_ => ErrorKind::Other,
};
std::io::Error::new(kind, error)
}
}
fn errno_to_error() -> Error {
Error::from_raw(unsafe { zmq_sys::zmq_errno() })
}
pub fn version() -> (i32, i32, i32) {
let mut major = 0;
let mut minor = 0;
let mut patch = 0;
unsafe {
zmq_sys::zmq_version(&mut major, &mut minor, &mut patch);
}
(major as i32, minor as i32, patch as i32)
}
struct RawContext {
ctx: *mut c_void,
}
impl RawContext {
fn destroy(&self) -> Result<()> {
zmq_try!(unsafe { zmq_sys::zmq_ctx_destroy(self.ctx) });
Ok(())
}
}
unsafe impl Send for RawContext {}
unsafe impl Sync for RawContext {}
impl Drop for RawContext {
fn drop(&mut self) {
debug!("context dropped");
let mut e = self.destroy();
while e == Err(Error::EINTR) {
e = self.destroy();
}
}
}
#[derive(Clone)]
pub struct Context {
raw: Arc<RawContext>,
}
impl Context {
pub fn new() -> Context {
Context {
raw: Arc::new(RawContext {
ctx: unsafe { zmq_sys::zmq_ctx_new() },
}),
}
}
pub fn socket(&self, socket_type: SocketType) -> Result<Socket> {
let sock = unsafe { zmq_sys::zmq_socket(self.raw.ctx, socket_type as c_int) };
if sock.is_null() {
return Err(errno_to_error());
}
Ok(Socket {
sock,
context: Some(self.clone()),
owned: true,
})
}
pub fn destroy(&mut self) -> Result<()> {
self.raw.destroy()
}
}
impl Default for Context {
fn default() -> Self {
Context::new()
}
}
pub struct Socket {
sock: *mut c_void,
#[allow(dead_code)]
context: Option<Context>,
owned: bool,
}
unsafe impl Send for Socket {}
impl Drop for Socket {
fn drop(&mut self) {
if self.owned {
if unsafe { zmq_sys::zmq_close(self.sock) } == -1 {
panic!(errno_to_error());
} else {
debug!("socket dropped");
}
}
}
}
macro_rules! sockopt_getter {
( $(#[$meta:meta])*
pub $getter:ident => $constant_name:ident as $ty:ty
) => {
$(#[$meta])*
pub fn $getter(&self) -> Result<$ty> {
<$ty as sockopt::Getter>::get(self.sock, Constants::$constant_name.to_raw())
}
};
}
macro_rules! sockopt_setter {
( $(#[$meta:meta])*
pub $setter:ident => $constant_name:ident as $ty:ty
) => {
$(#[$meta])*
pub fn $setter(&self, value: $ty) -> Result<()> {
<$ty as sockopt::Setter>::set(self.sock, Constants::$constant_name.to_raw(), value)
}
};
}
macro_rules! sockopt_seq {
( META { $($meta:meta)* }, ) => ();
( META { $($meta:meta)* }, $(#[$item_meta:meta])* (_, $setter:ident) => $constant_name:ident as $ty:ty,
$($rest:tt)*
) => {
sockopt_setter! {
$(#[$meta])* $(#[$item_meta])*
pub $setter => $constant_name as $ty
}
sockopt_seq!(META { $($meta)* }, $($rest)*);
};
( META { $($meta:meta)* }, $(#[$item_meta:meta])* ($getter:ident) => $constant_name:ident as $ty:ty,
$($rest:tt)*
) => {
sockopt_getter! {
$(#[$meta])* $(#[$item_meta])*
pub $getter => $constant_name as $ty
}
sockopt_seq!(META { $($meta)* }, $($rest)*);
};
( META { $($meta:meta)* }, $(#[$item_meta:meta])* ($getter:ident, $setter:ident) => $constant_name:ident as $ty:ty,
$($rest:tt)*
) => {
sockopt_getter! {
$(#[$meta])* $(#[$item_meta])*
pub $getter => $constant_name as $ty
}
sockopt_setter! {
$(#[$meta])* $(#[$item_meta])*
pub $setter => $constant_name as $ty
}
sockopt_seq!(META { $($meta)* }, $($rest)*);
};
}
macro_rules! sockopts {
() => ();
( $($rest:tt)* ) => {
sockopt_seq!(META {}, $($rest)*);
};
}
pub trait Sendable {
fn send(self, socket: &Socket, flags: i32) -> Result<()>;
}
impl<T> Sendable for T
where
T: Into<Message>,
{
fn send(self, socket: &Socket, flags: i32) -> Result<()> {
let mut msg = self.into();
zmq_try!(unsafe { zmq_sys::zmq_msg_send(msg_ptr(&mut msg), socket.sock, flags as c_int) });
Ok(())
}
}
impl Socket {
pub fn into_raw(mut self) -> *mut c_void {
self.owned = false;
self.sock
}
pub unsafe fn from_raw(sock: *mut c_void) -> Socket {
Socket {
sock,
context: None,
owned: true,
}
}
pub fn as_mut_ptr(&mut self) -> *mut c_void {
self.sock
}
pub fn bind(&self, endpoint: &str) -> Result<()> {
let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
zmq_try!(unsafe { zmq_sys::zmq_bind(self.sock, c_str.as_ptr()) });
Ok(())
}
pub fn connect(&self, endpoint: &str) -> Result<()> {
let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
zmq_try!(unsafe { zmq_sys::zmq_connect(self.sock, c_str.as_ptr()) });
Ok(())
}
pub fn disconnect(&self, endpoint: &str) -> Result<()> {
let c_str = ffi::CString::new(endpoint.as_bytes()).unwrap();
zmq_try!(unsafe { zmq_sys::zmq_disconnect(self.sock, c_str.as_ptr()) });
Ok(())
}
pub fn monitor(&self, monitor_endpoint: &str, events: i32) -> Result<()> {
let c_str = ffi::CString::new(monitor_endpoint.as_bytes()).unwrap();
zmq_try!(unsafe {
zmq_sys::zmq_socket_monitor(self.sock, c_str.as_ptr(), events as c_int)
});
Ok(())
}
pub fn send<T>(&self, data: T, flags: i32) -> Result<()>
where
T: Sendable,
{
data.send(self, flags)
}
#[deprecated(since = "0.9.0", note = "Use `send` instead")]
pub fn send_msg(&self, msg: Message, flags: i32) -> Result<()> {
self.send(msg, flags)
}
#[deprecated(since = "0.9.0", note = "Use `send` instead")]
pub fn send_str(&self, data: &str, flags: i32) -> Result<()> {
self.send(data, flags)
}
pub fn send_multipart<I, T>(&self, iter: I, flags: i32) -> Result<()>
where
I: IntoIterator<Item = T>,
T: Into<Message>,
{
let mut last_part: Option<T> = None;
for part in iter {
let maybe_last = last_part.take();
if let Some(last) = maybe_last {
self.send(last.into(), flags | SNDMORE)?;
}
last_part = Some(part);
}
if let Some(last) = last_part {
self.send(last.into(), flags)
} else {
Ok(())
}
}
pub fn recv(&self, msg: &mut Message, flags: i32) -> Result<()> {
zmq_try!(unsafe { zmq_sys::zmq_msg_recv(msg_ptr(msg), self.sock, flags as c_int) });
Ok(())
}
pub fn recv_into(&self, bytes: &mut [u8], flags: i32) -> Result<usize> {
let bytes_ptr = bytes.as_mut_ptr() as *mut c_void;
let rc = zmq_try!(unsafe {
zmq_sys::zmq_recv(self.sock, bytes_ptr, bytes.len(), flags as c_int)
});
Ok(rc as usize)
}
pub fn recv_msg(&self, flags: i32) -> Result<Message> {
let mut msg = Message::new();
self.recv(&mut msg, flags).map(|_| msg)
}
pub fn recv_bytes(&self, flags: i32) -> Result<Vec<u8>> {
self.recv_msg(flags).map(|msg| msg.to_vec())
}
pub fn recv_string(&self, flags: i32) -> Result<result::Result<String, Vec<u8>>> {
self.recv_bytes(flags)
.map(|bytes| String::from_utf8(bytes).map_err(FromUtf8Error::into_bytes))
}
pub fn recv_multipart(&self, flags: i32) -> Result<Vec<Vec<u8>>> {
let mut parts: Vec<Vec<u8>> = vec![];
loop {
let part = self.recv_bytes(flags)?;
parts.push(part);
let more_parts = self.get_rcvmore()?;
if !more_parts {
break;
}
}
Ok(parts)
}
sockopts! {
(is_ipv6, set_ipv6) => ZMQ_IPV6 as bool,
(is_immediate, set_immediate) => ZMQ_IMMEDIATE as bool,
(is_plain_server, set_plain_server) => ZMQ_PLAIN_SERVER as bool,
(is_conflate, set_conflate) => ZMQ_CONFLATE as bool,
(is_probe_router, set_probe_router) => ZMQ_PROBE_ROUTER as bool,
(is_router_mandatory, set_router_mandatory) => ZMQ_ROUTER_MANDATORY as bool,
(is_router_handover, set_router_handover) => ZMQ_ROUTER_HANDOVER as bool,
(is_curve_server, set_curve_server) => ZMQ_CURVE_SERVER as bool,
(is_gssapi_server, set_gssapi_server) => ZMQ_GSSAPI_SERVER as bool,
(is_gssapi_plaintext, set_gssapi_plaintext) => ZMQ_GSSAPI_PLAINTEXT as bool,
}
pub fn get_socket_type(&self) -> Result<SocketType> {
sockopt::get(self.sock, Constants::ZMQ_TYPE.to_raw()).map(|ty| match ty {
0 => SocketType::PAIR,
1 => SocketType::PUB,
2 => SocketType::SUB,
3 => SocketType::REQ,
4 => SocketType::REP,
5 => SocketType::DEALER,
6 => SocketType::ROUTER,
7 => SocketType::PULL,
8 => SocketType::PUSH,
9 => SocketType::XPUB,
10 => SocketType::XSUB,
11 => SocketType::STREAM,
_ => panic!("socket type is out of range!"),
})
}
pub fn get_rcvmore(&self) -> Result<bool> {
sockopt::get(self.sock, Constants::ZMQ_RCVMORE.to_raw()).map(|o: i64| o == 1i64)
}
sockopts! {
(get_maxmsgsize, set_maxmsgsize) => ZMQ_MAXMSGSIZE as i64,
(get_sndhwm, set_sndhwm) => ZMQ_SNDHWM as i32,
(get_rcvhwm, set_rcvhwm) => ZMQ_RCVHWM as i32,
(get_affinity, set_affinity) => ZMQ_AFFINITY as u64,
(get_rate, set_rate) => ZMQ_RATE as i32,
(get_recovery_ivl, set_recovery_ivl) => ZMQ_RECOVERY_IVL as i32,
(get_sndbuf, set_sndbuf) => ZMQ_SNDBUF as i32,
(get_rcvbuf, set_rcvbuf) => ZMQ_RCVBUF as i32,
(get_tos, set_tos) => ZMQ_TOS as i32,
(get_linger, set_linger) => ZMQ_LINGER as i32,
(get_reconnect_ivl, set_reconnect_ivl) => ZMQ_RECONNECT_IVL as i32,
(get_reconnect_ivl_max, set_reconnect_ivl_max) => ZMQ_RECONNECT_IVL_MAX as i32,
(get_backlog, set_backlog) => ZMQ_BACKLOG as i32,
(get_fd) => ZMQ_FD as RawFd,
(get_events) => ZMQ_EVENTS as PollEvents,
(get_multicast_hops, set_multicast_hops) => ZMQ_MULTICAST_HOPS as i32,
(get_rcvtimeo, set_rcvtimeo) => ZMQ_RCVTIMEO as i32,
(get_sndtimeo, set_sndtimeo) => ZMQ_SNDTIMEO as i32,
(get_tcp_keepalive, set_tcp_keepalive) => ZMQ_TCP_KEEPALIVE as i32,
(get_tcp_keepalive_cnt, set_tcp_keepalive_cnt) => ZMQ_TCP_KEEPALIVE_CNT as i32,
(get_tcp_keepalive_idle, set_tcp_keepalive_idle) => ZMQ_TCP_KEEPALIVE_IDLE as i32,
(get_tcp_keepalive_intvl, set_tcp_keepalive_intvl) => ZMQ_TCP_KEEPALIVE_INTVL as i32,
(get_handshake_ivl, set_handshake_ivl) => ZMQ_HANDSHAKE_IVL as i32,
(_, set_identity) => ZMQ_IDENTITY as &[u8],
(_, set_subscribe) => ZMQ_SUBSCRIBE as &[u8],
(_, set_unsubscribe) => ZMQ_UNSUBSCRIBE as &[u8],
(get_heartbeat_ivl, set_heartbeat_ivl) => ZMQ_HEARTBEAT_IVL as i32,
(get_heartbeat_ttl, set_heartbeat_ttl) => ZMQ_HEARTBEAT_TTL as i32,
(get_heartbeat_timeout, set_heartbeat_timeout) => ZMQ_HEARTBEAT_TIMEOUT as i32,
(get_connect_timeout, set_connect_timeout) => ZMQ_CONNECT_TIMEOUT as i32,
}
pub fn get_identity(&self) -> Result<Vec<u8>> {
sockopt::get_bytes(self.sock, Constants::ZMQ_IDENTITY.to_raw(), 255)
}
pub fn get_socks_proxy(&self) -> Result<result::Result<String, Vec<u8>>> {
sockopt::get_string(self.sock, Constants::ZMQ_SOCKS_PROXY.to_raw(), 255, true)
}
pub fn get_mechanism(&self) -> Result<Mechanism> {
sockopt::get(self.sock, Constants::ZMQ_MECHANISM.to_raw()).map(|mech| match mech {
0 => Mechanism::ZMQ_NULL,
1 => Mechanism::ZMQ_PLAIN,
2 => Mechanism::ZMQ_CURVE,
3 => Mechanism::ZMQ_GSSAPI,
_ => panic!("Mechanism is out of range!"),
})
}
pub fn get_plain_username(&self) -> Result<result::Result<String, Vec<u8>>> {
sockopt::get_string(self.sock, Constants::ZMQ_PLAIN_USERNAME.to_raw(), 255, true)
}
pub fn get_plain_password(&self) -> Result<result::Result<String, Vec<u8>>> {
sockopt::get_string(self.sock, Constants::ZMQ_PLAIN_PASSWORD.to_raw(), 256, true)
}
pub fn get_zap_domain(&self) -> Result<result::Result<String, Vec<u8>>> {
sockopt::get_string(self.sock, Constants::ZMQ_ZAP_DOMAIN.to_raw(), 255, true)
}
pub fn get_last_endpoint(&self) -> Result<result::Result<String, Vec<u8>>> {
sockopt::get_string(
self.sock,
Constants::ZMQ_LAST_ENDPOINT.to_raw(),
256 + 9 + 1,
true,
)
}
pub fn get_curve_publickey(&self) -> Result<Vec<u8>> {
sockopt::get_bytes(self.sock, Constants::ZMQ_CURVE_PUBLICKEY.to_raw(), 32)
}
pub fn get_curve_secretkey(&self) -> Result<Vec<u8>> {
sockopt::get_bytes(self.sock, Constants::ZMQ_CURVE_SECRETKEY.to_raw(), 32)
}
pub fn get_curve_serverkey(&self) -> Result<Vec<u8>> {
sockopt::get_bytes(self.sock, Constants::ZMQ_CURVE_SERVERKEY.to_raw(), 32)
}
pub fn get_gssapi_principal(&self) -> Result<result::Result<String, Vec<u8>>> {
sockopt::get_string(
self.sock,
Constants::ZMQ_GSSAPI_PRINCIPAL.to_raw(),
260,
true,
)
}
pub fn get_gssapi_service_principal(&self) -> Result<result::Result<String, Vec<u8>>> {
sockopt::get_string(
self.sock,
Constants::ZMQ_GSSAPI_SERVICE_PRINCIPAL.to_raw(),
260,
true,
)
}
sockopts! {
(_, set_socks_proxy) => ZMQ_SOCKS_PROXY as Option<&str>,
(_, set_plain_username) => ZMQ_PLAIN_USERNAME as Option<&str>,
(_, set_plain_password) => ZMQ_PLAIN_PASSWORD as Option<&str>,
(_, set_zap_domain) => ZMQ_ZAP_DOMAIN as &str,
(_, set_xpub_welcome_msg) => ZMQ_XPUB_WELCOME_MSG as Option<&str>,
(_, set_xpub_verbose) => ZMQ_XPUB_VERBOSE as bool,
(_, set_curve_publickey) => ZMQ_CURVE_PUBLICKEY as &[u8],
(_, set_curve_secretkey) => ZMQ_CURVE_SECRETKEY as &[u8],
(_, set_curve_serverkey) => ZMQ_CURVE_SERVERKEY as &[u8],
(_, set_gssapi_principal) => ZMQ_GSSAPI_PRINCIPAL as &str,
(_, set_gssapi_service_principal) => ZMQ_GSSAPI_SERVICE_PRINCIPAL as &str,
}
pub fn as_poll_item(&self, events: PollEvents) -> PollItem {
PollItem {
socket: self.sock,
fd: 0,
events: events.bits(),
revents: 0,
marker: PhantomData,
}
}
pub fn poll(&self, events: PollEvents, timeout_ms: i64) -> Result<i32> {
poll(&mut [self.as_poll_item(events)], timeout_ms)
}
}
bitflags! {
pub struct PollEvents: i16 {
const POLLIN = 1;
const POLLOUT = 2;
const POLLERR = 4;
}
}
pub const POLLIN: PollEvents = PollEvents::POLLIN;
pub const POLLOUT: PollEvents = PollEvents::POLLOUT;
pub const POLLERR: PollEvents = PollEvents::POLLERR;
#[repr(C)]
pub struct PollItem<'a> {
socket: *mut c_void,
fd: RawFd,
events: c_short,
revents: c_short,
marker: PhantomData<&'a Socket>,
}
impl<'a> PollItem<'a> {
pub fn from_fd(fd: RawFd, events: PollEvents) -> PollItem<'a> {
PollItem {
socket: ptr::null_mut(),
fd,
events: events.bits(),
revents: 0,
marker: PhantomData,
}
}
pub fn set_events(&mut self, events: PollEvents) {
self.events = events.bits();
}
pub fn get_revents(&self) -> PollEvents {
PollEvents::from_bits_truncate(self.revents)
}
pub fn is_readable(&self) -> bool {
(self.revents & POLLIN.bits()) != 0
}
pub fn is_writable(&self) -> bool {
(self.revents & POLLOUT.bits()) != 0
}
pub fn is_error(&self) -> bool {
(self.revents & POLLERR.bits()) != 0
}
}
pub fn poll(items: &mut [PollItem], timeout: i64) -> Result<i32> {
let rc = zmq_try!(unsafe {
zmq_sys::zmq_poll(
items.as_mut_ptr() as *mut zmq_sys::zmq_pollitem_t,
items.len() as c_int,
timeout as c_long,
)
});
Ok(rc as i32)
}
pub fn proxy(frontend: &Socket, backend: &Socket) -> Result<()> {
zmq_try!(unsafe { zmq_sys::zmq_proxy(frontend.sock, backend.sock, ptr::null_mut()) });
Ok(())
}
pub fn proxy_with_capture(
frontend: &mut Socket,
backend: &mut Socket,
capture: &mut Socket,
) -> Result<()> {
zmq_try!(unsafe { zmq_sys::zmq_proxy(frontend.sock, backend.sock, capture.sock) });
Ok(())
}
pub fn proxy_steerable(
frontend: &mut Socket,
backend: &mut Socket,
control: &mut Socket,
) -> Result<()> {
zmq_try!(unsafe {
zmq_sys::zmq_proxy_steerable(frontend.sock, backend.sock, ptr::null_mut(), control.sock)
});
Ok(())
}
pub fn proxy_steerable_with_capture(
frontend: &mut Socket,
backend: &mut Socket,
capture: &mut Socket,
control: &mut Socket,
) -> Result<()> {
zmq_try!(unsafe {
zmq_sys::zmq_proxy_steerable(frontend.sock, backend.sock, capture.sock, control.sock)
});
Ok(())
}
pub fn has(capability: &str) -> Option<bool> {
let c_str = ffi::CString::new(capability).unwrap();
unsafe { Some(zmq_sys::zmq_has(c_str.as_ptr()) == 1) }
}
#[derive(Debug)]
pub struct CurveKeyPair {
pub public_key: [u8; 32],
pub secret_key: [u8; 32],
}
impl CurveKeyPair {
pub fn new() -> Result<CurveKeyPair> {
let mut ffi_public_key = [0u8; 41];
let mut ffi_secret_key = [0u8; 41];
zmq_try!(unsafe {
zmq_sys::zmq_curve_keypair(
ffi_public_key.as_mut_ptr() as *mut libc::c_char,
ffi_secret_key.as_mut_ptr() as *mut libc::c_char,
)
});
let mut pair = CurveKeyPair {
public_key: [0; 32],
secret_key: [0; 32],
};
unsafe {
zmq_sys::zmq_z85_decode(
pair.public_key.as_mut_ptr(),
ffi_public_key.as_ptr() as *mut libc::c_char,
);
zmq_sys::zmq_z85_decode(
pair.secret_key.as_mut_ptr(),
ffi_secret_key.as_ptr() as *mut libc::c_char,
);
}
Ok(pair)
}
}
#[derive(Debug)]
pub enum EncodeError {
BadLength,
FromUtf8Error(FromUtf8Error),
}
impl From<FromUtf8Error> for EncodeError {
fn from(err: FromUtf8Error) -> Self {
EncodeError::FromUtf8Error(err)
}
}
impl fmt::Display for EncodeError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
EncodeError::BadLength => write!(f, "Invalid data length. Should be multiple of 4."),
EncodeError::FromUtf8Error(ref e) => write!(f, "UTF8 conversion error: {}", e),
}
}
}
impl std::error::Error for EncodeError {
fn description(&self) -> &str {
match *self {
EncodeError::BadLength => "invalid data length",
EncodeError::FromUtf8Error(ref e) => e.description(),
}
}
}
pub fn z85_encode(data: &[u8]) -> result::Result<String, EncodeError> {
if data.len() % 4 != 0 {
return Err(EncodeError::BadLength);
}
let len = data.len() * 5 / 4 + 1;
let mut dest = vec![0u8; len];
unsafe {
zmq_sys::zmq_z85_encode(
dest.as_mut_ptr() as *mut libc::c_char,
data.as_ptr(),
data.len(),
);
}
dest.truncate(len - 1);
String::from_utf8(dest).map_err(EncodeError::FromUtf8Error)
}
#[derive(Debug)]
pub enum DecodeError {
BadLength,
NulError(ffi::NulError),
}
impl From<ffi::NulError> for DecodeError {
fn from(err: ffi::NulError) -> Self {
DecodeError::NulError(err)
}
}
impl fmt::Display for DecodeError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
DecodeError::BadLength => write!(f, "Invalid data length. Should be multiple of 5."),
DecodeError::NulError(ref e) => write!(f, "Nul byte error: {}", e),
}
}
}
impl std::error::Error for DecodeError {
fn description(&self) -> &str {
match *self {
DecodeError::BadLength => "invalid data length",
DecodeError::NulError(ref e) => e.description(),
}
}
}
pub fn z85_decode(data: &str) -> result::Result<Vec<u8>, DecodeError> {
if data.len() % 5 != 0 {
return Err(DecodeError::BadLength);
}
let len = data.len() * 4 / 5;
let mut dest = vec![0u8; len];
let c_str = ffi::CString::new(data)?;
unsafe {
zmq_sys::zmq_z85_decode(dest.as_mut_ptr(), c_str.into_raw());
}
Ok(dest)
}