1#![allow(dead_code)]
2
3extern crate libc;
4extern crate zmq_ffi;
5#[macro_use]
6extern crate cfg_if;
7
8mod socket;
9mod errno;
10pub use socket::*;
11pub use errno::*;
12
13use std::ops::{ Deref, DerefMut };
14use std::ffi;
15use std::vec::Vec;
16use std::slice;
17use std::mem::transmute;
18use libc::{ c_int, c_void, size_t };
19
/// Major component of the version described by the `ZMQ_VERSION_*` constants.
pub const ZMQ_VERSION_MAJOR: i32 = 4;
/// Minor component of the version described by the `ZMQ_VERSION_*` constants.
pub const ZMQ_VERSION_MINOR: i32 = 1;
/// Patch component of the version described by the `ZMQ_VERSION_*` constants.
pub const ZMQ_VERSION_PATCH: i32 = 4;
23
// Early-returns `Err(Error::from_last_err())` from the enclosing function
// when the given raw pointer is null; otherwise does nothing.
macro_rules! ret_when_null {
    ($ptr:expr) => {
        if $ptr.is_null() {
            return Err(Error::from_last_err());
        }
    };
}
31
/// Packs a `(major, minor, patch)` triple into one integer:
/// `major * 10000 + minor * 100 + patch`.
///
/// The expansion is a plain arithmetic expression, so it can be used to
/// initialise a `const`.
#[macro_export]
macro_rules! ZMQ_MAKE_VERSION {
    ($major:expr, $minor:expr, $patch:expr) => {
        $major * 10000 + $minor * 100 + $patch
    };
}
40
41pub const ZMQ_VERSION:i32 = ZMQ_MAKE_VERSION!(
42 ZMQ_VERSION_MAJOR,
43 ZMQ_VERSION_MINOR,
44 ZMQ_VERSION_PATCH
45);
46
47fn errno() -> c_int {
48 unsafe {
49 zmq_ffi::zmq_errno()
50 }
51}
52
53fn strerror(errnum: c_int) -> String {
54 unsafe {
55 let s = zmq_ffi::zmq_strerror(errnum);
56 ffi::CStr::from_ptr(s).to_str().unwrap().to_string()
57 }
58}
59
60pub fn version() -> (i32, i32, i32) {
66 let mut major = 0;
67 let mut minor = 0;
68 let mut patch = 0;
69
70 unsafe {
71 zmq_ffi::zmq_version(&mut major, &mut minor, &mut patch);
72 }
73
74 (major as i32, minor as i32, patch as i32)
75}
76
/// Error value produced by this crate: a numeric code together with the
/// message text that was looked up for it.
#[derive(Clone)]
pub struct Error {
    /// Numeric error code.
    err_num: c_int,
    /// Human-readable message for `err_num`.
    err_str: String,
}
82
83impl Error {
84 fn from_last_err() -> Error {
85 let err_num = errno();
86 let err_str = strerror(err_num);
87
88 Error {
89 err_num: err_num,
90 err_str: err_str,
91 }
92 }
93
94 pub fn get_errno(&self) -> Errno {
95 self.err_num as Errno
96 }
97}
98
99impl std::fmt::Display for Error {
100 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
101 write!(f, "{} (code {})", self.err_str, self.err_num)
102 }
103}
104
105impl std::fmt::Debug for Error {
106 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
107 std::fmt::Display::fmt(self, f)
108 }
109}
110
111impl std::error::Error for Error {
112 fn description(&self) -> &str {
113 &self.err_str
114 }
115}
116
/// Identifier of a context option, passed to `zmq_ctx_get` / `zmq_ctx_set`.
type ContextOption = c_int;

const IO_THREADS: ContextOption = 1;
const MAX_SOCKETS: ContextOption = 2;
const SOCKET_LIMIT: ContextOption = 3;
// NOTE(review): same numeric value as SOCKET_LIMIT. In this file SOCKET_LIMIT
// is only read and THREAD_PRIORITY is only written; the duplication appears to
// mirror zmq.h — confirm against the header before "fixing" it.
const THREAD_PRIORITY: ContextOption = 3;
const THREAD_SCHED_POLICY: ContextOption = 4;
const IPV6: ContextOption = 42;

