// rustzmq2 0.1.0
//
// A native async Rust implementation of ZeroMQ
// Documentation
use crate::error::ZmqEmptyMessageError;

use bytes::Bytes;
use smallvec::SmallVec;
use thiserror::Error;

use std::collections::VecDeque;
use std::convert::{From, TryFrom};

/// Inline capacity for [`ZmqMessage::frames`]: the number of frames a message
/// can hold before its frame list spills to the heap. Sized to fit the two
/// most common shapes on the wire without allocating:
///
/// - single-frame messages (PUB/SUB topic+payload after fast paths, DEALER
///   send, REQ/REP body);
/// - the two-frame REP/REQ envelope (empty delimiter + body).
///
/// ROUTER and DEALER messages with routing prefixes will spill to the heap on
/// the third frame. Raising this value makes the `ZmqMessage` struct larger,
/// which costs more on every clone and move — 2 is the sweet spot for the
/// current socket-type mix.
const INLINE_FRAMES: usize = 2;

/// Frame storage for [`ZmqMessage`].
///
/// A `SmallVec` with [`INLINE_FRAMES`] inline slots keeps the two most common
/// message shapes (single-frame, two-frame envelope) inside the message value
/// itself, with no separate heap buffer for the frame list. Compared to the
/// previous `VecDeque<Bytes>` representation this eliminates one heap
/// allocation per message in the common case, which was the dominant cost
/// for small-message PUB fanout and REQ/REP round-trips.
type FrameVec = SmallVec<[Bytes; INLINE_FRAMES]>;

/// A multi-frame ZMQ message: an ordered list of [`Bytes`] frames.
///
/// Construct from a single frame via `From<Bytes>`, `From<Vec<u8>>`, `From<String>`,
/// `From<&str>`, or `From<&[u8]>`. For multi-frame messages use
/// [`push_back`](ZmqMessage::push_back), collect an iterator of byte-ish values, or use
/// [`TryFrom<Vec<Bytes>>`](ZmqMessage#impl-TryFrom<Vec<Bytes>>-for-ZmqMessage).
///
/// `ZmqMessage::default()` is a message with no frames. The `TryFrom` conversions
/// from `Vec<Bytes>` / `VecDeque<Bytes>` refuse to build an empty message, whereas
/// `default()` and collecting an empty iterator do produce one.
///
/// # Examples
///
/// ```
/// use rustzmq2::ZmqMessage;
/// use bytes::Bytes;
///
/// // Single-frame from any byte-ish input.
/// let m1: ZmqMessage = "hello".into();
/// assert_eq!(m1.first().map(|b| b.as_ref()), Some(&b"hello"[..]));
///
/// // Multi-frame envelope: prepend a header onto a body.
/// let mut m2 = ZmqMessage::from(b"body".to_vec());
/// m2.push_front(b"header".to_vec());
/// assert_eq!(m2.len(), 2);
///
/// // Build from any iterator of byte-ish values.
/// let m3: ZmqMessage = ["a", "b", "c"].into_iter().collect();
/// assert_eq!(m3.len(), 3);
///
/// // `try_from` rejects an empty frame list up front and accepts a non-empty one.
/// assert!(ZmqMessage::try_from(Vec::<Bytes>::new()).is_err());
/// let m4 = ZmqMessage::try_from(vec![Bytes::from_static(b"x"), Bytes::from_static(b"y")]).unwrap();
/// assert_eq!(m4.last().map(|b| b.as_ref()), Some(&b"y"[..]));
/// ```
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct ZmqMessage {
    // Frames in wire order; index 0 is the first frame of the message.
    frames: FrameVec,
}

impl ZmqMessage {
    /// Append a frame to the end of the message.
    ///
    /// Accepts anything that converts into `Bytes` — `Vec<u8>`, `&[u8]`,
    /// `&'static [u8]`, `String`, `&str`, or a `Bytes` directly.
    pub fn push_back(&mut self, frame: impl Into<Bytes>) {
        self.frames.push(frame.into());
    }

    /// Prepend a frame at the front of the message.
    ///
    /// Accepts anything that converts into `Bytes`.
    pub fn push_front(&mut self, frame: impl Into<Bytes>) {
        self.frames.insert(0, frame.into());
    }

