nng 0.3.0

A safe wrapper for nanomsg-next-generation (nng)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
//! Message handling utilities
//!
//! Applications desiring to use the richest part of `nng` will want to use the
//! message API, where the message structure is passed between functions. This
//! API provides the most powerful support for zero-copy.
//!
//! Messages are divided into a header and a body, where the body generally
//! carries user-payload and the header carries protocol specific header
//! information. Most applications will only interact with the body.
use std::{ptr, slice};
use std::ops::{Deref, DerefMut};
use crate::error::Result;

/// An `nng` message type.
///
/// A `Message` wraps a raw `nng_msg` pointer and hands it to `nng_msg_free`
/// when dropped. The two parts of the message are exposed through the `Body`
/// and `Header` view types, and a `Message` dereferences to its `Body`.
#[derive(Debug)]
pub struct Message
{
	// We would like to be able to return a reference to the body and the
	// header, but they aren't accessible structures. We could create `Body`
	// and `BodyMut` types a la iterators, but that leads to a whole lot of
	// duplicated code. Instead, we make them members of this struct and
	// return references to those members. This solves the borrowing issue
	// and avoids code duplication.
	//
	// NOTE(review): `body_mut` and `header_mut` hand out `&mut Body` and
	// `&mut Header`, so safe code could `std::mem::swap` the views of two
	// different messages and leave a view pointing at a message that does
	// not own it. Worth confirming whether that needs guarding.

	/// The pointer to the actual message.
	msgp: *mut nng_sys::nng_msg,

	/// The fake "body" of the message; holds a copy of `msgp`.
	body: Body,

	/// The fake "header" of the message; holds a copy of `msgp`.
	header: Header,
}
impl Message
{
	/// Wraps a raw `nng_msg` pointer in a `Message`.
	///
	/// The pointer is also copied into the `Body` and `Header` views that
	/// are stored alongside it.
	///
	/// # Safety
	///
	/// The returned `Message` passes `msgp` to `nng_msg_free` when it is
	/// dropped, so the caller presumably has to hand over a valid pointer
	/// that nothing else will free.
	pub(crate) unsafe fn from_ptr(msgp: *mut nng_sys::nng_msg) -> Self
	{
		let body = Body { msgp };
		let header = Header { msgp };

		Message { msgp, body, header }
	}

	/// Gives up ownership of the message and returns the raw `nng_msg`
	/// pointer.
	///
	/// The destructor is skipped, so the pointer is not freed by this call.
	pub(crate) unsafe fn into_ptr(self) -> *mut nng_sys::nng_msg
	{
		let raw = self.msgp;

		// Forgetting `self` prevents `Drop` from freeing the message.
		std::mem::forget(self);
		raw
	}

	/// Creates a message with an empty body.
	pub fn new() -> Result<Self>
	{
		let mut raw: *mut nng_sys::nng_msg = ptr::null_mut();
		let rv = unsafe { nng_sys::nng_msg_alloc(&mut raw as _, 0) };
		validate_ptr!(rv, raw);

		let msg = unsafe { Message::from_ptr(raw) };
		Ok(msg)
	}

	/// Creates an empty message whose body buffer is allocated up front.
	///
	/// The returned message has a capacity equal to `cap` but a length of
	/// zero. To get a `Message` with a specified length, use
	/// `Message::zeros`.
	pub fn with_capacity(cap: usize) -> Result<Self>
	{
		let mut raw: *mut nng_sys::nng_msg = ptr::null_mut();
		let rv = unsafe { nng_sys::nng_msg_alloc(&mut raw as _, cap) };
		validate_ptr!(rv, raw);

		// A freshly allocated `nng` message already has its size set to the
		// requested amount. That is natural for C but not what a Rust user
		// expects from `with_capacity`, so the body is emptied again here.
		unsafe {
			nng_sys::nng_msg_clear(raw);
			Ok(Message::from_ptr(raw))
		}
	}

	/// Creates a message whose body is `size` bytes of zeros.
	pub fn zeros(size: usize) -> Result<Self>
	{
		let mut raw: *mut nng_sys::nng_msg = ptr::null_mut();
		let rv = unsafe { nng_sys::nng_msg_alloc(&mut raw as _, size) };
		validate_ptr!(rv, raw);

		let msg = unsafe { Message::from_ptr(raw) };
		Ok(msg)
	}

