1extern crate libc;
2
3use std::mem::transmute;
4use std::ffi;
5
6use libc::{ c_int, c_void, size_t, c_short, c_long, c_ushort };
7use super::{ Error, Message, SocketType };
8use ::std;
9use ::zmq_ffi;
10
/// Converts a C-style string buffer into an owned `String`.
///
/// The text is everything before the first NUL byte; when the buffer holds no
/// NUL at all, the whole buffer is used. The search never leaves `bytes`, so a
/// missing terminator or an empty vector cannot cause an out-of-bounds read.
///
/// Byte sequences that are not valid UTF-8 are replaced with U+FFFD rather
/// than causing a panic.
fn bytes_to_string(bytes: Vec<u8>) -> String {
    // Length of the text portion: up to the terminator if present, else everything.
    let text_len = bytes.iter().position(|&b| b == 0).unwrap_or(bytes.len());
    String::from_utf8_lossy(&bytes[..text_len]).into_owned()
}
14
/// Encodes `s` as the bytes of a C string: the UTF-8 bytes of `s` followed by
/// a single trailing NUL.
///
/// # Panics
/// Panics if `s` contains an interior NUL byte, because the `CString`
/// conversion result is unwrapped.
fn str_to_cstr_bytes(s: &str) -> Vec<u8> {
    let terminated = ffi::CString::new(s).unwrap();
    let raw: &[u8] = terminated.as_bytes_with_nul();
    raw.to_owned()
}
20
// Generates `&self` getter methods that read a socket option through
// `zmq_ffi::zmq_getsockopt`. Every generated method returns
// `Err(Error::from_last_err())` when the FFI call returns -1.
//
// Arms, selected by argument shape:
//   (name, opt, T)             -> Result<T, Error>       reads a single `T` (starting from `T::default()`)
//   (name, opt, T, n, rmap, R) -> Result<R, Error>       reads up to `n` elements of `T`, then applies `rmap`
//   (name, opt, T, rmap, R)    -> Result<R, Error>       reads a single `T`, then applies `rmap`
//   (name, opt, T, n)          -> Result<Vec<T>, Error>  reads up to `n` elements of `T`
//
// In the buffer arms the length handed to and returned from the FFI call is a
// byte count (see the libzmq `zmq_getsockopt` reference), so it is converted
// back to an element count before `Vec::set_len`. `T` must not be zero-sized.
macro_rules! getsockopt_template(
    ($name: ident, $opt: expr, $t: ty) => {
        pub fn $name(&self) -> Result<$t, Error> {
            let mut optval: $t = std::default::Default::default();
            let mut optval_len: size_t = std::mem::size_of::<$t>() as size_t;
            let optval_ptr = &mut optval as *mut $t;

            let rc = unsafe {
                zmq_ffi::zmq_getsockopt(self.socket, $opt as c_int,
                                        transmute(optval_ptr), &mut optval_len)
            };
            if rc == -1 {
                Err(Error::from_last_err())
            } else {
                Ok(optval)
            }
        }
    };
    ($name: ident, $opt: expr, $t: ty, $n: expr, $rmap: expr, $r: ty) => {
        pub fn $name(&self) -> Result<$r, Error> {
            let mut optval: Vec<$t> = Vec::with_capacity($n);

            let elem_size = std::mem::size_of::<$t>();
            // Buffer size in bytes on the way in; bytes written on the way out.
            let mut optval_len: size_t = (optval.capacity() * elem_size) as size_t;
            let optval_ptr = optval.as_mut_ptr();

            let rc = unsafe {
                zmq_ffi::zmq_getsockopt(self.socket, $opt as c_int,
                                        transmute(optval_ptr), &mut optval_len)
            };

            if rc == -1 {
                Err(Error::from_last_err())
            } else {
                // `set_len` takes an element count, not a byte count, and must
                // never exceed the allocation.
                let filled = std::cmp::min(optval_len as usize / elem_size, optval.capacity());
                unsafe { optval.set_len(filled); }
                Ok($rmap(optval))
            }
        }
    };
    ($name: ident, $opt: expr, $t: ty, $rmap: expr, $r: ty) => {
        pub fn $name(&self) -> Result<$r, Error> {
            let mut optval: $t = std::default::Default::default();
            let mut optval_len: size_t = std::mem::size_of::<$t>() as size_t;
            let optval_ptr = &mut optval as *mut $t;

            let rc = unsafe {
                zmq_ffi::zmq_getsockopt(self.socket, $opt as c_int,
                                        transmute(optval_ptr), &mut optval_len)
            };
            if rc == -1 {
                Err(Error::from_last_err())
            } else {
                Ok($rmap(optval))
            }
        }
    };
    ($name: ident, $opt: expr, $t: ty, $n: expr) => {
        pub fn $name(&self) -> Result<Vec<$t>, Error> {
            let mut optval: Vec<$t> = Vec::with_capacity($n);

            let elem_size = std::mem::size_of::<$t>();
            // Buffer size in bytes on the way in; bytes written on the way out.
            let mut optval_len: size_t = (optval.capacity() * elem_size) as size_t;
            let optval_ptr = optval.as_mut_ptr();

            let rc = unsafe {
                zmq_ffi::zmq_getsockopt(self.socket, $opt as c_int,
                                        transmute(optval_ptr), &mut optval_len)
            };

            if rc == -1 {
                Err(Error::from_last_err())
            } else {
                // `set_len` takes an element count, not a byte count, and must
                // never exceed the allocation.
                let filled = std::cmp::min(optval_len as usize / elem_size, optval.capacity());
                unsafe { optval.set_len(filled); }
                Ok(optval)
            }
        }
    };
);
95
// Generates a `&self` method taking no value argument that calls
// `zmq_ffi::zmq_setsockopt` for `$opt` with a null value pointer and a length
// of zero. The method yields `Err(Error::from_last_err())` when the call
// returns -1 and `Ok(())` otherwise.
macro_rules! setsockopt_nullptr_template(
    ($name: ident, $opt: expr) => {
        pub fn $name(&self) -> Result<(), Error> {
            let null_value: *const u8 = std::ptr::null();
            let zero_len: size_t = 0;

            let status = unsafe {
                zmq_ffi::zmq_setsockopt(self.socket, $opt as c_int,
                                        transmute(null_value), zero_len)
            };

            match status {
                -1 => Err(Error::from_last_err()),
                _ => Ok(()),
            }
        }
    };
);
115
// Generates `&self` setter methods that write a socket option through
// `zmq_ffi::zmq_setsockopt`. Each generated method yields
// `Err(Error::from_last_err())` when the call returns -1 and `Ok(())` otherwise.
//
// Arms, selected by argument shape:
//   (name, opt, T)      -> fn name(&self, optval: &T)    passes `size_of::<T>()` bytes read from `optval`
//   (name, opt)         -> fn name(&self, optval: &[u8]) passes the slice as-is
//   (name, opt, T, map) -> fn name(&self, optval: T)     passes the `Vec<u8>` produced by `map(optval)`
macro_rules! setsockopt_template(
    ($name: ident, $opt: expr, $t: ty) => {
        pub fn $name(&self, optval: &$t) -> Result<(), Error> {
            let value_ptr = optval as *const $t;
            let value_len: size_t = std::mem::size_of::<$t>() as size_t;

            let status = unsafe {
                zmq_ffi::zmq_setsockopt(self.socket, $opt as c_int,
                                        transmute(value_ptr), value_len)
            };
            match status {
                -1 => Err(Error::from_last_err()),
                _ => Ok(()),
            }
        }
    };

    ($name: ident, $opt: expr) => {
        pub fn $name(&self, optval: &[u8]) -> Result<(), Error> {
            let value_ptr: *const u8 = optval.as_ptr();
            let value_len: size_t = optval.len() as size_t;

            let status = unsafe {
                zmq_ffi::zmq_setsockopt(self.socket, $opt as c_int,
                                        transmute(value_ptr), value_len)
            };
            match status {
                -1 => Err(Error::from_last_err()),
                _ => Ok(()),
            }
        }
    };

    ($name: ident, $opt: expr, $t: ty, $map: expr) => {
        pub fn $name(&self, optval: $t) -> Result<(), Error> {
            // The converted bytes must stay alive until the FFI call has returned.
            let encoded: Vec<u8> = $map(optval);
            let value_len: size_t = encoded.len() as size_t;
            let value_ptr: *const u8 = encoded.as_ptr();

            let status = unsafe {
                zmq_ffi::zmq_setsockopt(self.socket, $opt as c_int,
                                        transmute(value_ptr), value_len)
            };
            match status {
                -1 => Err(Error::from_last_err()),
                _ => Ok(()),
            }
        }
    };
);
171
// Numeric identifiers of the socket options handled in this file. A variant is
// cast to `c_int` and passed as the option argument of `zmq_getsockopt` /
// `zmq_setsockopt` by the accessor macros used inside `impl Socket`.
//
// NOTE(review): the discriminants presumably mirror the `ZMQ_*` option
// constants of the linked libzmq header - confirm against the targeted version.
// Gaps in the numbering are left exactly as found.
#[allow(non_camel_case_types)]
#[derive(Copy, Clone, Debug)]
enum SocketOption {
    AFFINITY = 4,
    IDENTITY = 5,
    SUBSCRIBE = 6,
    UNSUBSCRIBE = 7,
    RATE = 8,
    RECOVERY_IVL = 9,
    SNDBUF = 11,
    RCVBUF = 12,
    RCVMORE = 13,
    FD = 14,
    EVENTS = 15,
    TYPE = 16,
    LINGER = 17,
    RECONNECT_IVL = 18,
    BACKLOG = 19,
    RECONNECT_IVL_MAX = 21,
    MAXMSGSIZE = 22,
    SNDHWM = 23,
    RCVHWM = 24,
    MULTICAST_HOPS = 25,
    RCVTIMEO = 27,
    SNDTIMEO = 28,
    LAST_ENDPOINT = 32,
    ROUTER_MANDATORY = 33,
    TCP_KEEPALIVE = 34,
    TCP_KEEPALIVE_CNT = 35,
    TCP_KEEPALIVE_IDLE = 36,
    TCP_KEEPALIVE_INTVL = 37,
    IMMEDIATE = 39,
    XPUB_VERBOSE = 40,
    ROUTER_RAW = 41,
    IPV6 = 42,
    MECHANISM = 43,
    PLAIN_SERVER = 44,
    PLAIN_USERNAME = 45,
    PLAIN_PASSWORD = 46,
    CURVE_SERVER = 47,
    CURVE_PUBLICKEY = 48,
    CURVE_SECRETKEY = 49,
    CURVE_SERVERKEY = 50,
    PROBE_ROUTER = 51,
    REQ_CORRELATE = 52,
    REQ_RELAXED = 53,
    CONFLATE = 54,
    ZAP_DOMAIN = 55,
    ROUTER_HANDOVER = 56,
    TOS = 57,
    CONNECT_RID = 61,
    GSSAPI_SERVER = 62,
    GSSAPI_PRINCIPAL = 63,
    GSSAPI_SERVICE_PRINCIPAL = 64,
    GSSAPI_PLAINTEXT = 65,
    HANDSHAKE_IVL = 66,
    // NOTE(review): no accessor in this file references the two options below.
    SOCKS_PROXY = 68,
    XPUB_NODROP = 69,
}
231
/// Bit flag naming one kind of socket monitor event.
///
/// `Socket::socket_monitor` ORs the requested flags together into the `i32`
/// mask it hands to `zmq_socket_monitor`.
pub type SocketEvent = c_ushort;