// Generates a `&self` getter that reads one context option with `zmq_ctx_get`.
//
// * `(name, opt)` yields `fn name(&self) -> Result<i32, Error>`.
// * `(name, opt, map, ty)` additionally passes the raw value through `map`
//   and returns `Result<ty, Error>`.
//
// A return value of -1 is reported as `Error::from_last_err()`.
macro_rules! getctxopt_template {
    ($name:ident, $opt:expr) => {
        pub fn $name(&self) -> Result<i32, Error> {
            match unsafe { zmq_ffi::zmq_ctx_get(self.ctx_ptr, $opt as c_int) } {
                -1 => Err(Error::from_last_err()),
                value => Ok(value),
            }
        }
    };
    ($name:ident, $opt:expr, $map:expr, $rt:ty) => {
        pub fn $name(&self) -> Result<$rt, Error> {
            match unsafe { zmq_ffi::zmq_ctx_get(self.ctx_ptr, $opt as c_int) } {
                -1 => Err(Error::from_last_err()),
                value => Ok($map(value)),
            }
        }
    };
}
148
// Generates `fn name(&mut self, optval: i32) -> Result<(), Error>`, which
// writes one context option with `zmq_ctx_set`. A return value of -1 is
// reported as `Error::from_last_err()`.
macro_rules! setctxopt_template {
    ($name:ident, $opt:expr) => {
        pub fn $name(&mut self, optval: i32) -> Result<(), Error> {
            match unsafe { zmq_ffi::zmq_ctx_set(self.ctx_ptr, $opt as c_int, optval as c_int) } {
                -1 => Err(Error::from_last_err()),
                _ => Ok(()),
            }
        }
    };
}
161
162
/// Handle to a ZeroMQ context, wrapping the raw pointer obtained from
/// `zmq_ctx_new`.
pub struct Context {
    /// Raw context pointer handed to every `zmq_ctx_*` call.
    ctx_ptr: *mut c_void,
}
166
167impl Context {
168 pub fn new() -> Result<Context, Error> {
177 let ctx_ptr = unsafe { zmq_ffi::zmq_ctx_new() };
178 ret_when_null!(ctx_ptr);
179 Ok(Context {
180 ctx_ptr: ctx_ptr,
181 })
182 }
183
184 fn term(&mut self) -> Result<(), Error> {
189 let rc = unsafe { zmq_ffi::zmq_ctx_term(self.ctx_ptr) };
190 if rc == -1 {
191 Err(Error::from_last_err())
192 } else {
193 Ok(())
194 }
195 }
196
197 pub fn shutdown(&mut self) -> Result<(), Error> {
205 let rc = unsafe { zmq_ffi::zmq_ctx_shutdown(self.ctx_ptr) };
206 if rc == -1 {
207 Err(Error::from_last_err())
208 } else {
209 Ok(())
210 }
211 }
212
213 getctxopt_template!(get_io_threads, IO_THREADS);
214 getctxopt_template!(get_max_sockets, MAX_SOCKETS);
215 getctxopt_template!(get_socket_limit, SOCKET_LIMIT);
216 getctxopt_template!(is_ipv6_enabled, IPV6, |r| { r > 0 }, bool);
217
218 setctxopt_template!(set_io_threads, IO_THREADS);
219 setctxopt_template!(set_max_sockets, MAX_SOCKETS);
220 setctxopt_template!(set_thread_priority, THREAD_PRIORITY);
221 setctxopt_template!(set_thread_sched_policy, THREAD_SCHED_POLICY);
222 setctxopt_template!(set_ipv6, IPV6);
223
224 pub fn socket(&self, t: SocketType) -> Result<Socket, Error> {
233 let socket = unsafe { zmq_ffi::zmq_socket(self.ctx_ptr, t as c_int) };
234 ret_when_null!(socket);
235 Ok(Socket::from_raw(socket))
236 }
237}
238
239unsafe impl Send for Context {}
240unsafe impl Sync for Context {}
241
242impl Drop for Context {
243 fn drop(&mut self) {
244 loop {
245 match self.term() {
246 Ok(_) => { },
247 Err(e) => {
248 if e.get_errno() == EINTR {
249 continue;
250 } else {
251 break;
252 }
253 }
254 }
255 }
256
257 }
258}
259
/// Number of bytes in the opaque `unknown` array of `zmq_ffi::zmq_msg_t`.
const MSG_SIZE: usize = 64;
261
262pub struct Message {
263 msg: zmq_ffi::zmq_msg_t,
264}
265
/// Deallocation callback handed to `zmq_msg_init_data` by `Message::from_vec`.
///
/// # Safety
/// `data` must be the pointer obtained from `Box::<[u8]>::into_raw`, and
/// `hint` must carry that slice's length encoded as a pointer-sized integer.
unsafe extern "C" fn zmq_free_fn(data: *mut c_void, hint: *mut c_void) {
    let len = hint as usize;
    let bytes: *mut [u8] = slice::from_raw_parts_mut(data as *mut u8, len);
    // Rebuild the box so its destructor releases the buffer.
    drop(Box::from_raw(bytes));
}
270
271impl Message {
272 pub fn new() -> Result<Message, Error> {
279 let mut msg = zmq_ffi::zmq_msg_t { unknown: [0; MSG_SIZE] };
280 let rc = unsafe { zmq_ffi::zmq_msg_init(&mut msg) };
281 if rc == -1 {
282 Err(Error::from_last_err())
283 } else {
284 Ok(Message { msg: msg })
285 }
286 }
287
288 pub fn with_capcity(len: usize) -> Result<Message, Error> {
295 let mut msg = zmq_ffi::zmq_msg_t { unknown: [0; MSG_SIZE] };
296 let rc = unsafe { zmq_ffi::zmq_msg_init_size(&mut msg, len as size_t) };
297 if rc == -1 {
298 Err(Error::from_last_err())
299 } else {
300 Ok(Message { msg: msg })
301 }
302 }
303
304 pub fn from_vec(vec: Vec<u8>) -> Result<Message, Error> {
314 let len = vec.len() as size_t;
315 let data = vec.into_boxed_slice();
316
317 let mut msg = zmq_ffi::zmq_msg_t { unknown: [0; MSG_SIZE] };
318 let rc = unsafe {
319 zmq_ffi::zmq_msg_init_data(&mut msg, Box::into_raw(data) as *mut c_void, len,
320 zmq_free_fn, len as *mut _)
321 };
322 if rc == -1 {
323 Err(Error::from_last_err())
324 } else {
325 Ok(Message { msg: msg })
326 }
327 }
328
329 pub fn from_slice(data: &[u8]) -> Result<Message, Error> {
330 unsafe {
331 let mut msg = try!(Message::with_capcity(data.len()));
332 std::ptr::copy_nonoverlapping(data.as_ptr(), msg.as_mut_ptr(), data.len());
333 Ok(msg)
334 }
335 }
336
337 pub fn msg_move(dest: &mut Message, src: &mut Message) -> Result<(), Error> {
347 let rc = unsafe {
348 zmq_ffi::zmq_msg_move(&mut dest.msg, &mut src.msg)
349 };
350 if rc == -1 {
351 Err(Error::from_last_err())
352 } else {
353 Ok(())
354 }
355 }
356
357 pub fn msg_copy(dest: &mut Message, src: &Message) -> Result<(), Error> {
364 let rc = unsafe {
365 zmq_ffi::zmq_msg_copy(&mut dest.msg, transmute(&src.msg))
366 };
367 if rc == -1 {
368 Err(Error::from_last_err())
369 } else {
370 Ok(())
371 }
372 }
373
374 pub unsafe fn get_data_ptr(&mut self) -> *mut c_void {
380 zmq_ffi::zmq_msg_data(&mut self.msg)
381 }
382
383 pub unsafe fn get_const_data_ptr(&self) -> *const c_void {
389 zmq_ffi::zmq_msg_data(transmute(&self.msg))
390 }
391
392 pub fn len(&self) -> usize {
398 unsafe { zmq_ffi::zmq_msg_size(transmute(&self.msg)) }
399 }
400
401 pub fn has_more(&self) -> bool {
408 unsafe { zmq_ffi::zmq_msg_more(transmute(&self.msg)) > 0 }
409 }
410
411 pub fn get_property(&self, property: MessageProperty) -> Result<i32, Error> {
417 let rc = unsafe { zmq_ffi::zmq_msg_get(transmute(&self.msg), property as c_int) };
418 if rc == -1 {
419 Err(Error::from_last_err())
420 } else {
421 Ok(rc)
422 }
423 }
424
425 pub fn get_meta<'a>(&'a self, property: &str) -> Option<&'a str> {
442 let prop_cstr = ffi::CString::new(property).unwrap();
443
444 let returned_str_ptr = unsafe { zmq_ffi::zmq_msg_gets(transmute(&self.msg), transmute(prop_cstr.as_ptr())) };
445 if returned_str_ptr.is_null() {
446 None
447 } else {
448 unsafe { Some(ffi::CStr::from_ptr(returned_str_ptr).to_str().unwrap()) }
449 }
450 }
451}
452
453impl Deref for Message {
454 type Target = [u8];
455
456 fn deref<'a>(&'a self) -> &'a [u8] {
457 unsafe {
458 let ptr = self.get_const_data_ptr();
459 let len = self.len() as usize;
460 slice::from_raw_parts(transmute(ptr), len)
461 }
462 }
463}
464
465impl DerefMut for Message {
466 fn deref_mut<'a>(&'a mut self) -> &'a mut [u8] {
467 unsafe {
468 let ptr = self.get_data_ptr();
469 let len = self.len() as usize;
470 slice::from_raw_parts_mut(transmute(ptr), len)
471 }
472 }
473}
474
475impl Drop for Message {
476 fn drop(&mut self) {
477 loop {
478 let rc = unsafe { zmq_ffi::zmq_msg_close(&mut self.msg) };
479 if rc != 0 {
480 let e = Error::from_last_err();
481 if e.get_errno() == EINTR {
482 continue;
483 } else {
484 panic!(e);
485 }
486
487 } else {
488 break;
489 }
490 }
491 }
492}
493
/// Socket type identifier accepted by `Context::socket`, which forwards it to
/// `zmq_socket` as a `c_int`.
pub type SocketType = c_int;