    /// Iterate over frames in order.
    pub fn iter(&self) -> std::slice::Iter<'_, Bytes> {
        self.frames.iter()
    }

    /// Iterate over frames in order, allowing mutation. Replacing a frame
    /// is cheap; in-place editing of `Bytes` is not (it's an immutable
    /// reference-counted slice — call `*frame = new_bytes;` to swap).
    pub fn iter_mut(&mut self) -> std::slice::IterMut<'_, Bytes> {
        self.frames.iter_mut()
    }

    /// First frame, or `None` if the message is empty.
    pub fn first(&self) -> Option<&Bytes> {
        self.frames.first()
    }

    /// Last frame, or `None` if the message is empty.
    pub fn last(&self) -> Option<&Bytes> {
        self.frames.last()
    }

    /// Mutable reference to the first frame, or `None` if the message is empty.
    pub fn first_mut(&mut self) -> Option<&mut Bytes> {
        self.frames.first_mut()
    }

    /// Mutable reference to the last frame, or `None` if the message is empty.
    pub fn last_mut(&mut self) -> Option<&mut Bytes> {
        self.frames.last_mut()
    }

    /// Remove and return the first frame, or `None` if the message is empty.
    /// Useful for peeling envelope prefixes (e.g. ROUTER identity frames).
    pub fn pop_front(&mut self) -> Option<Bytes> {
        if self.frames.is_empty() {
            None
        } else {
            Some(self.frames.remove(0))
        }
    }

    /// Remove and return the last frame, or `None` if the message is empty.
    pub fn pop_back(&mut self) -> Option<Bytes> {
        self.frames.pop()
    }

    /// Number of frames in the message.
    pub fn len(&self) -> usize {
        self.frames.len()
    }

    /// Returns `true` if the message contains no frames.
    pub fn is_empty(&self) -> bool {
        self.frames.is_empty()
    }

    /// Returns a reference to the frame at `index`, or `None` if out of bounds.
    pub fn get(&self, index: usize) -> Option<&Bytes> {
        self.frames.get(index)
    }

    /// Consume the message and return its frames as a `Vec<Bytes>`.
    pub fn into_vec(self) -> Vec<Bytes> {
        self.frames.into_vec()
    }

    /// Consume the message and return its frames as a `VecDeque<Bytes>`.
    pub fn into_vecdeque(self) -> VecDeque<Bytes> {
        VecDeque::from(self.frames.into_vec())
    }

    /// Prepend all frames of `message` to the front of `self`, in order.
    pub fn prepend(&mut self, message: &ZmqMessage) {
        for frame in message.iter().rev() {
            self.push_front(frame.clone());
        }
    }

    /// Split the message at `at`, returning a new message containing frames
    /// `[at..]` while `self` retains `[..at]`.
    pub fn split_off(&mut self, at: usize) -> ZmqMessage {
        let mut tail = FrameVec::with_capacity(self.frames.len().saturating_sub(at));
        while self.frames.len() > at {
            tail.push(self.frames.remove(at));
        }
        ZmqMessage { frames: tail }
    }
}

impl std::ops::Index<usize> for ZmqMessage {
    type Output = Bytes;

    /// Borrow the frame at position `index`.
    ///
    /// # Panics
    ///
    /// Panics when `index` is out of bounds; [`get`](ZmqMessage::get) is the
    /// checked alternative that returns `None` instead.
    fn index(&self, index: usize) -> &Self::Output {
        let all_frames: &[Bytes] = &self.frames;
        &all_frames[index]
    }
}

impl<B: Into<Bytes>> FromIterator<B> for ZmqMessage {
    /// Collect frames into a `ZmqMessage`. The resulting message may be
    /// empty if the iterator yielded nothing — most send paths reject
    /// empty messages, so prefer [`TryFrom<Vec<Bytes>>`] when you want
    /// the emptiness check up front.
    fn from_iter<I: IntoIterator<Item = B>>(iter: I) -> Self {
        Self {
            frames: iter.into_iter().map(Into::into).collect(),
        }
    }
}

impl<B: Into<Bytes>> Extend<B> for ZmqMessage {
    /// Append every item of `iter`, converted into `Bytes`, to the end of
    /// the message in iteration order.
    fn extend<I: IntoIterator<Item = B>>(&mut self, iter: I) {
        let new_frames = iter.into_iter().map(|item| -> Bytes { item.into() });
        self.frames.extend(new_frames);
    }
}

impl IntoIterator for ZmqMessage {
    type Item = Bytes;
    type IntoIter = smallvec::IntoIter<[Bytes; INLINE_FRAMES]>;

    /// Consume the message and yield its frames by value, first frame first.
    fn into_iter(self) -> Self::IntoIter {
        let Self { frames } = self;
        frames.into_iter()
    }
}

impl<'a> IntoIterator for &'a ZmqMessage {
    type Item = &'a Bytes;
    type IntoIter = std::slice::Iter<'a, Bytes>;

