use crate::error::ZmqEmptyMessageError;
use bytes::Bytes;
use smallvec::SmallVec;
use thiserror::Error;
use std::collections::VecDeque;
use std::convert::{From, TryFrom};
/// Inline capacity of [`FrameVec`]: the array length used for its `SmallVec` backing store.
const INLINE_FRAMES: usize = 2;
/// Storage type for the frames of a [`ZmqMessage`]: a `SmallVec` of [`Bytes`] frames.
type FrameVec = SmallVec<[Bytes; INLINE_FRAMES]>;
/// A message made of an ordered list of frames, each held as a [`Bytes`] buffer.
///
/// The derived `Default` value contains no frames.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct ZmqMessage {
/// Frames in message order; index 0 is the front of the message.
frames: FrameVec,
}
impl ZmqMessage {
pub fn push_back(&mut self, frame: impl Into<Bytes>) {
self.frames.push(frame.into());
}
pub fn push_front(&mut self, frame: impl Into<Bytes>) {
self.frames.insert(0, frame.into());
}
pub fn iter(&self) -> std::slice::Iter<'_, Bytes> {
self.frames.iter()
}
pub fn iter_mut(&mut self) -> std::slice::IterMut<'_, Bytes> {
self.frames.iter_mut()
}
pub fn first(&self) -> Option<&Bytes> {
self.frames.first()
}
pub fn last(&self) -> Option<&Bytes> {
self.frames.last()
}
pub fn first_mut(&mut self) -> Option<&mut Bytes> {
self.frames.first_mut()
}
pub fn last_mut(&mut self) -> Option<&mut Bytes> {
self.frames.last_mut()
}
pub fn pop_front(&mut self) -> Option<Bytes> {
if self.frames.is_empty() {
None
} else {
Some(self.frames.remove(0))
}
}
pub fn pop_back(&mut self) -> Option<Bytes> {
self.frames.pop()
}
pub fn len(&self) -> usize {
self.frames.len()
}
pub fn is_empty(&self) -> bool {
self.frames.is_empty()
}
pub fn get(&self, index: usize) -> Option<&Bytes> {
self.frames.get(index)
}
pub fn into_vec(self) -> Vec<Bytes> {
self.frames.into_vec()
}
pub fn into_vecdeque(self) -> VecDeque<Bytes> {
VecDeque::from(self.frames.into_vec())
}
pub fn prepend(&mut self, message: &ZmqMessage) {
for frame in message.iter().rev() {
self.push_front(frame.clone());
}
}
pub fn split_off(&mut self, at: usize) -> ZmqMessage {
let mut tail = FrameVec::with_capacity(self.frames.len().saturating_sub(at));
while self.frames.len() > at {
tail.push(self.frames.remove(at));
}
ZmqMessage { frames: tail }
}
}
/// Positional access to a frame via `message[i]`.
impl std::ops::Index<usize> for ZmqMessage {
    type Output = Bytes;

    /// Returns the frame at position `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds.
    fn index(&self, index: usize) -> &Self::Output {
        let all_frames = self.frames.as_slice();
        &all_frames[index]
    }
}
impl<B: Into<Bytes>> FromIterator<B> for ZmqMessage {
fn from_iter<I: IntoIterator<Item = B>>(iter: I) -> Self {
Self {
frames: iter.into_iter().map(Into::into).collect(),
}
}
}
/// Appends frames produced by an iterator of frame-like values.
impl<B: Into<Bytes>> Extend<B> for ZmqMessage {
    /// Converts every item into [`Bytes`] and appends it after the existing frames.
    fn extend<I: IntoIterator<Item = B>>(&mut self, iter: I) {
        let new_frames = iter.into_iter().map(B::into);
        self.frames.extend(new_frames);
    }
}
/// Consuming iteration: yields each frame by value, front to back.
impl IntoIterator for ZmqMessage {
    type Item = Bytes;
    type IntoIter = smallvec::IntoIter<[Bytes; INLINE_FRAMES]>;

    /// Consumes the message and returns an iterator over its frames.
    fn into_iter(self) -> Self::IntoIter {
        let Self { frames } = self;
        frames.into_iter()
    }
}
/// Borrowing iteration: yields a shared reference to each frame, front to back.
impl<'a> IntoIterator for &'a ZmqMessage {
    type Item = &'a Bytes;
    type IntoIter = std::slice::Iter<'a, Bytes>;

    /// Returns an iterator over shared references to the frames.
    fn into_iter(self) -> Self::IntoIter {
        let frame_slice: &'a [Bytes] = self.frames.as_slice();
        frame_slice.iter()
    }
}
/// Mutable borrowing iteration: yields a mutable reference to each frame, front to back.
impl<'a> IntoIterator for &'a mut ZmqMessage {
    type Item = &'a mut Bytes;
    type IntoIter = std::slice::IterMut<'a, Bytes>;

    /// Returns an iterator over mutable references to the frames.
    fn into_iter(self) -> Self::IntoIter {
        let frame_slice: &'a mut [Bytes] = self.frames.as_mut_slice();
        frame_slice.iter_mut()
    }
}
/// Fallible conversion from a `Vec` of frames; an empty vector is rejected.
impl TryFrom<Vec<Bytes>> for ZmqMessage {
    type Error = ZmqEmptyMessageError;

