use std::{
io::{self, Write},
iter::FromIterator,
ops::{Deref, DerefMut, Index, IndexMut},
ptr::{self, NonNull},
slice::{self, SliceIndex},
};
use crate::{pipe::Pipe, util::validate_ptr};
/// Panic message passed to `expect` after the NNG calls in this file that allocate, duplicate,
/// append to or insert into a message.
const ALLOC_FAIL_MSG: &str = "NNG failed to allocate memory";
/// An owning handle to an `nng_msg`.
///
/// The body bytes are exposed through the slice-style API on this type, while the header bytes
/// are exposed through the embedded [`Header`]. The underlying `nng_msg` is released with
/// `nng_msg_free` when the value is dropped.
#[derive(Debug)]
pub struct Message
{
// Pointer to the underlying `nng_msg`.
msgp: NonNull<nng_sys::nng_msg>,
// Header view built from the same pointer as `msgp` (see `from_ptr`).
header: Header,
}
impl Message
{
    /// Calls `nng_msg_alloc` requesting a body of `size` bytes and returns the resulting pointer.
    ///
    /// # Panics
    ///
    /// Panics with `ALLOC_FAIL_MSG` if `validate_ptr` rejects the result.
    fn alloc_raw(size: usize) -> NonNull<nng_sys::nng_msg>
    {
        let mut raw: *mut nng_sys::nng_msg = ptr::null_mut();
        let status = unsafe { nng_sys::nng_msg_alloc(&mut raw as _, size) };
        validate_ptr(status, raw).expect(ALLOC_FAIL_MSG)
    }

    /// Creates a message by allocating an `nng_msg` with a requested body size of zero.
    ///
    /// # Panics
    ///
    /// Panics if NNG reports an allocation failure.
    pub fn new() -> Self
    {
        Message::from_ptr(Self::alloc_raw(0))
    }

    /// Creates a message whose allocation was made for `cap` bytes.
    ///
    /// The `nng_msg` is allocated with a body of `cap` bytes and then immediately passed to
    /// `nng_msg_clear`, so the space is requested up front but the message starts out cleared.
    ///
    /// # Panics
    ///
    /// Panics if NNG reports an allocation failure.
    pub fn with_capacity(cap: usize) -> Self
    {
        let handle = Self::alloc_raw(cap);
        unsafe { nng_sys::nng_msg_clear(handle.as_ptr()) };
        Message::from_ptr(handle)
    }

    /// Creates a message by allocating an `nng_msg` with a requested body size of `size` bytes.
    ///
    /// NOTE(review): the name implies the body is zero-filled; that is up to `nng_msg_alloc` and
    /// is not enforced here - confirm against the NNG documentation.
    ///
    /// # Panics
    ///
    /// Panics if NNG reports an allocation failure.
    pub fn with_zeros(size: usize) -> Self
    {
        Message::from_ptr(Self::alloc_raw(size))
    }

    /// Shortens the body to at most `len` bytes using `nng_msg_chop`.
    ///
    /// The number of bytes chopped is the current length minus `len`, saturating at zero, so a
    /// `len` that is not smaller than the current length chops nothing.
    pub fn truncate(&mut self, len: usize)
    {
        let raw = self.msgp.as_ptr();
        let excess = unsafe { nng_sys::nng_msg_len(raw) }.saturating_sub(len);
        let status = unsafe { nng_sys::nng_msg_chop(raw, excess) };
        debug_assert_eq!(status, 0, "Message was too short to truncate");
    }

    /// Removes `len` bytes from the body using `nng_msg_trim`.
    ///
    /// The request is clamped to the current body length.
    pub fn trim(&mut self, len: usize)
    {
        let raw = self.msgp.as_ptr();
        let amount = len.min(unsafe { nng_sys::nng_msg_len(raw) });
        let status = unsafe { nng_sys::nng_msg_trim(raw, amount) };
        debug_assert_eq!(status, 0, "Message was too short to trim");
    }

