zmq_rs/
lib.rs

1#![allow(dead_code)]
2
3extern crate libc;
4extern crate zmq_ffi;
5#[macro_use]
6extern crate cfg_if;
7
8mod socket;
9mod errno;
10pub use socket::*;
11pub use errno::*;
12
13use std::ops::{ Deref, DerefMut };
14use std::ffi;
15use std::vec::Vec;
16use std::slice;
17use std::mem::transmute;
18use libc::{ c_int, c_void, size_t };
19
20pub const ZMQ_VERSION_MAJOR:i32 = 4;
21pub const ZMQ_VERSION_MINOR:i32 = 1;
22pub const ZMQ_VERSION_PATCH:i32 = 4;
23
24macro_rules! ret_when_null {
25    ($ptr: expr) => {{
26        if $ptr.is_null() {
27            return Err(Error::from_last_err());
28        }
29    }}
30}
31
32#[macro_export]
33macro_rules! ZMQ_MAKE_VERSION {
34    ($major: expr, $minor: expr, $patch: expr) => {
35        {
36            $major * 10000 + $minor * 100 + $patch
37        }
38    }
39}
40
41pub const ZMQ_VERSION:i32 = ZMQ_MAKE_VERSION!(
42    ZMQ_VERSION_MAJOR,
43    ZMQ_VERSION_MINOR,
44    ZMQ_VERSION_PATCH
45);
46
47fn errno() -> c_int {
48    unsafe {
49        zmq_ffi::zmq_errno()
50    }
51}
52
53fn strerror(errnum: c_int) -> String {
54    unsafe {
55        let s = zmq_ffi::zmq_strerror(errnum);
56        ffi::CStr::from_ptr(s).to_str().unwrap().to_string()
57    }
58}
59
60/// Report 0MQ library version
61///
62/// Binding of `void zmq_version (int *major, int *minor, int *patch)`
63///
64/// The function will return tuple of major, minor and patch of the ØMQ library version.
65pub fn version() -> (i32, i32, i32) {
66    let mut major = 0;
67    let mut minor = 0;
68    let mut patch = 0;
69
70    unsafe {
71        zmq_ffi::zmq_version(&mut major, &mut minor, &mut patch);
72    }
73
74    (major as i32, minor as i32, patch as i32)
75}
76
77#[derive(Clone)]
78pub struct Error {
79    err_num: c_int,
80    err_str: String,
81}
82
83impl Error {
84    fn from_last_err() -> Error {
85        let err_num = errno();
86        let err_str = strerror(err_num);
87
88        Error {
89            err_num: err_num,
90            err_str: err_str,
91        }
92    }
93
94    pub fn get_errno(&self) -> Errno {
95        self.err_num as Errno
96    }
97}
98
99impl std::fmt::Display for Error {
100    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
101        write!(f, "{} (code {})", self.err_str, self.err_num)
102    }
103}
104
105impl std::fmt::Debug for Error {
106    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
107        std::fmt::Display::fmt(self, f)
108    }
109}
110
111impl std::error::Error for Error {
112    fn description(&self) -> &str {
113        &self.err_str
114    }
115}
116
117type ContextOption = c_int;
118
119const IO_THREADS: ContextOption = 1;         //  get     /   set
120const MAX_SOCKETS: ContextOption = 2;        //  get     /   set
121const SOCKET_LIMIT: ContextOption = 3;       //  get     /
122const THREAD_PRIORITY: ContextOption = 3;    //          /   set
123const THREAD_SCHED_POLICY: ContextOption = 4;//          /   set
124const IPV6: ContextOption = 42;              //  get     /   set
125
126macro_rules! getctxopt_template {
127    ($name: ident, $opt: expr) => {
128        pub fn $name(&self) -> Result<i32, Error> {
129            let rc = unsafe { zmq_ffi::zmq_ctx_get(self.ctx_ptr, $opt as c_int) };
130            if rc == -1 {
131                Err(Error::from_last_err())
132            } else {
133                Ok(rc)
134            }
135        }
136    };
137    ($name: ident, $opt: expr, $map: expr, $rt: ty) => {
138        pub fn $name(&self) -> Result<$rt, Error> {
139            let rc = unsafe { zmq_ffi::zmq_ctx_get(self.ctx_ptr, $opt as c_int) };
140            if rc == -1 {
141                Err(Error::from_last_err())
142            } else {
143                Ok($map(rc))
144            }
145        }
146    };
147}
148
149macro_rules! setctxopt_template {
150    ($name: ident, $opt: expr) => {
151        pub fn $name(&mut self, optval: i32) -> Result<(), Error> {
152            let rc = unsafe { zmq_ffi::zmq_ctx_set(self.ctx_ptr,  $opt as c_int, optval as c_int) };
153            if rc == -1 {
154                Err(Error::from_last_err())
155            } else {
156                Ok(())
157            }
158        }
159    };
160}
161
162
163pub struct Context {
164    ctx_ptr: *mut c_void,
165}
166
167impl Context {
168    /// Create new 0MQ context
169    ///
170    /// Binding of `void *zmq_ctx_new ();`
171    ///
172    /// The function creates a new ØMQ context.
173    /// # Thread safety
174    /// A ØMQ context is thread safe and may be shared among as many application threads as necessary,
175    /// without any additional locking required on the part of the caller.
176    pub fn new() -> Result<Context, Error> {
177        let ctx_ptr = unsafe { zmq_ffi::zmq_ctx_new() };
178        ret_when_null!(ctx_ptr);
179        Ok(Context {
180            ctx_ptr: ctx_ptr,
181        })
182    }
183
184    /// Destroy a 0MQ context
185    ///
186    /// Binding of `int zmq_ctx_term (void *context);`
187    /// This function will be called automatically when context goes out of scope
188    fn term(&mut self) -> Result<(), Error> {
189        let rc = unsafe { zmq_ffi::zmq_ctx_term(self.ctx_ptr) };
190        if rc == -1 {
191            Err(Error::from_last_err())
192        } else {
193            Ok(())
194        }
195    }
196
197    /// Shutdown a 0MQ context
198    ///
199    /// Binding of `int zmq_ctx_shutdown (void *context);`
200    ///
201    /// The function will shutdown the ØMQ context context.
202    /// Context shutdown will cause any blocking operations currently in progress on sockets open within context to return immediately with an error code of ETERM.
203    /// With the exception of Socket::Close(), any further operations on sockets open within context will fail with an error code of ETERM.
204    pub fn shutdown(&mut self) -> Result<(), Error> {
205        let rc = unsafe { zmq_ffi::zmq_ctx_shutdown(self.ctx_ptr) };
206        if rc == -1 {
207            Err(Error::from_last_err())
208        } else {
209            Ok(())
210        }
211    }
212
213    getctxopt_template!(get_io_threads, IO_THREADS);
214    getctxopt_template!(get_max_sockets, MAX_SOCKETS);
215    getctxopt_template!(get_socket_limit, SOCKET_LIMIT);
216    getctxopt_template!(is_ipv6_enabled, IPV6, |r| { r > 0 }, bool);
217
218    setctxopt_template!(set_io_threads, IO_THREADS);
219    setctxopt_template!(set_max_sockets, MAX_SOCKETS);
220    setctxopt_template!(set_thread_priority, THREAD_PRIORITY);
221    setctxopt_template!(set_thread_sched_policy, THREAD_SCHED_POLICY);
222    setctxopt_template!(set_ipv6, IPV6);
223
224    /// Create 0MQ socket
225    ///
226    /// Binding of `void *zmq_socket (void *context, int type);`
227    ///
228    /// The type argument specifies the socket type, which determines the semantics of communication over the socket.
229    /// The newly created socket is initially unbound, and not associated with any endpoints.
230    /// In order to establish a message flow a socket must first be connected to at least one endpoint with Scoket::Connect,
231    /// or at least one endpoint must be created for accepting incoming connections with Socket::Bind().
232    pub fn socket(&self, t: SocketType) -> Result<Socket, Error> {
233        let socket = unsafe { zmq_ffi::zmq_socket(self.ctx_ptr, t as c_int) };
234        ret_when_null!(socket);
235        Ok(Socket::from_raw(socket))
236    }
237}
238
239unsafe impl Send for Context {}
240unsafe impl Sync for Context {}
241
242impl Drop for Context {
243    fn drop(&mut self) {
244        loop {
245            match self.term() {
246                Ok(_) => { },
247                Err(e) => {
248                    if e.get_errno() == EINTR {
249                        continue;
250                    } else {
251                        break;
252                    }
253                }
254            }
255        }
256
257    }
258}
259
260const MSG_SIZE: usize = 64;
261
262pub struct Message {
263    msg: zmq_ffi::zmq_msg_t,
264}
265
266unsafe extern "C" fn zmq_free_fn(data: *mut c_void, hint: *mut c_void) {
267    let slice = slice::from_raw_parts_mut(data as *mut u8, hint as usize);
268    let _: Box<[u8]> = Box::from_raw(slice);
269}
270
271impl Message {
272    /// initialise empty 0MQ message.
273    ///
274    /// Binding of `int zmq_msg_init (zmq_msg_t *msg);`.
275    ///
276    /// The function will return a message object to represent an empty message.
277    /// This function is most useful when called before receiving a message.
278    pub fn new() -> Result<Message, Error> {
279        let mut msg = zmq_ffi::zmq_msg_t { unknown: [0; MSG_SIZE] };
280        let rc = unsafe { zmq_ffi::zmq_msg_init(&mut msg) };
281        if rc == -1 {
282            Err(Error::from_last_err())
283        } else {
284            Ok(Message { msg: msg })
285        }
286    }
287
288    ///  Initialise 0MQ message of a specified size.
289    ///
290    /// Binding of `int zmq_msg_init_size (zmq_msg_t *msg, size_t size);`.
291    ///
292    /// The function will allocate any resources required to store a message size bytes long and
293    /// return a message object to represent the newly allocated message.
294    pub fn with_capcity(len: usize) -> Result<Message, Error> {
295        let mut msg = zmq_ffi::zmq_msg_t { unknown: [0; MSG_SIZE] };
296        let rc = unsafe { zmq_ffi::zmq_msg_init_size(&mut msg, len as size_t) };
297        if rc == -1 {
298            Err(Error::from_last_err())
299        } else {
300            Ok(Message { msg: msg })
301        }
302    }
303
304    /// Initialise 0MQ message from a supplied std::vec::Vec<u8>.
305    ///
306    /// Binding of `int zmq_msg_init_data (zmq_msg_t *msg, void *data,
307    ///    size_t size, zmq_free_fn *ffn, void *hint);`.
308    ///
309    /// The function will take ownership of the Vec and
310    /// return a message object to represent the content referenced by the Vec.
311    ///
312    /// No copy of data will be performed.
313    pub fn from_vec(vec: Vec<u8>) -> Result<Message, Error> {
314        let len = vec.len() as size_t;
315        let data = vec.into_boxed_slice();
316
317        let mut msg = zmq_ffi::zmq_msg_t { unknown: [0; MSG_SIZE] };
318        let rc = unsafe {
319            zmq_ffi::zmq_msg_init_data(&mut msg, Box::into_raw(data) as *mut c_void, len,
320                zmq_free_fn, len as *mut _)
321            };
322        if rc == -1 {
323            Err(Error::from_last_err())
324        } else {
325            Ok(Message { msg: msg })
326        }
327    }
328
329    pub fn from_slice(data: &[u8]) -> Result<Message, Error> {
330        unsafe {
331            let mut msg = try!(Message::with_capcity(data.len()));
332            std::ptr::copy_nonoverlapping(data.as_ptr(), msg.as_mut_ptr(), data.len());
333            Ok(msg)
334        }
335    }
336
337    /// Move content of a message to another message.
338    ///
339    /// Binding of `int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);`.
340    ///
341    /// Move the content of the message object referenced by src to the message object referenced by dest.
342    /// No actual copying of message content is performed,
343    /// dest is simply updated to reference the new content.
344    /// src becomes an empty message after calling Message::msg_move().
345    /// The original content of dest, if any, will be released
346    pub fn msg_move(dest: &mut Message, src: &mut Message) -> Result<(), Error> {
347        let rc = unsafe {
348            zmq_ffi::zmq_msg_move(&mut dest.msg, &mut src.msg)
349        };
350        if rc == -1 {
351            Err(Error::from_last_err())
352        } else {
353            Ok(())
354        }
355    }
356
357    /// Copy content of a message to another message.
358    ///
359    /// Binding of `int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);`.
360    ///
361    /// Copy the message object referenced by src to the message object referenced by dest.
362    /// The original content of dest, if any, will be released.
363    pub fn msg_copy(dest: &mut Message, src: &Message) -> Result<(), Error> {
364        let rc = unsafe {
365            zmq_ffi::zmq_msg_copy(&mut dest.msg, transmute(&src.msg))
366        };
367        if rc == -1 {
368            Err(Error::from_last_err())
369        } else {
370            Ok(())
371        }
372    }
373
374    /// Retrieve pointer to message content.
375    ///
376    /// Binding of `void *zmq_msg_data (zmq_msg_t *msg);`.
377    ///
378    /// The function will return a pointer to the message content.
379    pub unsafe fn get_data_ptr(&mut self) -> *mut c_void {
380        zmq_ffi::zmq_msg_data(&mut self.msg)
381    }
382
383    /// Retrieve pointer to message content.
384    ///
385    /// Binding of `void *zmq_msg_data (zmq_msg_t *msg);`.
386    ///
387    /// The function will return a pointer to the message content.
388    pub unsafe fn get_const_data_ptr(&self) -> *const c_void {
389        zmq_ffi::zmq_msg_data(transmute(&self.msg))
390    }
391
392    /// Retrieve message content size in bytes
393    ///
394    /// Binding of `size_t zmq_msg_size (zmq_msg_t *msg);`
395    ///
396    /// The function will return the size in bytes of the content of the message.
397    pub fn len(&self) -> usize {
398        unsafe { zmq_ffi::zmq_msg_size(transmute(&self.msg)) }
399    }
400
401    ///  Indicate if there are more message parts to receive
402    ///
403    /// Binding of `int zmq_msg_more (zmq_msg_t *message);`
404    ///
405    /// The function indicates whether this is part of a multi-part message, and there are further parts to receive.
406    /// This method is identical to xxxxx with an argument of ZMQ_MORE.
407    pub fn has_more(&self) -> bool {
408        unsafe { zmq_ffi::zmq_msg_more(transmute(&self.msg)) > 0 }
409    }
410
411    /// Get message property
412    ///
413    /// Binding of `int zmq_msg_get (zmq_msg_t *message, int property);`
414    ///
415    /// The function will return the value for the property specified by the property argument.
416    pub fn get_property(&self, property: MessageProperty) -> Result<i32, Error> {
417        let rc = unsafe { zmq_ffi::zmq_msg_get(transmute(&self.msg), property as c_int) };
418        if rc == -1 {
419            Err(Error::from_last_err())
420        } else  {
421            Ok(rc)
422        }
423    }
424
425    // zmq_msg_set is not used this while
426    // pub fn set_property(&mut self, property: c_int, optval: i32) -> Result<(), Error> { }
427
428    /// Get message metadata property
429    ///
430    /// Binding of `const char *zmq_msg_gets (zmq_msg_t *message, const char *property);`
431    ///
432    /// The function will return the string value for the metadata property specified by the property argument.
433    /// Metadata is defined on a per-connection basis during the ZeroMQ connection handshake as specified in <rfc.zeromq.org/spec:37>.
434    /// The following ZMTP properties can be retrieved with the function:
435    /// `Socket-Type`
436    /// `Identity`
437    /// `Resource`
438    /// Additionally, when available for the underlying transport,
439    /// the Peer-Address property will return the IP address of the remote endpoint as returned by getnameinfo(2).
440    /// Other properties may be defined based on the underlying security mechanism.
441    pub fn get_meta<'a>(&'a self, property: &str) -> Option<&'a str> {
442        let prop_cstr = ffi::CString::new(property).unwrap();
443
444        let returned_str_ptr = unsafe { zmq_ffi::zmq_msg_gets(transmute(&self.msg), transmute(prop_cstr.as_ptr())) };
445        if returned_str_ptr.is_null() {
446            None
447        } else {
448            unsafe { Some(ffi::CStr::from_ptr(returned_str_ptr).to_str().unwrap()) }
449        }
450    }
451}
452
453impl Deref for Message {
454    type Target = [u8];
455
456    fn deref<'a>(&'a self) -> &'a [u8] {
457        unsafe {
458            let ptr = self.get_const_data_ptr();
459            let len = self.len() as usize;
460            slice::from_raw_parts(transmute(ptr), len)
461        }
462    }
463}
464
465impl DerefMut for Message {
466    fn deref_mut<'a>(&'a mut self) -> &'a mut [u8] {
467        unsafe {
468            let ptr = self.get_data_ptr();
469            let len = self.len() as usize;
470            slice::from_raw_parts_mut(transmute(ptr), len)
471        }
472    }
473}
474
475impl Drop for Message {
476    fn drop(&mut self) {
477        loop {
478            let rc = unsafe { zmq_ffi::zmq_msg_close(&mut self.msg) };
479            if rc != 0 {
480                let e = Error::from_last_err();
481                if e.get_errno() == EINTR {
482                    continue;
483                } else {
484                    panic!(e);
485                }
486
487            } else {
488                break;
489            }
490        }
491    }
492}
493
494pub type SocketType = c_int;
495pub const PAIR: SocketType = 0;
496pub const PUB: SocketType = 1;
497pub const SUB: SocketType = 2;
498pub const REQ: SocketType = 3;
499pub const REP: SocketType = 4;
500pub const DEALER: SocketType = 5;
501pub const ROUTER: SocketType = 6;
502pub const PULL: SocketType = 7;
503pub const PUSH: SocketType = 8;
504pub const XPUB: SocketType = 9;
505pub const XSUB: SocketType = 10;
506pub const STREAM: SocketType = 11;
507
508pub type MessageProperty = c_int;
509pub const MORE: MessageProperty = 1;
510pub const SRCFD: MessageProperty = 2;
511pub const SHARED: MessageProperty = 3;
512
513
514pub type SecurityMechanism = c_int;
515pub const ZMQ_NULL: SecurityMechanism = 0;
516pub const ZMQ_PLAIN: SecurityMechanism = 1;
517pub const ZMQ_CURVE: SecurityMechanism = 2;
518pub const ZMQ_GSSAPI: SecurityMechanism = 3;
519
520/// Check a ZMQ capability
521///
522/// Bindng of `int zmq_has (const char *capability);`
523///
524/// The function shall report whether a specified capability is available in the library
525pub fn has_capability(capability: &str) -> bool {
526    let capability_cstr = ffi::CString::new(capability).unwrap();
527    let rc = unsafe { zmq_ffi::zmq_has(capability_cstr.as_ptr()) };
528    rc == 1
529}
530
531//  Encryption functions
532/*  Encode data with Z85 encoding. Returns encoded data                       */
533//ZMQ_EXPORT char *zmq_z85_encode (char *dest, const uint8_t *data, size_t size);
534
535/// Encode a binary key as Z85 printable text
536///
537/// Binding of `char *zmq_z85_encode (char *dest, const uint8_t *data, size_t size);`
538///
539/// The function will encode the binary block specified by data and size into a string in dest.
540/// The size of the binary block must be divisible by 4.
541pub fn z85_encode(data: &[u8]) -> Result<String, Error> {
542    let len = data.len() as i32 * 5 / 4 + 1;
543    let mut dest: Vec<u8> = Vec::with_capacity(len as usize);
544
545    let rc = unsafe { zmq_ffi::zmq_z85_encode(transmute(dest.as_mut_ptr()), data.as_ptr(), data.len()) };
546    if rc.is_null() {
547        Err(Error::from_last_err())
548    } else {
549        unsafe {
550            dest.set_len(len as usize);
551            let cstr = ffi::CStr::from_ptr(transmute(dest.as_ptr()));
552
553            Ok(String::from_utf8(cstr.to_bytes().to_vec()).unwrap())
554        }
555    }
556}
557
558///  Decode a binary key from Z85 printable text
559///
560/// Binding of `uint8_t *zmq_z85_decode (uint8_t *dest, const char *string);`
561///
562/// The function will decode string into dest. The length of string in bytes shall be divisible by 5
563pub fn z85_decode(encoded: &str) -> Result<Vec<u8>, Error> {
564    let encoded_cstr = ffi::CString::new(encoded).unwrap();
565    let len = (encoded_cstr.as_bytes().len() as i32 * 4 / 5) as i32;
566    let mut dest: Vec<u8> = Vec::with_capacity(len as usize);
567
568    let rc = unsafe { zmq_ffi::zmq_z85_decode(dest.as_mut_ptr(), encoded_cstr.as_ptr()) };
569    if rc.is_null() {
570        Err(Error::from_last_err())
571    } else  {
572        unsafe {
573            dest.set_len(len as usize);
574        }
575        Ok(dest)
576    }
577}
578
579
580/// Generate z85-encoded public and private keypair with libsodium.
581///
582/// Binding of `int zmq_curve_keypair (char *z85_public_key, char *z85_secret_key);`
583///
584/// The function will return a newly generated random keypair consisting of a public key and a secret key.
585/// The keys are encoded using z85_encode().
586pub fn gen_curve_keypair() -> Result<(String, String), Error> {
587    let mut public_key: Vec<u8> = Vec::with_capacity(41);
588    let mut secret_key: Vec<u8> = Vec::with_capacity(41);
589
590    let rc = unsafe {
591        zmq_ffi::zmq_curve_keypair(
592            transmute(public_key.as_mut_ptr()),
593            transmute(secret_key.as_mut_ptr())
594        )
595    };
596    if rc == -1 {
597        Err(Error::from_last_err())
598    } else  {
599        unsafe {
600            public_key.set_len(40);
601            secret_key.set_len(40);
602        }
603        Ok((String::from_utf8(public_key).unwrap(), String::from_utf8(secret_key).unwrap()))
604    }
605}