    /// Yield shared references to the frames, first frame first.
    fn into_iter(self) -> Self::IntoIter {
        let all_frames: &'a [Bytes] = &self.frames;
        all_frames.iter()
    }
}

impl<'a> IntoIterator for &'a mut ZmqMessage {
    type Item = &'a mut Bytes;
    type IntoIter = std::slice::IterMut<'a, Bytes>;

    /// Yield mutable references to the frames, first frame first.
    fn into_iter(self) -> Self::IntoIter {
        let all_frames: &'a mut [Bytes] = &mut self.frames;
        all_frames.iter_mut()
    }
}

impl TryFrom<Vec<Bytes>> for ZmqMessage {
    type Error = ZmqEmptyMessageError;
    fn try_from(v: Vec<Bytes>) -> Result<Self, Self::Error> {
        if v.is_empty() {
            Err(ZmqEmptyMessageError)
        } else {
            Ok(Self {
                frames: SmallVec::from_vec(v),
            })
        }
    }
}

impl TryFrom<VecDeque<Bytes>> for ZmqMessage {
    type Error = ZmqEmptyMessageError;
    fn try_from(v: VecDeque<Bytes>) -> Result<Self, Self::Error> {
        if v.is_empty() {
            Err(ZmqEmptyMessageError)
        } else {
            let vec: Vec<Bytes> = v.into();
            Ok(Self {
                frames: SmallVec::from_vec(vec),
            })
        }
    }
}

impl From<Vec<u8>> for ZmqMessage {
    /// Wrap an owned byte buffer as a single-frame message.
    fn from(v: Vec<u8>) -> Self {
        let frame = Bytes::from(v);
        Self::from(frame)
    }
}

impl From<Bytes> for ZmqMessage {
    /// Wrap `b` as the only frame of a new message.
    fn from(b: Bytes) -> Self {
        let mut message = Self {
            frames: FrameVec::new(),
        };
        message.frames.push(b);
        message
    }
}

impl From<String> for ZmqMessage {
    fn from(s: String) -> Self {
        let b: Bytes = s.into();
        ZmqMessage::from(b)
    }
}

impl From<&str> for ZmqMessage {
    /// Copy the UTF-8 bytes of `s` into a new single-frame message.
    fn from(s: &str) -> Self {
        // Copy the borrowed bytes straight into a `Bytes` with
        // `copy_from_slice`, rather than first building an owned `String`
        // via `to_owned()` and converting that.
        let frame = Bytes::copy_from_slice(s.as_bytes());
        frame.into()
    }
}

impl From<&[u8]> for ZmqMessage {
    /// Copy the borrowed bytes into a new single-frame message.
    fn from(b: &[u8]) -> Self {
        let frame = Bytes::copy_from_slice(b);
        frame.into()
    }
}

/// Error returned when converting a [`ZmqMessage`] into a single-frame
/// representation (`Vec<u8>` / `String` / `Bytes`) fails.
///
/// Marked `#[non_exhaustive]`, so code outside this crate that matches on it
/// needs a wildcard arm.
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum MessageConversionError {
    /// The message had a frame count other than exactly one. The payload is
    /// the number of frames the message actually contained (it may be zero).
    #[error("expected single-frame message, got {0} frames")]
    NotSingleFrame(usize),
    /// The single frame was not valid UTF-8. Wraps the underlying
    /// [`std::string::FromUtf8Error`]; the `#[from]` attribute lets `?`
    /// convert that error into this variant.
    #[error("frame is not valid UTF-8")]
    InvalidUtf8(#[from] std::string::FromUtf8Error),
}

impl TryFrom<ZmqMessage> for Vec<u8> {
    type Error = MessageConversionError;

    fn try_from(mut z: ZmqMessage) -> Result<Self, Self::Error> {
        if z.len() != 1 {
            return Err(MessageConversionError::NotSingleFrame(z.len()));
        }
        Ok(z.pop_front().unwrap().to_vec())
    }
}

impl TryFrom<ZmqMessage> for Bytes {
    type Error = MessageConversionError;

    fn try_from(mut z: ZmqMessage) -> Result<Self, Self::Error> {
        if z.len() != 1 {
            return Err(MessageConversionError::NotSingleFrame(z.len()));
        }
        Ok(z.pop_front().unwrap())
    }
}

impl TryFrom<ZmqMessage> for String {
    type Error = MessageConversionError;

    fn try_from(mut z: ZmqMessage) -> Result<Self, Self::Error> {
        if z.len() != 1 {
            return Err(MessageConversionError::NotSingleFrame(z.len()));
        }
        Ok(String::from_utf8(z.pop_front().unwrap().to_vec())?)
    }
}