    /// Borrows the message body as a byte slice.
    pub fn as_slice(&self) -> &[u8]
    {
        let raw = self.msgp.as_ptr();
        unsafe {
            let body = nng_sys::nng_msg_body(raw) as *const u8;
            slice::from_raw_parts(body, nng_sys::nng_msg_len(raw))
        }
    }

    /// Mutably borrows the message body as a byte slice.
    pub fn as_mut_slice(&mut self) -> &mut [u8]
    {
        let raw = self.msgp.as_ptr();
        unsafe {
            let body = nng_sys::nng_msg_body(raw) as *mut u8;
            slice::from_raw_parts_mut(body, nng_sys::nng_msg_len(raw))
        }
    }

    /// Borrows the header view of this message.
    pub const fn as_header(&self) -> &Header
    {
        &self.header
    }

    /// Mutably borrows the header view of this message.
    pub fn as_mut_header(&mut self) -> &mut Header
    {
        &mut self.header
    }

    /// Returns the body length as reported by `nng_msg_len`.
    pub fn len(&self) -> usize
    {
        let raw = self.msgp.as_ptr();
        unsafe { nng_sys::nng_msg_len(raw) }
    }

    /// Returns `true` when the body length is zero.
    pub fn is_empty(&self) -> bool
    {
        self.len() == 0
    }

    /// Clears the message body via `nng_msg_clear`.
    pub fn clear(&mut self)
    {
        let raw = self.msgp.as_ptr();
        unsafe { nng_sys::nng_msg_clear(raw) };
    }

    /// Inserts `data` into the body using `nng_msg_insert`.
    ///
    /// # Panics
    ///
    /// Panics with `ALLOC_FAIL_MSG` if NNG returns an error.
    pub fn push_front(&mut self, data: &[u8])
    {
        let (bytes, count) = (data.as_ptr(), data.len());
        let status = unsafe { nng_sys::nng_msg_insert(self.msgp.as_ptr(), bytes as _, count) };
        rv2res!(status).expect(ALLOC_FAIL_MSG);
    }

    /// Appends `data` to the body using `nng_msg_append`.
    ///
    /// # Panics
    ///
    /// Panics with `ALLOC_FAIL_MSG` if NNG returns an error.
    pub fn push_back(&mut self, data: &[u8])
    {
        let (bytes, count) = (data.as_ptr(), data.len());
        let status = unsafe { nng_sys::nng_msg_append(self.msgp.as_ptr(), bytes as _, count) };
        rv2res!(status).expect(ALLOC_FAIL_MSG);
    }

    /// Returns the pipe recorded on this message, if any.
    ///
    /// The pipe obtained from `nng_msg_get_pipe` is only returned when `nng_pipe_id` reports a
    /// positive ID for it; otherwise the result is `None`.
    pub fn pipe(&mut self) -> Option<Pipe>
    {
        let handle = unsafe { nng_sys::nng_msg_get_pipe(self.msgp.as_ptr()) };
        let id = unsafe { nng_sys::nng_pipe_id(handle) };
        if id <= 0 {
            return None;
        }
        Some(Pipe::from_nng_sys(handle))
    }

    /// Records `pipe` on this message via `nng_msg_set_pipe`.
    pub fn set_pipe(&mut self, pipe: Pipe)
    {
        let raw = self.msgp.as_ptr();
        unsafe { nng_sys::nng_msg_set_pipe(raw, pipe.handle()) }
    }

    /// Wraps an `nng_msg` pointer; the returned `Message` frees it when dropped.
    ///
    /// The same pointer is also handed to the embedded `Header`.
    pub(crate) const fn from_ptr(msgp: NonNull<nng_sys::nng_msg>) -> Self
    {
        Message { header: Header { msgp }, msgp }
    }