// One bit per event kind, so the flags can be combined with `|`.
// NOTE(review): values presumably match libzmq's `ZMQ_EVENT_*` constants - confirm.
pub const CONNECTED: SocketEvent = 0x0001;
pub const CONNECT_DELAYED: SocketEvent = 0x0002;
pub const CONNECT_RETRIED: SocketEvent = 0x0004;
pub const LISTENING: SocketEvent = 0x0008;
pub const BIND_FAILED: SocketEvent = 0x0010;
pub const ACCEPTED: SocketEvent = 0x0020;
pub const ACCEPT_FAILED: SocketEvent = 0x0040;
pub const CLOSED: SocketEvent = 0x0080;
pub const CLOSE_FAILED: SocketEvent = 0x0100;
pub const DISCONNECTED: SocketEvent = 0x0200;
pub const MONITOR_STOPPED: SocketEvent = 0x0400;
/// Every bit set: selects all event kinds at once.
pub const ALL: SocketEvent = 0xFFFF;
246
/// Flags value forwarded unchanged as the `flags` argument of the send and
/// receive calls made by `Socket`.
pub type SocketFlag = c_int;

// NOTE(review): presumably libzmq's `ZMQ_DONTWAIT` and `ZMQ_SNDMORE` - confirm.
pub const DONTWAIT: SocketFlag = 1;
pub const SNDMORE: SocketFlag = 2;
251
/// Wrapper around a raw socket pointer that closes the underlying socket when
/// dropped, unless `close()` was already called.
pub struct Socket {
    // Raw handle passed to every `zmq_ffi` call made by this type.
    socket: *mut c_void,
    // Set by `close()`; when true, `Drop` does not close the handle again.
    closed: bool,
}