pub const PAIR: SocketType = 0;
pub const PUB: SocketType = 1;
pub const SUB: SocketType = 2;
pub const REQ: SocketType = 3;
pub const REP: SocketType = 4;
pub const DEALER: SocketType = 5;
pub const ROUTER: SocketType = 6;
pub const PULL: SocketType = 7;
pub const PUSH: SocketType = 8;
pub const XPUB: SocketType = 9;
pub const XSUB: SocketType = 10;
pub const STREAM: SocketType = 11;
507
/// Property identifier accepted by `Message::get_property`, which forwards it
/// to `zmq_msg_get` as a `c_int`.
pub type MessageProperty = c_int;

pub const MORE: MessageProperty = 1;
pub const SRCFD: MessageProperty = 2;
pub const SHARED: MessageProperty = 3;
512
513
/// Security mechanism identifier.
///
/// NOTE(review): not referenced in this part of the file; presumably consumed
/// by socket options defined elsewhere — confirm.
pub type SecurityMechanism = c_int;

pub const ZMQ_NULL: SecurityMechanism = 0;
pub const ZMQ_PLAIN: SecurityMechanism = 1;
pub const ZMQ_CURVE: SecurityMechanism = 2;
pub const ZMQ_GSSAPI: SecurityMechanism = 3;
519
520pub fn has_capability(capability: &str) -> bool {
526 let capability_cstr = ffi::CString::new(capability).unwrap();
527 let rc = unsafe { zmq_ffi::zmq_has(capability_cstr.as_ptr()) };
528 rc == 1
529}
530
531pub fn z85_encode(data: &[u8]) -> Result<String, Error> {
542 let len = data.len() as i32 * 5 / 4 + 1;
543 let mut dest: Vec<u8> = Vec::with_capacity(len as usize);
544
545 let rc = unsafe { zmq_ffi::zmq_z85_encode(transmute(dest.as_mut_ptr()), data.as_ptr(), data.len()) };
546 if rc.is_null() {
547 Err(Error::from_last_err())
548 } else {
549 unsafe {
550 dest.set_len(len as usize);
551 let cstr = ffi::CStr::from_ptr(transmute(dest.as_ptr()));
552
553 Ok(String::from_utf8(cstr.to_bytes().to_vec()).unwrap())
554 }
555 }
556}
557
558pub fn z85_decode(encoded: &str) -> Result<Vec<u8>, Error> {
564 let encoded_cstr = ffi::CString::new(encoded).unwrap();
565 let len = (encoded_cstr.as_bytes().len() as i32 * 4 / 5) as i32;
566 let mut dest: Vec<u8> = Vec::with_capacity(len as usize);
567
568 let rc = unsafe { zmq_ffi::zmq_z85_decode(dest.as_mut_ptr(), encoded_cstr.as_ptr()) };
569 if rc.is_null() {
570 Err(Error::from_last_err())
571 } else {
572 unsafe {
573 dest.set_len(len as usize);
574 }
575 Ok(dest)
576 }
577}
578
579
580pub fn gen_curve_keypair() -> Result<(String, String), Error> {
587 let mut public_key: Vec<u8> = Vec::with_capacity(41);
588 let mut secret_key: Vec<u8> = Vec::with_capacity(41);
589
590 let rc = unsafe {
591 zmq_ffi::zmq_curve_keypair(
592 transmute(public_key.as_mut_ptr()),
593 transmute(secret_key.as_mut_ptr())
594 )
595 };
596 if rc == -1 {
597 Err(Error::from_last_err())
598 } else {
599 unsafe {
600 public_key.set_len(40);
601 secret_key.set_len(40);
602 }
603 Ok((String::from_utf8(public_key).unwrap(), String::from_utf8(secret_key).unwrap()))
604 }
605}