    /// Gives up ownership of the underlying `nng_msg` and returns its pointer.
    ///
    /// The destructor is suppressed, so the pointer is not freed by this call.
    pub(crate) fn into_ptr(self) -> NonNull<nng_sys::nng_msg>
    {
        let this = std::mem::ManuallyDrop::new(self);
        this.msgp
    }
}
#[cfg(feature = "ffi-module")]
impl Message
{
    /// Returns the raw `nng_msg` pointer held by this message.
    ///
    /// The `Message` keeps ownership: it still frees the pointer when it is dropped.
    pub fn nng_msg(&self) -> *mut nng_sys::nng_msg
    {
        let handle = self.msgp;
        handle.as_ptr()
    }
}
impl Drop for Message
{
    /// Hands the underlying `nng_msg` back to NNG through `nng_msg_free`.
    fn drop(&mut self)
    {
        let raw = self.msgp.as_ptr();
        unsafe { nng_sys::nng_msg_free(raw) };
    }
}
// `Message` holds a `NonNull` pointer, so these impls must be written by hand.
// NOTE(review): they assert that an `nng_msg` may be moved to, and shared by reference with,
// other threads; nothing in this file establishes that - confirm against NNG's guarantees.
unsafe impl Send for Message {}
unsafe impl Sync for Message {}
impl Clone for Message
{
    /// Produces an independent message by duplicating the underlying `nng_msg` with
    /// `nng_msg_dup`.
    ///
    /// # Panics
    ///
    /// Panics with `ALLOC_FAIL_MSG` if the duplicate could not be created.
    fn clone(&self) -> Self
    {
        let source = self.msgp.as_ptr();
        let mut copy: *mut nng_sys::nng_msg = ptr::null_mut();
        let status = unsafe { nng_sys::nng_msg_dup(&mut copy as _, source) };
        let handle = validate_ptr(status, copy).expect(ALLOC_FAIL_MSG);
        Message::from_ptr(handle)
    }
}
impl Default for Message
{
fn default() -> Message { Message::new() }
}
impl<'a> From<&'a [u8]> for Message
{
    /// Builds a message whose body is a copy of `s`.
    ///
    /// The `nng_msg` is allocated with a body of `s.len()` bytes and the slice contents are then
    /// copied straight into that body.
    ///
    /// # Panics
    ///
    /// Panics with `ALLOC_FAIL_MSG` if NNG reports an allocation failure.
    fn from(s: &[u8]) -> Message
    {
        let byte_count = s.len();
        let mut raw: *mut nng_sys::nng_msg = ptr::null_mut();
        let status = unsafe { nng_sys::nng_msg_alloc(&mut raw as _, byte_count) };
        let handle = validate_ptr(status, raw).expect(ALLOC_FAIL_MSG);
        unsafe {
            let body = nng_sys::nng_msg_body(handle.as_ptr()) as *mut u8;
            ptr::copy_nonoverlapping(s.as_ptr(), body, byte_count);
        }
        Message::from_ptr(handle)
    }
}
impl<'a> From<&'a Vec<u8>> for Message
{
fn from(s: &Vec<u8>) -> Message { s.as_slice().into() }
}
macro_rules! array_impl
{
($s:tt) => {
impl From<[u8; $s]> for Message
{
fn from(s: [u8; $s]) -> Message
{
s[..].into()
}
}
};
($s:tt, $($r:tt),+) => {
array_impl!($s);
array_impl!($($r),+);
}
}
array_impl!(
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25,
26, 27, 28, 29, 30, 31, 32
);
impl FromIterator<u8> for Message
{
    /// Collects bytes into a new message.
    ///
    /// The message is created with `Message::with_capacity` using the lower bound of the
    /// iterator's size hint, and the bytes are then added through `Extend`.
    fn from_iter<T>(iter: T) -> Message
    where
        T: IntoIterator<Item = u8>,
    {
        let source = iter.into_iter();
        let mut built = Message::with_capacity(source.size_hint().0);
        built.extend(source);
        built
    }
}
impl<'a> FromIterator<&'a u8> for Message
{
    /// Collects borrowed bytes into a new message.
    ///
    /// The message is created with `Message::with_capacity` using the lower bound of the
    /// iterator's size hint, and the bytes are then added through `Extend`.
    fn from_iter<T>(iter: T) -> Message
    where
        T: IntoIterator<Item = &'a u8>,
    {
        let source = iter.into_iter();
        let mut built = Message::with_capacity(source.size_hint().0);
        built.extend(source);
        built
    }
}
impl Deref for Message
{
type Target = [u8];
fn deref(&self) -> &[u8] { self.as_slice() }
}
impl DerefMut for Message
{
fn deref_mut(&mut self) -> &mut [u8] { self.as_mut_slice() }
}
impl Write for Message
{
    /// Appends all of `buf` to the message body and reports its full length as written.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> io::Result<usize>
    {
        let written = buf.len();
        self.push_back(buf);
        Ok(written)
    }