    /// Wraps the frames of `v` in a message.
    ///
    /// # Errors
    ///
    /// Returns [`ZmqEmptyMessageError`] when `v` contains no frames.
    fn try_from(v: Vec<Bytes>) -> Result<Self, Self::Error> {
        // Guard clause: reject the empty case up front.
        if v.is_empty() {
            return Err(ZmqEmptyMessageError);
        }
        let frames = SmallVec::from_vec(v);
        Ok(Self { frames })
    }
}
/// Fallible conversion from a `VecDeque` of frames; an empty deque is rejected.
impl TryFrom<VecDeque<Bytes>> for ZmqMessage {
    type Error = ZmqEmptyMessageError;

    /// Wraps the frames of `v` in a message, preserving their order.
    ///
    /// # Errors
    ///
    /// Returns [`ZmqEmptyMessageError`] when `v` contains no frames.
    fn try_from(v: VecDeque<Bytes>) -> Result<Self, Self::Error> {
        // Guard clause: reject the empty case up front.
        if v.is_empty() {
            return Err(ZmqEmptyMessageError);
        }
        let contiguous = Vec::from(v);
        let frames = SmallVec::from_vec(contiguous);
        Ok(Self { frames })
    }
}
/// Builds a single-frame message from an owned byte vector.
impl From<Vec<u8>> for ZmqMessage {
    /// Converts `v` into [`Bytes`] and uses it as the only frame.
    fn from(v: Vec<u8>) -> Self {
        let payload = Bytes::from(v);
        Self::from(payload)
    }
}
impl From<Bytes> for ZmqMessage {
fn from(b: Bytes) -> Self {
let mut frames = FrameVec::new();
frames.push(b);
Self { frames }
}
}
impl From<String> for ZmqMessage {
fn from(s: String) -> Self {
let b: Bytes = s.into();
ZmqMessage::from(b)
}
}
/// Builds a single-frame message from a string slice.
impl From<&str> for ZmqMessage {
    /// Copies the bytes of `s` into a new [`Bytes`] buffer and uses it as the only frame.
    fn from(s: &str) -> Self {
        let text_bytes = s.as_bytes();
        let payload = Bytes::copy_from_slice(text_bytes);
        Self::from(payload)
    }
}
/// Builds a single-frame message from a byte slice.
impl From<&[u8]> for ZmqMessage {
    /// Copies `b` into a new [`Bytes`] buffer and uses it as the only frame.
    fn from(b: &[u8]) -> Self {
        let payload = Bytes::copy_from_slice(b);
        Self::from(payload)
    }
}
/// Errors produced when converting a [`ZmqMessage`] into a single value
/// (`Vec<u8>`, [`Bytes`] or `String`).
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum MessageConversionError {
/// The message did not contain exactly one frame; the payload is the actual frame count.
#[error("expected single-frame message, got {0} frames")]
NotSingleFrame(usize),
/// The frame's bytes were rejected by `String::from_utf8`; wraps the underlying error.
#[error("frame is not valid UTF-8")]
InvalidUtf8(#[from] std::string::FromUtf8Error),
}
/// Extracts the payload of a single-frame message as an owned byte vector.
impl TryFrom<ZmqMessage> for Vec<u8> {
    type Error = MessageConversionError;

    /// Copies the only frame of `z` into a `Vec<u8>`.
    ///
    /// # Errors
    ///
    /// Returns [`MessageConversionError::NotSingleFrame`] with the actual frame
    /// count when `z` does not hold exactly one frame.
    fn try_from(mut z: ZmqMessage) -> Result<Self, Self::Error> {
        match z.len() {
            // Exactly one frame is present, so `pop_front` cannot return `None` here.
            1 => Ok(z.pop_front().unwrap().to_vec()),
            frame_count => Err(MessageConversionError::NotSingleFrame(frame_count)),
        }
    }
}
/// Extracts the payload of a single-frame message as [`Bytes`].
impl TryFrom<ZmqMessage> for Bytes {
    type Error = MessageConversionError;

    /// Returns the only frame of `z`.
    ///
    /// # Errors
    ///
    /// Returns [`MessageConversionError::NotSingleFrame`] with the actual frame
    /// count when `z` does not hold exactly one frame.
    fn try_from(mut z: ZmqMessage) -> Result<Self, Self::Error> {
        let frame_count = z.len();
        if frame_count == 1 {
            // Exactly one frame is present, so `pop_front` cannot return `None` here.
            Ok(z.pop_front().unwrap())
        } else {
            Err(MessageConversionError::NotSingleFrame(frame_count))
        }
    }
}
/// Extracts the payload of a single-frame message as a UTF-8 string.
impl TryFrom<ZmqMessage> for String {
    type Error = MessageConversionError;

    /// Copies the only frame of `z` and decodes it as UTF-8.
    ///
    /// # Errors
    ///
    /// Returns [`MessageConversionError::NotSingleFrame`] with the actual frame
    /// count when `z` does not hold exactly one frame, and
    /// [`MessageConversionError::InvalidUtf8`] when the frame is not valid UTF-8.
    fn try_from(mut z: ZmqMessage) -> Result<Self, Self::Error> {
        let frame_count = z.len();
        if frame_count != 1 {
            return Err(MessageConversionError::NotSingleFrame(frame_count));
        }
        // Exactly one frame is present, so `pop_front` cannot return `None` here.
        let raw = z.pop_front().unwrap().to_vec();
        String::from_utf8(raw).map_err(MessageConversionError::from)
    }
}