zmq_rs/
socket.rs

1extern crate libc;
2
3use std::mem::transmute;
4use std::ffi;
5
6use libc::{ c_int, c_void, size_t, c_short, c_long, c_ushort };
7use super::{ Error, Message, SocketType };
8use ::std;
9use ::zmq_ffi;
10
11fn bytes_to_string(bytes: Vec<u8>) -> String {
12    unsafe { ffi::CStr::from_ptr(transmute(bytes.as_ptr())).to_str().unwrap().to_string() }
13}
14
15fn str_to_cstr_bytes(s: &str) -> Vec<u8> {
16    let cstr = ffi::CString::new(s).unwrap();
17    //cstr.into_bytes_with_nul()            // currently unstable
18    cstr.as_bytes_with_nul().to_vec()
19}
20
21macro_rules! getsockopt_template(
22    // function name to declare, option name, query/return type
23    ($name: ident, $opt: expr, $t: ty) => {
24        pub fn $name(&self) -> Result<$t, Error> {
25            let mut optval: $t = std::default::Default::default();
26            let mut optval_len: size_t = std::mem::size_of::<$t>() as size_t;
27            let optval_ptr = &mut optval as *mut $t;
28
29            let rc = unsafe { zmq_ffi::zmq_getsockopt(self.socket, $opt as c_int, transmute(optval_ptr), &mut optval_len) };
30            if rc == -1 {
31                Err(Error::from_last_err())
32            } else {
33                Ok(optval)
34            }
35        }
36    };
37    // function name to declare, option name, query type, query count, map queried value to return value, return type
38    ($name: ident, $opt: expr, $t: ty, $n: expr, $rmap: expr, $r: ty) => {
39        pub fn $name(&self) -> Result<$r, Error> {
40            let mut optval: Vec<$t> = Vec::with_capacity($n);
41
42            let mut optval_len: size_t = (optval.capacity() * std::mem::size_of::<$t>()) as size_t;
43            let optval_ptr = optval.as_mut_ptr();
44
45            let rc = unsafe {
46                zmq_ffi::zmq_getsockopt(self.socket, $opt as c_int,
47                    transmute(optval_ptr), &mut optval_len)
48            };
49
50            if rc == -1 {
51                Err(Error::from_last_err())
52            } else {
53                unsafe { optval.set_len(optval_len); }
54                Ok($rmap(optval))
55            }
56        }
57    };
58    // function name to declare, option name, query type, map queried value to return value, return type
59    ($name: ident, $opt: expr, $t: ty, $rmap: expr, $r: ty) => {
60        pub fn $name(&self) -> Result<$r, Error> {
61            let mut optval: $t = std::default::Default::default();
62            let mut optval_len: size_t = std::mem::size_of::<$t>() as size_t;
63            let optval_ptr = &mut optval as *mut $t;
64
65            let rc = unsafe { zmq_ffi::zmq_getsockopt(self.socket, $opt as c_int, transmute(optval_ptr), &mut optval_len) };
66            if rc == -1 {
67                Err(Error::from_last_err())
68            } else {
69                Ok($rmap(optval))
70            }
71        }
72    };
73    // function name to declare, option name, query type, query count
74    ($name: ident, $opt: expr, $t: ty, $n: expr) => {
75        pub fn $name(&self) -> Result<Vec<$t>, Error> {
76            let mut optval: Vec<$t> = Vec::with_capacity($n);
77
78            let mut optval_len: size_t = (optval.capacity() * std::mem::size_of::<$t>()) as size_t;
79            let optval_ptr = optval.as_mut_ptr();
80
81            let rc = unsafe {
82                zmq_ffi::zmq_getsockopt(self.socket, $opt as c_int,
83                    transmute(optval_ptr), &mut optval_len)
84            };
85
86            if rc == -1 {
87                Err(Error::from_last_err())
88            } else {
89                unsafe { optval.set_len(optval_len); }
90                Ok(optval)
91            }
92        }
93    };
94);
95
96macro_rules! setsockopt_nullptr_template(
97    ($name: ident, $opt: expr) => {
98        pub fn $name(&self) -> Result<(), Error> {
99            let optval_len: size_t = 0;
100            let optval_ptr: *const u8 = std::ptr::null();
101
102            let rc = unsafe {
103                zmq_ffi::zmq_setsockopt(self.socket, $opt as c_int,
104                    transmute(optval_ptr), optval_len)
105            };
106
107            if rc == -1 {
108                Err(Error::from_last_err())
109            } else {
110                Ok(())
111            }
112        }
113    };
114);
115
116macro_rules! setsockopt_template(
117    // function name to declare, option name, optval type
118    ($name: ident, $opt: expr, $t: ty) => {
119        pub fn $name(&self, optval: &$t) -> Result<(), Error> {
120            let optval_len: size_t = std::mem::size_of::<$t>() as size_t;
121            let optval_ptr = optval as *const $t;
122
123            let rc = unsafe { zmq_ffi::zmq_setsockopt(self.socket, $opt as c_int, transmute(optval_ptr), optval_len) };
124            if rc == -1 {
125                Err(Error::from_last_err())
126            } else {
127                Ok(())
128            }
129        }
130    };
131
132    // function name to declare, option name
133    ($name: ident, $opt: expr) => {
134        pub fn $name(&self, optval: &[u8]) -> Result<(), Error> {
135            let optval_len: size_t = optval.len() as size_t;
136            let optval_ptr: *const u8 = optval.as_ptr();
137
138            let rc = unsafe {
139                zmq_ffi::zmq_setsockopt(self.socket, $opt as c_int,
140                    transmute(optval_ptr), optval_len)
141            };
142
143            if rc == -1 {
144                Err(Error::from_last_err())
145            } else {
146                Ok(())
147            }
148        }
149    };
150
151    ($name: ident, $opt: expr, $t: ty, $map: expr) => {
152        pub fn $name(&self, optval: $t) -> Result<(), Error> {
153            let optval: Vec<u8> = $map(optval);
154            let optval_len: size_t = optval.len() as size_t;
155
156            let optval_ptr: *const u8 = optval.as_ptr();
157
158            let rc = unsafe {
159                zmq_ffi::zmq_setsockopt(self.socket, $opt as c_int,
160                    transmute(optval_ptr), optval_len)
161            };
162
163            if rc == -1 {
164                Err(Error::from_last_err())
165            } else {
166                Ok(())
167            }
168        }
169    };
170);
171
172#[allow(non_camel_case_types)]
173#[derive(Copy, Clone, Debug)]
174enum SocketOption {
175    AFFINITY = 4,
176    IDENTITY = 5,
177    SUBSCRIBE = 6,
178    UNSUBSCRIBE = 7,
179    RATE = 8,
180    RECOVERY_IVL = 9,
181    SNDBUF = 11,
182    RCVBUF = 12,
183    RCVMORE = 13,
184    FD = 14,
185    EVENTS = 15,
186    TYPE = 16,
187    LINGER = 17,
188    RECONNECT_IVL = 18,
189    BACKLOG = 19,
190    RECONNECT_IVL_MAX = 21,
191    MAXMSGSIZE = 22,
192    SNDHWM = 23,
193    RCVHWM = 24,
194    MULTICAST_HOPS = 25,
195    RCVTIMEO = 27,
196    SNDTIMEO = 28,
197    LAST_ENDPOINT = 32,
198    ROUTER_MANDATORY = 33,
199    TCP_KEEPALIVE = 34,
200    TCP_KEEPALIVE_CNT = 35,
201    TCP_KEEPALIVE_IDLE = 36,
202    TCP_KEEPALIVE_INTVL = 37,
203    IMMEDIATE = 39,
204    XPUB_VERBOSE = 40,
205    ROUTER_RAW = 41,
206    IPV6 = 42,
207    MECHANISM = 43,
208    PLAIN_SERVER = 44,
209    PLAIN_USERNAME = 45,
210    PLAIN_PASSWORD = 46,
211    CURVE_SERVER = 47,
212    CURVE_PUBLICKEY = 48,
213    CURVE_SECRETKEY = 49,
214    CURVE_SERVERKEY = 50,
215    PROBE_ROUTER = 51,
216    REQ_CORRELATE = 52,
217    REQ_RELAXED = 53,
218    CONFLATE = 54,
219    ZAP_DOMAIN = 55,
220    ROUTER_HANDOVER = 56,
221    TOS = 57,
222    CONNECT_RID = 61,
223    GSSAPI_SERVER = 62,
224    GSSAPI_PRINCIPAL = 63,
225    GSSAPI_SERVICE_PRINCIPAL = 64,
226    GSSAPI_PLAINTEXT = 65,
227    HANDSHAKE_IVL = 66,
228    SOCKS_PROXY = 68,
229    XPUB_NODROP = 69,
230}
231
232pub type SocketEvent = c_ushort;
233
234pub const CONNECTED: SocketEvent = 0x0001;
235pub const CONNECT_DELAYED: SocketEvent = 0x0002;
236pub const CONNECT_RETRIED: SocketEvent = 0x0004;
237pub const LISTENING: SocketEvent = 0x0008;
238pub const BIND_FAILED: SocketEvent = 0x0010;
239pub const ACCEPTED: SocketEvent = 0x0020;
240pub const ACCEPT_FAILED: SocketEvent = 0x0040;
241pub const CLOSED: SocketEvent = 0x0080;
242pub const CLOSE_FAILED: SocketEvent = 0x0100;
243pub const DISCONNECTED: SocketEvent = 0x0200;
244pub const MONITOR_STOPPED: SocketEvent = 0x0400;
245pub const ALL: SocketEvent = 0xFFFF;
246
247pub type SocketFlag = c_int;
248
249pub const DONTWAIT: SocketFlag = 1;
250pub const SNDMORE: SocketFlag = 2;
251
252pub struct Socket {
253    socket: *mut c_void,
254    closed: bool,
255}
256
257unsafe impl Send for Socket {}
258
259impl Socket {
260    pub fn from_raw(socket: *mut c_void) -> Socket {
261        Socket { socket: socket, closed: false }
262    }
263
264    /// Close 0MQ socket
265    ///
266    /// Binding of `int zmq_close (void *s);`
267    ///
268    /// It's not mandatory to call this function since socket can be closed automatically on dropping
269    /// The function will destroy this socket.
270    /// Any outstanding messages physically received from the network
271    /// but not yet received by the application with recv() shall be discarded.
272    /// The behaviour for discarding messages sent by the application with send()
273    /// but not yet physically transferred to the network depends on the value of
274    /// the ZMQ_LINGER socket option for the specified socket.
275    pub fn close(&mut self) -> Result<(), Error> {
276        self.closed = true;
277        self.close_underly()
278    }
279
280    fn close_underly(&mut self) -> Result<(), Error> {
281        loop {
282            let rc = unsafe { zmq_ffi::zmq_close(self.socket) };
283            if rc != 0 {
284                let e = Error::from_last_err();
285                if e.get_errno() == ::errno::EINTR {
286                    continue;
287                } else {
288                    return Err(e);
289                }
290
291            } else {
292                break;
293            }
294        }
295        Ok(())
296    }
297
298    ///  Accept incoming connections on a socket
299    ///
300    /// Binding of `int zmq_bind (void *socket, const char *endpoint);`
301    ///
302    /// The function binds the socket to a local endpoint and then accepts incoming connections on that endpoint.
303    pub fn bind(&mut self, endpoint: &str) -> Result<(), Error> {
304        let endpoint_cstr = ffi::CString::new(endpoint).unwrap();
305        let rc = unsafe { zmq_ffi::zmq_bind(self.socket, endpoint_cstr.as_ptr()) };
306        if rc == -1 {
307            Err(Error::from_last_err())
308        } else {
309            Ok(())
310        }
311    }
312
313    /// Create outgoing connection from socket
314    ///
315    /// Binding of `int zmq_connect (void *socket, const char *endpoint);`
316    ///
317    /// The function connects the socket to an endpoint and then accepts incoming connections on that endpoint.
318    pub fn connect(&mut self, endpoint: &str) -> Result<(), Error> {
319        let endpoint_cstr = ffi::CString::new(endpoint).unwrap();
320        let rc = unsafe { zmq_ffi::zmq_connect(self.socket, endpoint_cstr.as_ptr()) };
321        if rc == -1 {
322            Err(Error::from_last_err())
323        } else {
324            Ok(())
325        }
326    }
327
328    /// Stop accepting connections on a socket
329    ///
330    /// Binding of `int zmq_unbind (void *socket, const char *endpoint);`
331    ///
332    /// The function will unbind a socket specified by the socket argument from the endpoint specified by the endpoint argument.
333    pub fn unbind(&mut self, endpoint: &str) -> Result<(), Error> {
334        let endpoint_cstr = ffi::CString::new(endpoint).unwrap();
335        let rc = unsafe { zmq_ffi::zmq_unbind(self.socket, endpoint_cstr.as_ptr()) };
336        if rc == -1 {
337            Err(Error::from_last_err())
338        } else {
339            Ok(())
340        }
341    }
342
343    /// Disconnect a socket
344    ///
345    /// Binding of `int zmq_disconnect (void *socket, const char *endpoint);`
346    ///
347    /// The function will disconnect socket from the endpoint specified by the endpoint argument.
348    /// Any outstanding messages physically received from the network but not yet received by the application with recv() will be discarded.
349    /// The behaviour for discarding messages sent by the application with send() but
350    /// not yet physically transferred to the network depends on the value of the ZMQ_LINGER socket option for the socket.
351    pub fn disconnect(&mut self, endpoint: &str) -> Result<(), Error> {
352        let endpoint_cstr = ffi::CString::new(endpoint).unwrap();
353        let rc = unsafe { zmq_ffi::zmq_disconnect(self.socket, endpoint_cstr.as_ptr()) };
354        if rc == -1 {
355            Err(Error::from_last_err())
356        } else {
357            Ok(())
358        }
359    }
360
361    /// Send a message part on a socket
362    ///
363    /// Binding of `int zmq_msg_send (zmq_msg_t *msg, void *socket, int flags);`
364    pub fn send_msg(&mut self, mut msg: Message, flags: SocketFlag) -> Result<i32, Error> {
365        let rc = unsafe { zmq_ffi::zmq_msg_send(&mut msg.msg, self.socket, flags) };
366        if rc == -1 {
367            Err(Error::from_last_err())
368        } else {
369            Ok(rc)
370        }
371    }
372
373    /// Receive a message part from a socket
374    ///
375    /// Binding of `int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);`
376    pub fn recv_into_msg(&mut self, msg: &mut Message, flags: SocketFlag) -> Result<i32, Error> {
377        let rc = unsafe { zmq_ffi::zmq_msg_recv(&mut msg.msg, self.socket, flags) };
378        if rc == -1 {
379            Err(Error::from_last_err())
380        } else {
381            Ok(rc)
382        }
383    }
384
385    /// Receive a message part from a socket
386    ///
387    /// Binding of `int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);`
388    pub fn recv_msg(&mut self, flags: SocketFlag) -> Result<Message, Error> {
389        let mut msg = try!(Message::new());
390        match self.recv_into_msg(&mut msg, flags) {
391            Ok(_) => Ok(msg),
392            Err(e) => Err(e),
393        }
394    }
395
396    /// Send bytes on a socket
397    ///
398    /// Data will be copied into a Message object in order to be sent.
399    pub fn send_bytes(&mut self, data: &[u8], flags: SocketFlag) -> Result<i32, Error> {
400        let msg = try!(Message::from_slice(data));
401        self.send_msg(msg, flags)
402    }
403
404    /// Send a constant-memory message part on a socket
405    ///
406    /// Binding of `ZMQ_EXPORT int zmq_send_const (void *s, const void *buf, size_t len, int flags);`
407    ///
408    /// The message buffer is assumed to be constant-memory(static) and will therefore not be copied or deallocated in any way
409    pub fn send_const_bytes(&mut self, data: &'static [u8], flags: SocketFlag) -> Result<i32, Error> {
410        let rc = unsafe { zmq_ffi::zmq_send_const(self.socket, transmute(data.as_ptr()), data.len(), flags) };
411        if rc == -1 {
412            Err(Error::from_last_err())
413        } else {
414            Ok(rc)
415        }
416    }
417
418    /// Send a UTF-8 string on socket
419    pub fn send_str(&mut self, data: &str, flags: SocketFlag) -> Result<i32, Error> {
420        self.send_bytes(data.as_bytes(), flags)
421    }
422
423    /// Receive bytes from a socket
424    pub fn recv_bytes(&mut self, flags: SocketFlag) -> Result<Vec<u8>, Error> {
425        match self.recv_msg(flags) {
426            Ok(msg) => Ok(msg.to_vec()),
427            Err(e) => Err(e),
428        }
429    }
430
431    /// Receive bytes into a mutable slice
432    /// # Caution
433    /// *Any bytes exceeding the length of buffer will be truncated.*
434    pub fn recv_bytes_into_slice(&mut self, buffer: &mut [u8], flags: SocketFlag) -> Result<i32, Error> {
435        let rc = unsafe { zmq_ffi::zmq_recv(self.socket, transmute(buffer.as_mut_ptr()), buffer.len(), flags) };
436        if rc == -1 {
437            Err(Error::from_last_err())
438        } else {
439            Ok(rc)
440        }
441    }
442
443    /// Receive a UTF-8 string from socket
444    pub fn recv_string(&mut self, flags: SocketFlag) -> Result<Result<String, Vec<u8>>, Error> {
445        match self.recv_bytes(flags) {
446            Ok(msg) => {
447                Ok({
448                    let s = String::from_utf8(msg);
449                    if s.is_ok() {
450                        Ok(s.unwrap())
451                    } else {
452                        Err(s.unwrap_err().into_bytes())
453                    }
454                })
455            }
456            Err(e) => Err(e),
457        }
458    }
459
460    /// Monitor socket events
461    ///
462    /// Binding of `int zmq_socket_monitor (void *socket, char *endpoint, int events);`
463    ///
464    /// The method lets an application thread track socket events (like connects) on a ZeroMQ socket
465    pub fn socket_monitor(&mut self, endpoint: &str, events: &Vec<SocketEvent>) -> Result<(), Error> {
466        let mut event_mask: i32 = 0;
467        for event in events {
468            event_mask |= Clone::clone(event) as i32;
469        }
470
471        let endpoint_cstr = ffi::CString::new(endpoint).unwrap();
472        let rc = unsafe { zmq_ffi::zmq_socket_monitor(self.socket, endpoint_cstr.as_ptr(), event_mask) };
473        if rc == -1 {
474            Err(Error::from_last_err())
475        } else {
476            Ok(())
477        }
478    }
479
480    /// Start built-in 0MQ proxy
481    ///
482    /// Binding of `int zmq_proxy (const void *frontend, const void *backend, const void *capture);`
483    ///
484    /// The function starts the built-in ØMQ proxy in the current application thread.
485    pub fn run_proxy(frontend: &mut Socket, backend: &mut Socket) -> Result<(), Error> {
486        let rc = unsafe { zmq_ffi::zmq_proxy(frontend.socket, backend.socket, std::ptr::null_mut()) };
487        if rc == -1 {
488            Err(Error::from_last_err())
489        } else {
490            Ok(())
491        }
492    }
493
494    /// Start built-in 0MQ proxy
495    ///
496    /// Binding of `int zmq_proxy (const void *frontend, const void *backend, const void *capture);` or
497    /// `int zmq_proxy_steerable (const void *frontend, const void *backend, const void *capture, const void *control);`
498    ///
499    /// The function starts the built-in ØMQ proxy in the current application thread.
500    /// The proxy will send all messages, received on both frontend and backend, to the capture socket.
501    /// The capture socket should be a ZMQ_PUB, ZMQ_DEALER, ZMQ_PUSH, or ZMQ_PAIR socket.
502    /// If the control socket is not None, the proxy supports control flow.
503    /// If PAUSE is received on this socket, the proxy suspends its activities.
504    /// If RESUME is received, it goes on. If TERMINATE is received, it terminates smoothly.
505    /// At start, the proxy runs normally as if run_proxy was used.
506    pub fn run_proxy_ex(frontend: &mut Socket, backend: &mut Socket,
507    capture: Option<&mut Socket>, control: Option<&mut Socket>) -> Result<(), Error> {
508        let capture_ptr = if capture.is_none() { std::ptr::null_mut() } else { capture.unwrap().socket };
509
510        let rc = {
511            if control.is_none() {
512                unsafe { zmq_ffi::zmq_proxy(frontend.socket, backend.socket, capture_ptr) }
513            } else {
514                unsafe { zmq_ffi::zmq_proxy_steerable(frontend.socket, backend.socket, capture_ptr, control.unwrap().socket) }
515            }
516        };
517        if rc == -1 {
518            Err(Error::from_last_err())
519        } else {
520            Ok(())
521        }
522    }
523
524    /// Create a poll item from current socket
525    ///
526    /// # Safty
527    /// There is no lifetime guarantee that poll item does not live out socket
528    pub fn as_poll_item(&self) -> PollItem {
529        PollItem::from_socket(&self)
530    }
531
532    ///  input/output multiplexing
533    ///
534    /// Binding of `int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);`
535    pub fn poll(items: &mut [PollItem], nitems: i32, timeout: i32) -> Result<i32, Error> {
536        let rc = unsafe { zmq_ffi::zmq_poll(transmute(items.as_mut_ptr()), nitems as c_int, timeout as c_long) };
537        if rc == -1 {
538            Err(Error::from_last_err())
539        } else {
540            Ok(rc)
541        }
542    }
543
544    //-------------------------------- get options ----------------------------------- //
545
546    getsockopt_template!(get_affinity, SocketOption::AFFINITY, u64);
547    getsockopt_template!(get_backlog, SocketOption::BACKLOG, i32);
548    getsockopt_template!(get_curve_publickey, SocketOption::CURVE_PUBLICKEY, u8, 32);
549    getsockopt_template!(get_curve_printable_publickey, SocketOption::CURVE_PUBLICKEY, u8, 41,
550        |r: Vec<u8>| {
551            bytes_to_string(r)
552        }, String);
553    getsockopt_template!(get_curve_secretkey, SocketOption::CURVE_SECRETKEY, u8, 32);
554    getsockopt_template!(get_curve_printable_secretkey, SocketOption::CURVE_SECRETKEY, u8, 41,
555        |r: Vec<u8>| {
556            bytes_to_string(r)
557        }, String);
558    getsockopt_template!(get_curve_serverkey, SocketOption::CURVE_SERVERKEY, u8, 32);
559    getsockopt_template!(get_curve_printable_serverkey, SocketOption::CURVE_SERVERKEY, u8, 41,
560        |r: Vec<u8>| {
561            bytes_to_string(r)
562        }, String);
563    getsockopt_template!(get_events, SocketOption::EVENTS, PollEvent);
564    getsockopt_template!(get_fd, SocketOption::FD, SocketFd);
565    getsockopt_template!(is_gssapi_plaintext, SocketOption::GSSAPI_PLAINTEXT, i32, |r| { r > 0 }, bool);
566    getsockopt_template!(get_gssapi_principal, SocketOption::GSSAPI_PRINCIPAL, u8, 256,
567        |r: Vec<u8>| {
568            bytes_to_string(r)
569        }, String);
570    getsockopt_template!(is_gssapi_server, SocketOption::GSSAPI_SERVER, i32, |r| { r > 0 }, bool);
571    getsockopt_template!(get_gssapi_service_principal, SocketOption::GSSAPI_SERVICE_PRINCIPAL, u8, 256,
572        |r: Vec<u8>| {
573            bytes_to_string(r)
574        }, String);
575    getsockopt_template!(get_handshake_ivl, SocketOption::HANDSHAKE_IVL, i32);
576    getsockopt_template!(get_identity, SocketOption::IDENTITY, u8, 256);
577    getsockopt_template!(is_immediate, SocketOption::IMMEDIATE, i32, |r| { r > 0 }, bool);
578    //getsockopt_template!(get_ipv4only, SocketOption::IPV4ONLY, i32);      // deprecated
579    getsockopt_template!(is_ipv6_enabled, SocketOption::IPV6, i32, |r| { r > 0 }, bool);
580    /// Get last endpoint bound for TCP and IPC transports
581    /// if last endpoint has more than 2048 bytes, method call will be failed.
582    getsockopt_template!(get_last_endpoint, SocketOption::LAST_ENDPOINT, u8, 2048,
583        |r: Vec<u8>| {
584            bytes_to_string(r)
585        }, String);
586    getsockopt_template!(get_linger, SocketOption::LINGER, i32);
587    getsockopt_template!(get_max_msg_size, SocketOption::MAXMSGSIZE, i64);
588    getsockopt_template!(get_mechanism, SocketOption::MECHANISM, i32);
589    getsockopt_template!(get_multicast_hops, SocketOption::MULTICAST_HOPS, i32);
590    getsockopt_template!(get_plain_password, SocketOption::PLAIN_PASSWORD, u8, 256,
591        |r: Vec<u8>| {
592            bytes_to_string(r)
593        }, String);
594    getsockopt_template!(is_plain_server, SocketOption::PLAIN_SERVER, i32, |r| { r > 0 }, bool);
595    getsockopt_template!(get_plain_username, SocketOption::PLAIN_USERNAME, u8, 256,
596        |r: Vec<u8>| {
597            bytes_to_string(r)
598        }, String);
599    getsockopt_template!(get_rate, SocketOption::RATE, i32);
600    getsockopt_template!(get_rcvbuf, SocketOption::RCVBUF, i32);
601    getsockopt_template!(get_rcvhwm, SocketOption::RCVHWM, i32);
602    getsockopt_template!(can_rcvmore, SocketOption::RCVMORE, i32, |r| { r > 0 }, bool);
603    getsockopt_template!(get_rcvtimeo, SocketOption::RCVTIMEO, i32);
604    getsockopt_template!(get_reconnect_ivl, SocketOption::RECONNECT_IVL, i32);
605    getsockopt_template!(get_reconnect_ivl_max, SocketOption::RECONNECT_IVL_MAX, i32);
606    getsockopt_template!(get_recovery_ivl, SocketOption::RECOVERY_IVL, i32);
607    getsockopt_template!(get_sndbuf, SocketOption::SNDBUF, i32);
608    getsockopt_template!(get_sndhwm, SocketOption::SNDHWM, i32);
609    getsockopt_template!(get_sndtimeo, SocketOption::SNDTIMEO, i32);
610    getsockopt_template!(get_tcp_keepalive, SocketOption::TCP_KEEPALIVE, i32);
611    getsockopt_template!(get_tcp_keepalive_cnt, SocketOption::TCP_KEEPALIVE_CNT, i32);
612    getsockopt_template!(get_tcp_keepalive_idle, SocketOption::TCP_KEEPALIVE_IDLE, i32);
613    getsockopt_template!(get_tcp_keepalive_intvl, SocketOption::TCP_KEEPALIVE_INTVL, i32);
614    getsockopt_template!(get_tos, SocketOption::TOS, i32);
615    getsockopt_template!(get_type, SocketOption::TYPE, SocketType);
616    getsockopt_template!(get_zap_domain, SocketOption::ZAP_DOMAIN, u8, 256,
617        |r: Vec<u8>| {
618            bytes_to_string(r)
619        }, String);
620
621    //-------------------------------- set options ----------------------------------- //
622    setsockopt_template!(set_affinity, SocketOption::AFFINITY, u64);
623    setsockopt_template!(set_backlog, SocketOption::BACKLOG, i32);
624    setsockopt_template!(set_connect_rid, SocketOption::CONNECT_RID);
625    setsockopt_template!(set_conflate, SocketOption::CONFLATE, i32);
626    setsockopt_template!(set_curve_publickey, SocketOption::CURVE_PUBLICKEY);
627    setsockopt_template!(set_curve_plaintext_publickey, SocketOption::CURVE_PUBLICKEY, &str,
628        |s| { str_to_cstr_bytes(s) });
629    setsockopt_template!(set_curve_secretkey, SocketOption::CURVE_SECRETKEY);
630    setsockopt_template!(set_curve_plaintext_secretkey, SocketOption::CURVE_SECRETKEY, &str,
631        |s| { str_to_cstr_bytes(s) });
632    setsockopt_template!(set_curve_server, SocketOption::CURVE_SERVER, i32);
633    setsockopt_template!(set_curve_serverkey, SocketOption::CURVE_SERVERKEY);
634    setsockopt_template!(set_curve_plaintext_serverkey, SocketOption::CURVE_SERVERKEY, &str,
635        |s| { str_to_cstr_bytes(s) });
636    setsockopt_template!(set_gssapi_plaintext, SocketOption::GSSAPI_PLAINTEXT, i32);
637    setsockopt_template!(set_gssapi_principal, SocketOption::GSSAPI_PRINCIPAL, &str,
638        |s| { str_to_cstr_bytes(s) });
639    setsockopt_template!(set_gssapi_server, SocketOption::GSSAPI_SERVER, i32);
640    setsockopt_template!(set_gssapi_service_principal, SocketOption::GSSAPI_SERVICE_PRINCIPAL, &str,
641        |s| { str_to_cstr_bytes(s) });
642    setsockopt_template!(set_handshake_ivl, SocketOption::HANDSHAKE_IVL, i32);
643    setsockopt_template!(set_identity, SocketOption::IDENTITY);
644    setsockopt_template!(set_immediate, SocketOption::IMMEDIATE, i32);
645    setsockopt_template!(set_ipv6, SocketOption::IPV6, i32);
646    setsockopt_template!(set_linger, SocketOption::LINGER, i32);
647    setsockopt_template!(set_max_msg_size, SocketOption::MAXMSGSIZE, i64);
648    setsockopt_template!(set_multicast_hops, SocketOption::MULTICAST_HOPS, i32);
649    setsockopt_template!(set_plain_password, SocketOption::PLAIN_PASSWORD, &str,
650        |s| { str_to_cstr_bytes(s) });
651    setsockopt_nullptr_template!(set_plain_password_empty, SocketOption::PLAIN_PASSWORD);
652    setsockopt_template!(set_plain_server, SocketOption::PLAIN_SERVER, i32);
653    setsockopt_template!(set_plain_username, SocketOption::PLAIN_USERNAME, &str,
654        |s| { str_to_cstr_bytes(s) });
655    setsockopt_nullptr_template!(set_plain_username_empty, SocketOption::PLAIN_USERNAME);
656    setsockopt_template!(set_probe_router, SocketOption::PROBE_ROUTER, i32);
657    setsockopt_template!(set_rate, SocketOption::RATE, i32);
658    setsockopt_template!(set_rcvbuf, SocketOption::RCVBUF, i32);
659    setsockopt_template!(set_rcvhwm, SocketOption::RCVHWM, i32);
660    setsockopt_template!(set_rcvtimeo, SocketOption::RCVTIMEO, i32);
661    setsockopt_template!(set_reconnect_ivl, SocketOption::RECONNECT_IVL, i32);
662    setsockopt_template!(set_reconnect_ivl_max, SocketOption::RECONNECT_IVL_MAX, i32);
663    setsockopt_template!(set_recovery_ivl, SocketOption::RECOVERY_IVL, i32);
664    setsockopt_template!(set_req_correlate, SocketOption::REQ_CORRELATE, i32);
665    setsockopt_template!(set_req_relaxed, SocketOption::REQ_RELAXED, i32);
666    setsockopt_template!(set_router_handover, SocketOption::ROUTER_HANDOVER, i32);
667    setsockopt_template!(set_router_mandatory, SocketOption::ROUTER_MANDATORY, i32);
668    setsockopt_template!(set_router_raw, SocketOption::ROUTER_RAW, i32);
669    setsockopt_template!(set_sndbuf, SocketOption::SNDBUF, i32);
670    setsockopt_template!(set_sndhwm, SocketOption::SNDHWM, i32);
671    setsockopt_template!(set_sndtimeo, SocketOption::SNDTIMEO, i32);
672    setsockopt_template!(set_subscribe, SocketOption::SUBSCRIBE);
673    setsockopt_template!(set_tcp_keepalive, SocketOption::TCP_KEEPALIVE, i32);
674    setsockopt_template!(set_tcp_keepalive_cnt, SocketOption::TCP_KEEPALIVE_CNT, i32);
675    setsockopt_template!(set_tcp_keepalive_idle, SocketOption::TCP_KEEPALIVE_IDLE, i32);
676    setsockopt_template!(set_tcp_keepalive_intvl, SocketOption::TCP_KEEPALIVE_INTVL, i32);
677    setsockopt_template!(set_tos, SocketOption::TOS, i32);
678    setsockopt_template!(set_unsubscribe, SocketOption::UNSUBSCRIBE);
679    setsockopt_template!(set_xpub_verbose, SocketOption::XPUB_VERBOSE, i32);
680    setsockopt_template!(set_zqp_domain, SocketOption::ZAP_DOMAIN, &str,
681        |s| { str_to_cstr_bytes(s) });
682}
683
684impl Drop for Socket {
685    fn drop(&mut self) {
686        if !self.closed {
687            self.close_underly().unwrap();
688        }
689    }
690}
691
692pub type PollEvent = c_int;
693
694pub const POLLIN: PollEvent = 1;
695pub const POLLOUT: PollEvent = 2;
696pub const POLLERR: PollEvent = 4;
697
698#[cfg(target_os = "windows")]
699pub type SocketFd = ::libc::intptr_t;
700#[cfg(not(target_os = "windows"))]
701pub type SocketFd = c_int;
702
703#[repr(C)]
704pub struct PollItem {
705    pub socket: *mut c_void,
706    pub fd: SocketFd,
707    pub events: c_short,
708    pub revents: c_short,
709}
710
711impl PollItem {
712    pub fn from_socket(socket: &Socket) -> PollItem {
713        PollItem {
714            socket: socket.socket,
715            fd: 0,
716            events: 0,
717            revents: 0,
718        }
719    }
720
721    pub fn from_fd(fd: SocketFd) -> PollItem {
722        PollItem {
723            socket: std::ptr::null_mut(),
724            fd: fd,
725            events: 0,
726            revents: 0,
727        }
728    }
729
730    pub fn set_socket(&mut self, socket: &Socket) -> &mut PollItem {
731        self.socket = socket.socket;
732        self.fd = 0;
733        self
734    }
735
736    pub fn set_fd(&mut self, fd: SocketFd) -> &mut PollItem {
737        self.socket = std::ptr::null_mut();
738        self.fd = fd;
739        self
740    }
741
742    pub fn clear_events(&mut self) -> &mut PollItem {
743        self.events = 0;
744        self
745    }
746
747    pub fn reg_event(&mut self, ev: PollEvent) -> &mut PollItem {
748        self.events |= ev as c_short;
749        self
750    }
751
752    pub fn unreg_event(&mut self, ev: PollEvent) -> &mut PollItem {
753        self.events &= !(ev as c_short);
754        self
755    }
756
757    /// Clear all returned events
758    pub fn clear_revents(&mut self) -> &mut PollItem {
759        self.revents = 0;
760        self
761    }
762
763    /// Does this PollItem have the specified PollEvent returned.
764    pub fn has_revent(&self, ev: PollEvent) -> bool {
765        (self.revents & (ev as c_short)) > 0
766    }
767}