	/// Attempts to build a message whose body is a copy of `s`.
	///
	/// This is functionally equivalent to calling `From` but allows the
	/// user to handle the case of `nng` being out of memory.
	///
	/// This function will be converted to the `TryFrom` trait once it is
	/// stable.
	pub fn try_from(s: &[u8]) -> Result<Self>
	{
		let len = s.len();
		let mut raw: *mut nng_sys::nng_msg = ptr::null_mut();
		let rv = unsafe { nng_sys::nng_msg_alloc(&mut raw as _, len) };
		validate_ptr!(rv, raw);

		// From here on `nng` claims the body holds `len` bytes; there is
		// nothing further that can be validated before copying into it.
		unsafe {
			let dest = nng_sys::nng_msg_body(raw) as *mut u8;
			ptr::copy_nonoverlapping(s.as_ptr(), dest, len);

			Ok(Message::from_ptr(raw))
		}
	}

	/// Attempts to duplicate the message.
	///
	/// This is functionally equivalent to calling `Clone` but allows the
	/// user to handle the case of `nng` being out of memory.
	pub fn try_clone(&self) -> Result<Self>
	{
		let mut raw: *mut nng_sys::nng_msg = ptr::null_mut();
		let rv = unsafe { nng_sys::nng_msg_dup(&mut raw as _, self.msgp) };
		validate_ptr!(rv, raw);

		let copy = unsafe { Message::from_ptr(raw) };
		Ok(copy)
	}

	/// Borrows the body of the message.
	pub fn body(&self) -> &Body
	{
		&self.body
	}

	/// Mutably borrows the body of the message.
	pub fn body_mut(&mut self) -> &mut Body
	{
		&mut self.body
	}

	/// Borrows the header of the message.
	pub fn header(&self) -> &Header
	{
		&self.header
	}

	/// Mutably borrows the header of the message.
	pub fn header_mut(&mut self) -> &mut Header
	{
		&mut self.header
	}
}
impl Drop for Message
{
	/// Releases the underlying `nng_msg` by passing it to `nng_msg_free`.
	fn drop(&mut self)
	{
		let raw = self.msgp;

		unsafe { nng_sys::nng_msg_free(raw); }
	}
}
// NOTE(review): these impls are written by hand because the raw
// `*mut nng_msg` field keeps the type from being `Send` or `Sync`
// automatically. They presumably rely on an `nng_msg` not being tied to the
// thread that allocated it, and on every mutation going through `&mut`
// access — confirm against the `nng` documentation.
unsafe impl Send for Message {}
unsafe impl Sync for Message {}

impl Clone for Message
{
	/// Duplicates the message via `try_clone`.
	///
	/// # Panics
	///
	/// Panics if `try_clone` returns an error.
	fn clone(&self) -> Self
	{
		// Everywhere else in this library an `ENOMEM` from `nng` is handed
		// back to the caller. Cloning is so pervasive in Rust, however, that
		// a failed duplication is turned into a panic here instead.
		let duplicate = self.try_clone();
		duplicate.expect("Nng failed to duplicate the message")
	}
}

impl<'a> From<&'a [u8]> for Message
{
	/// Builds a message whose body is a copy of `s`.
	///
	/// # Panics
	///
	/// Panics if `Message::try_from` returns an error.
	fn from(s: &[u8]) -> Message
	{
		// Like `Clone`, this deliberately departs from the rest of the
		// wrapper. The allocation function only ever fails with `ENOMEM`, so
		// the failure is surfaced as a panic, the same way every other Rust
		// allocation failure is.
		let allocated = Message::try_from(s);
		allocated.expect("Nng failed to allocate the memory")
	}
}

impl Deref for Message
{
	type Target = Body;

	/// A message dereferences to its body.
	fn deref(&self) -> &Body
	{
		self.body()
	}
}
impl DerefMut for Message
{
	/// A message mutably dereferences to its body.
	fn deref_mut(&mut self) -> &mut Body
	{
		self.body_mut()
	}
}