    /// Appends all of `buf` to the message body with a single `push_back`.
    #[inline]
    fn write_all(&mut self, buf: &[u8]) -> io::Result<()>
    {
        self.push_back(buf);
        Ok(())
    }

    /// Does nothing: `write` and `write_all` already append directly to the message.
    #[inline]
    fn flush(&mut self) -> io::Result<()>
    {
        Ok(())
    }
}
impl Extend<u8> for Message
{
fn extend<I: IntoIterator<Item = u8>>(&mut self, iter: I)
{
for byte in iter {
self.push_back(slice::from_ref(&byte));
}
}
}
impl<'a> Extend<&'a u8> for Message
{
fn extend<I: IntoIterator<Item = &'a u8>>(&mut self, iter: I)
{
for byte in iter {
self.push_back(slice::from_ref(byte));
}
}
}
impl<I: SliceIndex<[u8]>> Index<I> for Message
{
    type Output = I::Output;

    /// Indexes into the message body with any index type a byte slice accepts.
    #[inline]
    fn index(&self, index: I) -> &Self::Output
    {
        let body = self.as_slice();
        &body[index]
    }
}
impl<I: SliceIndex<[u8]>> IndexMut<I> for Message
{
    /// Mutably indexes into the message body with any index type a byte slice accepts.
    #[inline]
    fn index_mut(&mut self, index: I) -> &mut Self::Output
    {
        let body = self.as_mut_slice();
        &mut body[index]
    }
}
/// A view over the header portion of an `nng_msg`.
///
/// Every operation on this type goes through the `nng_msg_header_*` family of functions on the
/// stored pointer. Nothing in this file frees the pointer on behalf of a `Header`.
#[derive(Debug)]
pub struct Header
{
// Pointer to the `nng_msg` whose header this value exposes.
msgp: NonNull<nng_sys::nng_msg>,
}
impl Header
{
    /// Shortens the header to at most `len` bytes using `nng_msg_header_chop`.
    ///
    /// The number of bytes chopped is the current header length minus `len`, saturating at
    /// zero, so a `len` that is not smaller than the current length chops nothing.
    pub fn truncate(&mut self, len: usize)
    {
        let rv = unsafe {
            let current_len = nng_sys::nng_msg_header_len(self.msgp.as_ptr());
            nng_sys::nng_msg_header_chop(self.msgp.as_ptr(), current_len.saturating_sub(len))
        };
        // Same assertion form as the other length checks in this file; on failure it also
        // reports the actual return code rather than just the fact that it was non-zero.
        debug_assert_eq!(rv, 0, "Message header was too short to truncate");
    }

    /// Removes `len` bytes from the header using `nng_msg_header_trim`.
    ///
    /// The request is clamped to the current header length.
    pub fn trim(&mut self, len: usize)
    {
        let rv = unsafe {
            let current_len = nng_sys::nng_msg_header_len(self.msgp.as_ptr());
            nng_sys::nng_msg_header_trim(self.msgp.as_ptr(), len.min(current_len))
        };
        debug_assert_eq!(rv, 0, "Message header was too short to trim");
    }

    /// Borrows the header as a byte slice.
    pub fn as_slice(&self) -> &[u8]
    {
        unsafe {
            let ptr = nng_sys::nng_msg_header(self.msgp.as_ptr());
            let len = nng_sys::nng_msg_header_len(self.msgp.as_ptr());
            slice::from_raw_parts(ptr as _, len)
        }
    }