// The raw pointer field makes `Socket` `!Send` by default; this opts back in.
// NOTE(review): soundness depends on libzmq allowing a socket to be moved
// between threads - confirm against the libzmq threading rules.
unsafe impl Send for Socket {}
258
259impl Socket {
260 pub fn from_raw(socket: *mut c_void) -> Socket {
261 Socket { socket: socket, closed: false }
262 }
263
264 pub fn close(&mut self) -> Result<(), Error> {
276 self.closed = true;
277 self.close_underly()
278 }
279
280 fn close_underly(&mut self) -> Result<(), Error> {
281 loop {
282 let rc = unsafe { zmq_ffi::zmq_close(self.socket) };
283 if rc != 0 {
284 let e = Error::from_last_err();
285 if e.get_errno() == ::errno::EINTR {
286 continue;
287 } else {
288 return Err(e);
289 }
290
291 } else {
292 break;
293 }
294 }
295 Ok(())
296 }
297
298 pub fn bind(&mut self, endpoint: &str) -> Result<(), Error> {
304 let endpoint_cstr = ffi::CString::new(endpoint).unwrap();
305 let rc = unsafe { zmq_ffi::zmq_bind(self.socket, endpoint_cstr.as_ptr()) };
306 if rc == -1 {
307 Err(Error::from_last_err())
308 } else {
309 Ok(())
310 }
311 }
312
313 pub fn connect(&mut self, endpoint: &str) -> Result<(), Error> {
319 let endpoint_cstr = ffi::CString::new(endpoint).unwrap();
320 let rc = unsafe { zmq_ffi::zmq_connect(self.socket, endpoint_cstr.as_ptr()) };
321 if rc == -1 {
322 Err(Error::from_last_err())
323 } else {
324 Ok(())
325 }
326 }
327
328 pub fn unbind(&mut self, endpoint: &str) -> Result<(), Error> {
334 let endpoint_cstr = ffi::CString::new(endpoint).unwrap();
335 let rc = unsafe { zmq_ffi::zmq_unbind(self.socket, endpoint_cstr.as_ptr()) };
336 if rc == -1 {
337 Err(Error::from_last_err())
338 } else {
339 Ok(())
340 }
341 }
342
343 pub fn disconnect(&mut self, endpoint: &str) -> Result<(), Error> {
352 let endpoint_cstr = ffi::CString::new(endpoint).unwrap();
353 let rc = unsafe { zmq_ffi::zmq_disconnect(self.socket, endpoint_cstr.as_ptr()) };
354 if rc == -1 {
355 Err(Error::from_last_err())
356 } else {
357 Ok(())
358 }
359 }
360
361 pub fn send_msg(&mut self, mut msg: Message, flags: SocketFlag) -> Result<i32, Error> {
365 let rc = unsafe { zmq_ffi::zmq_msg_send(&mut msg.msg, self.socket, flags) };
366 if rc == -1 {
367 Err(Error::from_last_err())
368 } else {
369 Ok(rc)
370 }
371 }
372
373 pub fn recv_into_msg(&mut self, msg: &mut Message, flags: SocketFlag) -> Result<i32, Error> {
377 let rc = unsafe { zmq_ffi::zmq_msg_recv(&mut msg.msg, self.socket, flags) };
378 if rc == -1 {
379 Err(Error::from_last_err())
380 } else {
381 Ok(rc)
382 }
383 }
384
385 pub fn recv_msg(&mut self, flags: SocketFlag) -> Result<Message, Error> {
389 let mut msg = try!(Message::new());
390 match self.recv_into_msg(&mut msg, flags) {
391 Ok(_) => Ok(msg),
392 Err(e) => Err(e),
393 }
394 }
395
396 pub fn send_bytes(&mut self, data: &[u8], flags: SocketFlag) -> Result<i32, Error> {
400 let msg = try!(Message::from_slice(data));
401 self.send_msg(msg, flags)
402 }
403
404 pub fn send_const_bytes(&mut self, data: &'static [u8], flags: SocketFlag) -> Result<i32, Error> {
410 let rc = unsafe { zmq_ffi::zmq_send_const(self.socket, transmute(data.as_ptr()), data.len(), flags) };
411 if rc == -1 {
412 Err(Error::from_last_err())
413 } else {
414 Ok(rc)
415 }
416 }
417
418 pub fn send_str(&mut self, data: &str, flags: SocketFlag) -> Result<i32, Error> {
420 self.send_bytes(data.as_bytes(), flags)
421 }
422
423 pub fn recv_bytes(&mut self, flags: SocketFlag) -> Result<Vec<u8>, Error> {
425 match self.recv_msg(flags) {
426 Ok(msg) => Ok(msg.to_vec()),
427 Err(e) => Err(e),
428 }
429 }
430
431 pub fn recv_bytes_into_slice(&mut self, buffer: &mut [u8], flags: SocketFlag) -> Result<i32, Error> {
435 let rc = unsafe { zmq_ffi::zmq_recv(self.socket, transmute(buffer.as_mut_ptr()), buffer.len(), flags) };
436 if rc == -1 {
437 Err(Error::from_last_err())
438 } else {
439 Ok(rc)
440 }
441 }
442
443 pub fn recv_string(&mut self, flags: SocketFlag) -> Result<Result<String, Vec<u8>>, Error> {
445 match self.recv_bytes(flags) {
446 Ok(msg) => {
447 Ok({
448 let s = String::from_utf8(msg);
449 if s.is_ok() {
450 Ok(s.unwrap())
451 } else {
452 Err(s.unwrap_err().into_bytes())
453 }
454 })
455 }
456 Err(e) => Err(e),
457 }
458 }
459
460 pub fn socket_monitor(&mut self, endpoint: &str, events: &Vec<SocketEvent>) -> Result<(), Error> {
466 let mut event_mask: i32 = 0;
467 for event in events {
468 event_mask |= Clone::clone(event) as i32;
469 }
470
471 let endpoint_cstr = ffi::CString::new(endpoint).unwrap();
472 let rc = unsafe { zmq_ffi::zmq_socket_monitor(self.socket, endpoint_cstr.as_ptr(), event_mask) };
473 if rc == -1 {
474 Err(Error::from_last_err())
475 } else {
476 Ok(())
477 }
478 }
479
480 pub fn run_proxy(frontend: &mut Socket, backend: &mut Socket) -> Result<(), Error> {
486 let rc = unsafe { zmq_ffi::zmq_proxy(frontend.socket, backend.socket, std::ptr::null_mut()) };
487 if rc == -1 {
488 Err(Error::from_last_err())
489 } else {
490 Ok(())
491 }
492 }
493
494 pub fn run_proxy_ex(frontend: &mut Socket, backend: &mut Socket,
507 capture: Option<&mut Socket>, control: Option<&mut Socket>) -> Result<(), Error> {
508 let capture_ptr = if capture.is_none() { std::ptr::null_mut() } else { capture.unwrap().socket };
509
510 let rc = {
511 if control.is_none() {
512 unsafe { zmq_ffi::zmq_proxy(frontend.socket, backend.socket, capture_ptr) }
513 } else {
514 unsafe { zmq_ffi::zmq_proxy_steerable(frontend.socket, backend.socket, capture_ptr, control.unwrap().socket) }
515 }
516 };
517 if rc == -1 {
518 Err(Error::from_last_err())
519 } else {
520 Ok(())
521 }
522 }
523
524 pub fn as_poll_item(&self) -> PollItem {
529 PollItem::from_socket(&self)
530 }
531
532 pub fn poll(items: &mut [PollItem], nitems: i32, timeout: i32) -> Result<i32, Error> {
536 let rc = unsafe { zmq_ffi::zmq_poll(transmute(items.as_mut_ptr()), nitems as c_int, timeout as c_long) };
537 if rc == -1 {
538 Err(Error::from_last_err())
539 } else {
540 Ok(rc)
541 }
542 }
543
    // ---- Option getters, all generated by `getsockopt_template!` ----
    //
    // Invocation shapes used below:
    //   (name, option, T)               -> Result<T, Error>
    //   (name, option, u8, N)           -> Result<Vec<u8>, Error>, buffer capacity N
    //   (name, option, u8, N, map, R)   -> buffer of capacity N converted with `map`
    //   (name, option, i32, map, bool)  -> integer converted with `r > 0`
    //
    // NOTE(review): the 32/41-byte CURVE sizes presumably correspond to a raw
    // key and its printable encoding plus terminator - confirm against libzmq.
    getsockopt_template!(get_affinity, SocketOption::AFFINITY, u64);
    getsockopt_template!(get_backlog, SocketOption::BACKLOG, i32);
    getsockopt_template!(get_curve_publickey, SocketOption::CURVE_PUBLICKEY, u8, 32);
    getsockopt_template!(get_curve_printable_publickey, SocketOption::CURVE_PUBLICKEY, u8, 41,
        |r: Vec<u8>| {
            bytes_to_string(r)
        }, String);
    getsockopt_template!(get_curve_secretkey, SocketOption::CURVE_SECRETKEY, u8, 32);
    getsockopt_template!(get_curve_printable_secretkey, SocketOption::CURVE_SECRETKEY, u8, 41,
        |r: Vec<u8>| {
            bytes_to_string(r)
        }, String);
    getsockopt_template!(get_curve_serverkey, SocketOption::CURVE_SERVERKEY, u8, 32);
    getsockopt_template!(get_curve_printable_serverkey, SocketOption::CURVE_SERVERKEY, u8, 41,
        |r: Vec<u8>| {
            bytes_to_string(r)
        }, String);
    getsockopt_template!(get_events, SocketOption::EVENTS, PollEvent);
    getsockopt_template!(get_fd, SocketOption::FD, SocketFd);
    getsockopt_template!(is_gssapi_plaintext, SocketOption::GSSAPI_PLAINTEXT, i32, |r| { r > 0 }, bool);
    getsockopt_template!(get_gssapi_principal, SocketOption::GSSAPI_PRINCIPAL, u8, 256,
        |r: Vec<u8>| {
            bytes_to_string(r)
        }, String);
    getsockopt_template!(is_gssapi_server, SocketOption::GSSAPI_SERVER, i32, |r| { r > 0 }, bool);
    getsockopt_template!(get_gssapi_service_principal, SocketOption::GSSAPI_SERVICE_PRINCIPAL, u8, 256,
        |r: Vec<u8>| {
            bytes_to_string(r)
        }, String);
    getsockopt_template!(get_handshake_ivl, SocketOption::HANDSHAKE_IVL, i32);
    getsockopt_template!(get_identity, SocketOption::IDENTITY, u8, 256);
    getsockopt_template!(is_immediate, SocketOption::IMMEDIATE, i32, |r| { r > 0 }, bool);
    getsockopt_template!(is_ipv6_enabled, SocketOption::IPV6, i32, |r| { r > 0 }, bool);
    getsockopt_template!(get_last_endpoint, SocketOption::LAST_ENDPOINT, u8, 2048,
        |r: Vec<u8>| {
            bytes_to_string(r)
        }, String);
    getsockopt_template!(get_linger, SocketOption::LINGER, i32);
    getsockopt_template!(get_max_msg_size, SocketOption::MAXMSGSIZE, i64);
    getsockopt_template!(get_mechanism, SocketOption::MECHANISM, i32);
    getsockopt_template!(get_multicast_hops, SocketOption::MULTICAST_HOPS, i32);
    getsockopt_template!(get_plain_password, SocketOption::PLAIN_PASSWORD, u8, 256,
        |r: Vec<u8>| {
            bytes_to_string(r)
        }, String);
    getsockopt_template!(is_plain_server, SocketOption::PLAIN_SERVER, i32, |r| { r > 0 }, bool);
    getsockopt_template!(get_plain_username, SocketOption::PLAIN_USERNAME, u8, 256,
        |r: Vec<u8>| {
            bytes_to_string(r)
        }, String);
    getsockopt_template!(get_rate, SocketOption::RATE, i32);
    getsockopt_template!(get_rcvbuf, SocketOption::RCVBUF, i32);
    getsockopt_template!(get_rcvhwm, SocketOption::RCVHWM, i32);
    getsockopt_template!(can_rcvmore, SocketOption::RCVMORE, i32, |r| { r > 0 }, bool);
    getsockopt_template!(get_rcvtimeo, SocketOption::RCVTIMEO, i32);
    getsockopt_template!(get_reconnect_ivl, SocketOption::RECONNECT_IVL, i32);
    getsockopt_template!(get_reconnect_ivl_max, SocketOption::RECONNECT_IVL_MAX, i32);
    getsockopt_template!(get_recovery_ivl, SocketOption::RECOVERY_IVL, i32);
    getsockopt_template!(get_sndbuf, SocketOption::SNDBUF, i32);
    getsockopt_template!(get_sndhwm, SocketOption::SNDHWM, i32);
    getsockopt_template!(get_sndtimeo, SocketOption::SNDTIMEO, i32);
    getsockopt_template!(get_tcp_keepalive, SocketOption::TCP_KEEPALIVE, i32);
    getsockopt_template!(get_tcp_keepalive_cnt, SocketOption::TCP_KEEPALIVE_CNT, i32);
    getsockopt_template!(get_tcp_keepalive_idle, SocketOption::TCP_KEEPALIVE_IDLE, i32);
    getsockopt_template!(get_tcp_keepalive_intvl, SocketOption::TCP_KEEPALIVE_INTVL, i32);
    getsockopt_template!(get_tos, SocketOption::TOS, i32);
    getsockopt_template!(get_type, SocketOption::TYPE, SocketType);
    getsockopt_template!(get_zap_domain, SocketOption::ZAP_DOMAIN, u8, 256,
        |r: Vec<u8>| {
            bytes_to_string(r)
        }, String);
620
    // ---- Option setters, generated by `setsockopt_template!` and
    // ---- `setsockopt_nullptr_template!` ----
    //
    // Invocation shapes used below:
    //   (name, option, T)          -> fn name(&self, optval: &T)
    //   (name, option)             -> fn name(&self, optval: &[u8])
    //   (name, option, &str, map)  -> fn name(&self, optval: &str); the value sent is
    //                                 `str_to_cstr_bytes(optval)`, i.e. it includes a trailing NUL
    //   setsockopt_nullptr_template!(name, option)
    //                              -> fn name(&self); sends a null pointer with length 0
    setsockopt_template!(set_affinity, SocketOption::AFFINITY, u64);
    setsockopt_template!(set_backlog, SocketOption::BACKLOG, i32);
    setsockopt_template!(set_connect_rid, SocketOption::CONNECT_RID);
    setsockopt_template!(set_conflate, SocketOption::CONFLATE, i32);
    setsockopt_template!(set_curve_publickey, SocketOption::CURVE_PUBLICKEY);
    setsockopt_template!(set_curve_plaintext_publickey, SocketOption::CURVE_PUBLICKEY, &str,
        |s| { str_to_cstr_bytes(s) });
    setsockopt_template!(set_curve_secretkey, SocketOption::CURVE_SECRETKEY);
    setsockopt_template!(set_curve_plaintext_secretkey, SocketOption::CURVE_SECRETKEY, &str,
        |s| { str_to_cstr_bytes(s) });
    setsockopt_template!(set_curve_server, SocketOption::CURVE_SERVER, i32);
    setsockopt_template!(set_curve_serverkey, SocketOption::CURVE_SERVERKEY);
    setsockopt_template!(set_curve_plaintext_serverkey, SocketOption::CURVE_SERVERKEY, &str,
        |s| { str_to_cstr_bytes(s) });
    setsockopt_template!(set_gssapi_plaintext, SocketOption::GSSAPI_PLAINTEXT, i32);
    setsockopt_template!(set_gssapi_principal, SocketOption::GSSAPI_PRINCIPAL, &str,
        |s| { str_to_cstr_bytes(s) });
    setsockopt_template!(set_gssapi_server, SocketOption::GSSAPI_SERVER, i32);
    setsockopt_template!(set_gssapi_service_principal, SocketOption::GSSAPI_SERVICE_PRINCIPAL, &str,
        |s| { str_to_cstr_bytes(s) });
    setsockopt_template!(set_handshake_ivl, SocketOption::HANDSHAKE_IVL, i32);
    setsockopt_template!(set_identity, SocketOption::IDENTITY);
    setsockopt_template!(set_immediate, SocketOption::IMMEDIATE, i32);
    setsockopt_template!(set_ipv6, SocketOption::IPV6, i32);
    setsockopt_template!(set_linger, SocketOption::LINGER, i32);
    setsockopt_template!(set_max_msg_size, SocketOption::MAXMSGSIZE, i64);
    setsockopt_template!(set_multicast_hops, SocketOption::MULTICAST_HOPS, i32);
    setsockopt_template!(set_plain_password, SocketOption::PLAIN_PASSWORD, &str,
        |s| { str_to_cstr_bytes(s) });
    setsockopt_nullptr_template!(set_plain_password_empty, SocketOption::PLAIN_PASSWORD);
    setsockopt_template!(set_plain_server, SocketOption::PLAIN_SERVER, i32);
    setsockopt_template!(set_plain_username, SocketOption::PLAIN_USERNAME, &str,
        |s| { str_to_cstr_bytes(s) });
    setsockopt_nullptr_template!(set_plain_username_empty, SocketOption::PLAIN_USERNAME);
    setsockopt_template!(set_probe_router, SocketOption::PROBE_ROUTER, i32);
    setsockopt_template!(set_rate, SocketOption::RATE, i32);
    setsockopt_template!(set_rcvbuf, SocketOption::RCVBUF, i32);
    setsockopt_template!(set_rcvhwm, SocketOption::RCVHWM, i32);
    setsockopt_template!(set_rcvtimeo, SocketOption::RCVTIMEO, i32);
    setsockopt_template!(set_reconnect_ivl, SocketOption::RECONNECT_IVL, i32);
    setsockopt_template!(set_reconnect_ivl_max, SocketOption::RECONNECT_IVL_MAX, i32);
    setsockopt_template!(set_recovery_ivl, SocketOption::RECOVERY_IVL, i32);
    setsockopt_template!(set_req_correlate, SocketOption::REQ_CORRELATE, i32);
    setsockopt_template!(set_req_relaxed, SocketOption::REQ_RELAXED, i32);
    setsockopt_template!(set_router_handover, SocketOption::ROUTER_HANDOVER, i32);
    setsockopt_template!(set_router_mandatory, SocketOption::ROUTER_MANDATORY, i32);
    setsockopt_template!(set_router_raw, SocketOption::ROUTER_RAW, i32);
    setsockopt_template!(set_sndbuf, SocketOption::SNDBUF, i32);
    setsockopt_template!(set_sndhwm, SocketOption::SNDHWM, i32);
    setsockopt_template!(set_sndtimeo, SocketOption::SNDTIMEO, i32);
    setsockopt_template!(set_subscribe, SocketOption::SUBSCRIBE);
    setsockopt_template!(set_tcp_keepalive, SocketOption::TCP_KEEPALIVE, i32);
    setsockopt_template!(set_tcp_keepalive_cnt, SocketOption::TCP_KEEPALIVE_CNT, i32);
    setsockopt_template!(set_tcp_keepalive_idle, SocketOption::TCP_KEEPALIVE_IDLE, i32);
    setsockopt_template!(set_tcp_keepalive_intvl, SocketOption::TCP_KEEPALIVE_INTVL, i32);
    setsockopt_template!(set_tos, SocketOption::TOS, i32);
    setsockopt_template!(set_unsubscribe, SocketOption::UNSUBSCRIBE);
    setsockopt_template!(set_xpub_verbose, SocketOption::XPUB_VERBOSE, i32);
680 setsockopt_template!(set_zqp_domain, SocketOption::ZAP_DOMAIN, &str,
681 |s| { str_to_cstr_bytes(s) });
682}
683
684impl Drop for Socket {
685 fn drop(&mut self) {
686 if !self.closed {
687 self.close_underly().unwrap();
688 }
689 }
690}
691
/// Bit flag describing a poll condition. Flags are narrowed to `c_short` when
/// stored in or tested against a `PollItem`; `PollEvent` is also the value type
/// returned by `Socket::get_events`.
pub type PollEvent = c_int;