/// The body of a `Message`.
///
/// This is a view onto the body portion of an `nng_msg`. It dereferences to
/// the bytes of the body as a `[u8]` slice.
#[derive(Debug)]
pub struct Body
{
	/// Pointer to the message whose body this view exposes.
	msgp: *mut nng_sys::nng_msg,
}
impl Body
{
	/// Appends the data to the back of the message body.
	///
	/// The return code of `nng_msg_append` is converted into the result.
	pub fn push_back(&mut self, data: &[u8]) -> Result<()>
	{
		let rv = unsafe {
			nng_sys::nng_msg_append(self.msgp, data.as_ptr() as _, data.len())
		};

		rv2res!(rv)
	}

	/// Shortens the message body, keeping the first `len` bytes.
	///
	/// If `len` is greater than the message body's current length, this has
	/// no effect.
	///
	/// # Panics
	///
	/// Panics if `nng_msg_chop` reports a failure. The number of bytes
	/// removed never exceeds the current length, so this is not expected to
	/// happen.
	pub fn truncate(&mut self, len: usize)
	{
		let rv = unsafe {
			// Number of bytes to drop from the end; zero when `len` is at
			// least as large as the current length.
			let current_len = nng_sys::nng_msg_len(self.msgp);
			nng_sys::nng_msg_chop(self.msgp, current_len.saturating_sub(len))
		};

		// We are guarding against this, so this should never happen
		assert!(rv == 0, "Message was too short to truncate");
	}

	/// Clears the message body.
	pub fn clear(&mut self)
	{
		unsafe {
			nng_sys::nng_msg_clear(self.msgp);
		}
	}

	/// Prepends the data to the message body.
	///
	/// The return code of `nng_msg_insert` is converted into the result.
	pub fn push_front(&mut self, data: &[u8]) -> Result<()>
	{
		let rv = unsafe {
			nng_sys::nng_msg_insert(self.msgp, data.as_ptr() as _, data.len())
		};

		rv2res!(rv)
	}

	/// Reserves capacity for at least `additional` more bytes to be inserted.
	///
	/// The length and contents of the body are left unchanged. This function
	/// does nothing observable if the capacity is already sufficient.
	///
	/// # Panics
	///
	/// Panics if the current length plus `additional` overflows `usize`.
	pub fn reserve(&mut self, additional: usize) -> Result<()>
	{
		let current_len = unsafe { nng_sys::nng_msg_len(self.msgp) };

		// A wrapped sum would ask `nng` to *shrink* the body, silently
		// discarding data, so overflow is treated as a hard error.
		let target_len = current_len
			.checked_add(additional)
			.expect("Requested message capacity overflows `usize`");

		let rv = unsafe {
			// `nng_msg_realloc` does not merely reserve space: it sets the
			// body length to the requested size. After a successful grow the
			// extra bytes are chopped off again, which restores the original
			// length while keeping the enlarged buffer for later appends.
			match nng_sys::nng_msg_realloc(self.msgp, target_len) {
				0 => nng_sys::nng_msg_chop(self.msgp, additional),
				err => err,
			}
		};

		rv2res!(rv)
	}

	/// Remove the first `len` bytes from the front of the message body.
	///
	/// The return code of `nng_msg_trim` is converted into the result.
	pub fn trim(&mut self, len: usize) -> Result<()>
	{
		let rv = unsafe {
			nng_sys::nng_msg_trim(self.msgp, len)
		};

		rv2res!(rv)
	}
}
// NOTE(review): written by hand because the raw `*mut nng_msg` field keeps
// the type from being `Send` or `Sync` automatically. Presumably sound
// because shared access only reads the body and every mutating method takes
// `&mut self` — confirm against the `nng` documentation.
unsafe impl Send for Body {}
unsafe impl Sync for Body {}

impl Deref for Body
{
	type Target = [u8];

	/// Views the message body as a byte slice.
	fn deref(&self) -> &[u8]
	{
		let raw = self.msgp;

		// The slice is built from the pointer and length that `nng` reports
		// for the body of this message.
		unsafe {
			let length = nng_sys::nng_msg_len(raw);
			let start = nng_sys::nng_msg_body(raw) as *const u8;

			slice::from_raw_parts(start, length)
		}
	}
}
impl DerefMut for Body
{
	/// Views the message body as a mutable byte slice.
	fn deref_mut(&mut self) -> &mut [u8]
	{
		let raw = self.msgp;

		// The slice is built from the pointer and length that `nng` reports
		// for the body of this message.
		unsafe {
			let length = nng_sys::nng_msg_len(raw);
			let start = nng_sys::nng_msg_body(raw) as *mut u8;

			slice::from_raw_parts_mut(start, length)
		}
	}
}