    /// Mutably borrows the header as a byte slice.
    pub fn as_mut_slice(&mut self) -> &mut [u8]
    {
        unsafe {
            let ptr = nng_sys::nng_msg_header(self.msgp.as_ptr());
            let len = nng_sys::nng_msg_header_len(self.msgp.as_ptr());
            slice::from_raw_parts_mut(ptr as _, len)
        }
    }

    /// Returns the header length as reported by `nng_msg_header_len`.
    pub fn len(&self) -> usize
    {
        unsafe { nng_sys::nng_msg_header_len(self.msgp.as_ptr()) }
    }

    /// Returns `true` when the header length is zero.
    pub fn is_empty(&self) -> bool
    {
        self.len() == 0
    }

    /// Clears the header via `nng_msg_header_clear`.
    pub fn clear(&mut self)
    {
        unsafe {
            nng_sys::nng_msg_header_clear(self.msgp.as_ptr());
        }
    }

    /// Appends `data` to the header using `nng_msg_header_append`.
    ///
    /// # Panics
    ///
    /// Panics with `ALLOC_FAIL_MSG` if NNG returns an error.
    pub fn push_back(&mut self, data: &[u8])
    {
        let rv = unsafe {
            nng_sys::nng_msg_header_append(self.msgp.as_ptr(), data.as_ptr() as _, data.len())
        };
        rv2res!(rv).expect(ALLOC_FAIL_MSG);
    }

    /// Inserts `data` into the header using `nng_msg_header_insert`.
    ///
    /// # Panics
    ///
    /// Panics with `ALLOC_FAIL_MSG` if NNG returns an error.
    pub fn push_front(&mut self, data: &[u8])
    {
        let rv = unsafe {
            nng_sys::nng_msg_header_insert(self.msgp.as_ptr(), data.as_ptr() as _, data.len())
        };
        rv2res!(rv).expect(ALLOC_FAIL_MSG);
    }
}
// `Header` holds a `NonNull` pointer, so these impls must be written by hand.
// NOTE(review): they assert that the referenced `nng_msg` header may be accessed from other
// threads; nothing in this file establishes that - confirm against NNG's guarantees.
unsafe impl Send for Header {}
unsafe impl Sync for Header {}
impl Deref for Header
{
type Target = [u8];
fn deref(&self) -> &[u8] { self.as_slice() }
}
impl DerefMut for Header
{
fn deref_mut(&mut self) -> &mut [u8] { self.as_mut_slice() }
}
impl Write for Header
{
    /// Appends all of `buf` to the header and reports its full length as written.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> io::Result<usize>
    {
        let written = buf.len();
        self.push_back(buf);
        Ok(written)
    }

    /// Appends all of `buf` to the header with a single `push_back`.
    #[inline]
    fn write_all(&mut self, buf: &[u8]) -> io::Result<()>
    {
        self.push_back(buf);
        Ok(())
    }

    /// Does nothing: `write` and `write_all` already append directly to the header.
    #[inline]
    fn flush(&mut self) -> io::Result<()>
    {
        Ok(())
    }
}
impl Extend<u8> for Header
{
fn extend<I: IntoIterator<Item = u8>>(&mut self, iter: I)
{
for byte in iter {
self.push_back(slice::from_ref(&byte));
}
}
}
impl<'a> Extend<&'a u8> for Header
{
fn extend<I: IntoIterator<Item = &'a u8>>(&mut self, iter: I)
{
for byte in iter {
self.push_back(slice::from_ref(byte));
}
}
}
impl<I: SliceIndex<[u8]>> Index<I> for Header
{
    type Output = I::Output;

    /// Indexes into the header bytes with any index type a byte slice accepts.
    #[inline]
    fn index(&self, index: I) -> &Self::Output
    {
        let bytes = self.as_slice();
        &bytes[index]
    }
}
impl<I: SliceIndex<[u8]>> IndexMut<I> for Header
{
    /// Mutably indexes into the header bytes with any index type a byte slice accepts.
    #[inline]
    fn index_mut(&mut self, index: I) -> &mut Self::Output
    {
        let bytes = self.as_mut_slice();
        &mut bytes[index]
    }
}