1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
extern crate zmq_sys;
use libc::{size_t};
use std::ffi;
use std::fmt;
use std::{mem, ptr, str, slice};
use std::ops::{Deref, DerefMut};
use super::Result;
pub struct Message {
msg: zmq_sys::zmq_msg_t,
}
impl PartialEq for Message {
fn eq(&self, other: &Message) -> bool {
&self[..] == &other[..]
}
}
impl Eq for Message {}
impl Drop for Message {
fn drop(&mut self) {
unsafe {
let rc = zmq_sys::zmq_msg_close(&mut self.msg);
assert_eq!(rc, 0);
}
}
}
impl fmt::Debug for Message {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self.deref())
}
}
impl Message {
pub fn new() -> Result<Message> {
let mut msg = zmq_sys::zmq_msg_t::default();
zmq_try!(unsafe { zmq_sys::zmq_msg_init(&mut msg) });
Ok(Message { msg: msg })
}
pub unsafe fn with_capacity_unallocated(len: usize) -> Result<Message> {
let mut msg = zmq_sys::zmq_msg_t::default();
zmq_try!(zmq_sys::zmq_msg_init_size(&mut msg, len as size_t));
Ok(Message { msg: msg })
}
pub fn with_capacity(len: usize) -> Result<Message> {
unsafe {
let mut msg = try!(Message::with_capacity_unallocated(len));
ptr::write_bytes(msg.as_mut_ptr(), 0, len);
Ok(msg)
}
}
pub fn from_slice(data: &[u8]) -> Result<Message> {
unsafe {
let mut msg = try!(Message::with_capacity_unallocated(data.len()));
ptr::copy_nonoverlapping(data.as_ptr(), msg.as_mut_ptr(), data.len());
Ok(msg)
}
}
pub fn as_str(&self) -> Option<&str> {
str::from_utf8(self).ok()
}
pub fn get_more(&self) -> bool {
let rc = unsafe { zmq_sys::zmq_msg_more(&self.msg as *const _ as *mut _, ) };
rc != 0
}
pub fn gets<'a>(&'a mut self, property: &str) -> Option<&'a str> {
let c_str = ffi::CString::new(property.as_bytes()).unwrap();
let value = unsafe {
zmq_sys::zmq_msg_gets(&mut self.msg, c_str.as_ptr())
};
if value.is_null() {
None
} else {
Some(unsafe { str::from_utf8(ffi::CStr::from_ptr(value).to_bytes()).unwrap() })
}
}
}
impl Deref for Message {
type Target = [u8];
fn deref(&self) -> &[u8] {
unsafe {
let ptr = &self.msg as *const _ as *mut _;
let data = zmq_sys::zmq_msg_data(ptr);
let len = zmq_sys::zmq_msg_size(ptr) as usize;
slice::from_raw_parts(mem::transmute(data), len)
}
}
}
impl DerefMut for Message {
fn deref_mut(&mut self) -> &mut [u8] {
unsafe {
let data = zmq_sys::zmq_msg_data(&mut self.msg);
let len = zmq_sys::zmq_msg_size(&mut self.msg) as usize;
slice::from_raw_parts_mut(mem::transmute(data), len)
}
}
}
pub fn msg_ptr(msg: &mut Message) -> *mut zmq_sys::zmq_msg_t {
&mut msg.msg
}