/// The header of a `Message`.
///
/// This is a view onto the header portion of an `nng_msg`. It dereferences
/// to the bytes of the header as a `[u8]` slice.
#[derive(Debug)]
pub struct Header
{
	/// Pointer to the message whose header this view exposes.
	msgp: *mut nng_sys::nng_msg,
}
impl Header
{
	/// Adds `data` to the end of the message header.
	///
	/// The return code of `nng_msg_header_append` is converted into the
	/// result.
	pub fn push_back(&mut self, data: &[u8]) -> Result<()>
	{
		let (bytes, count) = (data.as_ptr(), data.len());
		let rv = unsafe { nng_sys::nng_msg_header_append(self.msgp, bytes as _, count) };

		rv2res!(rv)
	}

	/// Cuts the message header down to its first `len` bytes.
	///
	/// Nothing happens when `len` is greater than the current length of the
	/// header.
	///
	/// # Panics
	///
	/// Panics if `nng_msg_header_chop` reports a failure. The number of
	/// bytes removed never exceeds the current length, so this is not
	/// expected to happen.
	pub fn truncate(&mut self, len: usize)
	{
		let raw = self.msgp;
		let rv = unsafe {
			// Bytes to remove from the end; zero when `len` already covers
			// the whole header.
			let excess = nng_sys::nng_msg_header_len(raw).saturating_sub(len);
			nng_sys::nng_msg_header_chop(raw, excess)
		};

		// The amount chopped is clamped above, so a failure here would mean
		// that guard is broken.
		assert!(rv == 0, "Message was too short to truncate");
	}

	/// Empties the message header.
	pub fn clear(&mut self)
	{
		let raw = self.msgp;

		unsafe { nng_sys::nng_msg_header_clear(raw); }
	}

	/// Adds `data` to the front of the message header.
	///
	/// The return code of `nng_msg_header_insert` is converted into the
	/// result.
	pub fn push_front(&mut self, data: &[u8]) -> Result<()>
	{
		let (bytes, count) = (data.as_ptr(), data.len());
		let rv = unsafe { nng_sys::nng_msg_header_insert(self.msgp, bytes as _, count) };

		rv2res!(rv)
	}

	/// Drops the first `len` bytes of the message header.
	///
	/// The return code of `nng_msg_header_trim` is converted into the
	/// result.
	pub fn trim(&mut self, len: usize) -> Result<()>
	{
		let raw = self.msgp;
		let rv = unsafe { nng_sys::nng_msg_header_trim(raw, len) };

		rv2res!(rv)
	}
}
// NOTE(review): written by hand because the raw `*mut nng_msg` field keeps
// the type from being `Send` or `Sync` automatically. Presumably sound
// because shared access only reads the header and every mutating method
// takes `&mut self` — confirm against the `nng` documentation.
unsafe impl Send for Header {}
unsafe impl Sync for Header {}

impl Deref for Header
{
	type Target = [u8];

	/// Views the message header as a byte slice.
	fn deref(&self) -> &[u8]
	{
		let raw = self.msgp;

		// The slice is built from the pointer and length that `nng` reports
		// for the header of this message.
		unsafe {
			let length = nng_sys::nng_msg_header_len(raw);
			let start = nng_sys::nng_msg_header(raw) as *const u8;

			slice::from_raw_parts(start, length)
		}
	}
}
impl DerefMut for Header
{
	/// Views the message header as a mutable byte slice.
	fn deref_mut(&mut self) -> &mut [u8]
	{
		let raw = self.msgp;

		// The slice is built from the pointer and length that `nng` reports
		// for the header of this message.
		unsafe {
			let length = nng_sys::nng_msg_header_len(raw);
			let start = nng_sys::nng_msg_header(raw) as *mut u8;

			slice::from_raw_parts_mut(start, length)
		}
	}
}