// One bit each, so they can be combined with `|`.
// NOTE(review): presumably libzmq's `ZMQ_POLLIN` / `ZMQ_POLLOUT` / `ZMQ_POLLERR` - confirm.
pub const POLLIN: PollEvent = 1;
pub const POLLOUT: PollEvent = 2;
pub const POLLERR: PollEvent = 4;
697
// Platform-dependent type for OS-level socket descriptors: pointer-sized
// integer on Windows, `c_int` everywhere else. Used for `PollItem::fd` and as
// the value type of `Socket::get_fd`.
#[cfg(target_os = "windows")]
pub type SocketFd = ::libc::intptr_t;
#[cfg(not(target_os = "windows"))]
pub type SocketFd = c_int;
702
/// One entry of the array given to `Socket::poll`.
///
/// `#[repr(C)]` fixes the field layout because a pointer to a slice of these
/// is handed directly to `zmq_poll`.
/// NOTE(review): the layout presumably has to match libzmq's `zmq_pollitem_t`
/// - confirm field types on every supported platform.
#[repr(C)]
pub struct PollItem {
    /// Raw socket pointer; null when the item was built from a descriptor.
    pub socket: *mut c_void,
    /// OS-level descriptor; set to 0 when the item was built from a `Socket`.
    pub fd: SocketFd,
    /// Bitmask of registered events (see `reg_event` / `unreg_event`).
    pub events: c_short,
    /// Bitmask tested by `has_revent` and reset by `clear_revents`.
    pub revents: c_short,
}
710
711impl PollItem {
712 pub fn from_socket(socket: &Socket) -> PollItem {
713 PollItem {
714 socket: socket.socket,
715 fd: 0,
716 events: 0,
717 revents: 0,
718 }
719 }
720
721 pub fn from_fd(fd: SocketFd) -> PollItem {
722 PollItem {
723 socket: std::ptr::null_mut(),
724 fd: fd,
725 events: 0,
726 revents: 0,
727 }
728 }
729
730 pub fn set_socket(&mut self, socket: &Socket) -> &mut PollItem {
731 self.socket = socket.socket;
732 self.fd = 0;
733 self
734 }
735
736 pub fn set_fd(&mut self, fd: SocketFd) -> &mut PollItem {
737 self.socket = std::ptr::null_mut();
738 self.fd = fd;
739 self
740 }
741
742 pub fn clear_events(&mut self) -> &mut PollItem {
743 self.events = 0;
744 self
745 }
746
747 pub fn reg_event(&mut self, ev: PollEvent) -> &mut PollItem {
748 self.events |= ev as c_short;
749 self
750 }
751
752 pub fn unreg_event(&mut self, ev: PollEvent) -> &mut PollItem {
753 self.events &= !(ev as c_short);
754 self
755 }
756
757 pub fn clear_revents(&mut self) -> &mut PollItem {
759 self.revents = 0;
760 self
761 }
762
763 pub fn has_revent(&self, ev: PollEvent) -> bool {
765 (self.revents & (ev as c_short)) > 0
766 }
767}