use alloc::string::ToString;
use alloc::vec::Vec;
use bytes::{BufMut, Bytes, BytesMut};
/// Largest element count accepted in a collection header; `parse_count`
/// rejects anything above it with `ParseError::BadLength`.
const MAX_COLLECTION_SIZE: usize = 10_000_000;
/// Largest declared payload length (512 MiB) accepted for bulk strings, blob
/// errors, verbatim strings and streamed string chunks.
const MAX_BULK_STRING_SIZE: usize = 512 * 1024 * 1024;
/// Incremental frame parser that accumulates fed bytes in an internal buffer.
///
/// Bytes are appended with `feed` and complete frames are pulled out one at a
/// time with `next_frame`.
#[derive(Default, Debug)]
pub struct Parser {
// Bytes received so far that have not yet been consumed by a parsed frame.
buffer: BytesMut,
}
impl Parser {
    /// Creates a parser whose internal buffer is empty.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `data` to the bytes waiting to be parsed (the bytes are copied).
    pub fn feed(&mut self, data: Bytes) {
        self.buffer.extend_from_slice(data.as_ref());
    }

    /// Attempts to parse one frame from the front of the buffered bytes.
    ///
    /// * `Ok(Some(frame))` - a frame was parsed; any bytes after it are copied
    ///   back into the buffer for the next call.
    /// * `Ok(None)` - the buffer is empty or holds only part of a frame; the
    ///   buffered bytes are kept.
    /// * `Err(e)` - the data is malformed. The bytes that were buffered are
    ///   NOT restored in this case, so the buffer is left empty.
    pub fn next_frame(&mut self) -> Result<Option<Frame>, ParseError> {
        if self.buffer.is_empty() {
            return Ok(None);
        }

        // Freeze the pending bytes so parsed frames can share them without copying.
        let pending = self.buffer.split().freeze();
        let (frame, used) = match parse_frame_inner(&pending, 0) {
            Ok(parsed) => parsed,
            Err(ParseError::Incomplete) => {
                // Not enough data yet: hand every byte back untouched.
                self.buffer.unsplit(BytesMut::from(pending));
                return Ok(None);
            }
            Err(fatal) => return Err(fatal),
        };

        if used < pending.len() {
            let leftover = &pending[used..];
            self.buffer.unsplit(BytesMut::from(leftover));
        }
        Ok(Some(frame))
    }

    /// Number of bytes currently buffered and not yet consumed by a frame.
    pub fn buffered_bytes(&self) -> usize {
        self.buffer.len()
    }

    /// Drops every buffered byte.
    pub fn clear(&mut self) {
        self.buffer.clear();
    }
}
/// One parsed protocol frame.
///
/// The wire prefixes quoted below are the tag bytes dispatched on by
/// `parse_frame_inner`.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
/// `+` line: the raw bytes between the tag and the CRLF.
SimpleString(Bytes),
/// `-` line: the raw bytes between the tag and the CRLF.
Error(Bytes),
/// `:` line parsed as a signed 64-bit integer.
Integer(i64),
/// `$<len>` payload; `None` for the null bulk string `$-1`.
BulkString(Option<Bytes>),
/// `!<len>` payload.
BlobError(Bytes),
/// `$?` header announcing a streamed string.
StreamedStringHeader,
/// `!?` header.
StreamedBlobErrorHeader,
/// `=?` header.
StreamedVerbatimStringHeader,
/// `*?` header announcing a streamed array.
StreamedArrayHeader,
/// `~?` header announcing a streamed set.
StreamedSetHeader,
/// `%?` header announcing a streamed map.
StreamedMapHeader,
/// `|?` header announcing a streamed attribute.
StreamedAttributeHeader,
/// `>?` header announcing a streamed push.
StreamedPushHeader,
/// `;<len>` chunk of a streamed string.
StreamedStringChunk(Bytes),
/// Chunks of a streamed string, as assembled by `parse_streaming_sequence`.
StreamedString(Vec<Bytes>),
/// Items of a streamed array, as assembled by `parse_streaming_sequence`.
StreamedArray(Vec<Frame>),
/// Items of a streamed set, as assembled by `parse_streaming_sequence`.
StreamedSet(Vec<Frame>),
/// Key/value pairs of a streamed map, as assembled by `parse_streaming_sequence`.
StreamedMap(Vec<(Frame, Frame)>),
/// Key/value pairs of a streamed attribute, as assembled by `parse_streaming_sequence`.
StreamedAttribute(Vec<(Frame, Frame)>),
/// Items of a streamed push, as assembled by `parse_streaming_sequence`.
StreamedPush(Vec<Frame>),
/// `.` terminator that ends a streamed collection.
StreamTerminator,
/// `_` null.
Null,
/// `,` line that parsed to a finite floating-point value.
Double(f64),
/// `,` line denoting infinity or NaN, stored as `inf`, `-inf` or `nan`.
SpecialFloat(Bytes),
/// `#t` / `#f`.
Boolean(bool),
/// `(` line: the raw bytes of the number, stored without being parsed.
BigNumber(Bytes),
/// `=<len>` payload split into its 3-byte format and the content after the `:`.
VerbatimString(Bytes, Bytes),
/// `*<count>` array; `None` for the null array `*-1`.
Array(Option<Vec<Frame>>),
/// `~<count>` set.
Set(Vec<Frame>),
/// `%<count>` map as key/value pairs in wire order.
Map(Vec<(Frame, Frame)>),
/// `|<count>` attribute as key/value pairs in wire order.
Attribute(Vec<(Frame, Frame)>),
/// `><count>` push.
Push(Vec<Frame>),
}
impl Frame {
pub fn as_bytes(&self) -> Option<&Bytes> {
match self {
Frame::SimpleString(b)
| Frame::Error(b)
| Frame::BlobError(b)
| Frame::BigNumber(b) => Some(b),
Frame::BulkString(opt) => opt.as_ref(),
_ => None,
}
}
pub fn as_str(&self) -> Option<&str> {
self.as_bytes().and_then(|b| core::str::from_utf8(b).ok())
}
pub fn as_integer(&self) -> Option<i64> {
match self {
Frame::Integer(v) => Some(*v),
_ => None,
}
}
pub fn as_double(&self) -> Option<f64> {
match self {
Frame::Double(v) => Some(*v),
_ => None,
}
}
pub fn as_boolean(&self) -> Option<bool> {
match self {
Frame::Boolean(v) => Some(*v),
_ => None,
}
}
pub fn as_array(&self) -> Option<&[Frame]> {
match self {
Frame::Array(Some(items)) => Some(items),
_ => None,
}
}
pub fn as_set(&self) -> Option<&[Frame]> {
match self {
Frame::Set(items) => Some(items),
_ => None,
}
}
pub fn as_map(&self) -> Option<&[(Frame, Frame)]> {
match self {
Frame::Map(pairs) => Some(pairs),
_ => None,
}
}
pub fn as_push(&self) -> Option<&[Frame]> {
match self {
Frame::Push(items) => Some(items),
_ => None,
}
}
pub fn as_verbatim_string(&self) -> Option<(&Bytes, &Bytes)> {
match self {
Frame::VerbatimString(format, content) => Some((format, content)),
_ => None,
}
}
pub fn into_array(self) -> Result<Vec<Frame>, Frame> {
match self {
Frame::Array(Some(items)) => Ok(items),
other => Err(other),
}
}
pub fn into_bulk_string(self) -> Result<Bytes, Frame> {
match self {
Frame::BulkString(Some(b)) => Ok(b),
other => Err(other),
}
}
pub fn into_map(self) -> Result<Vec<(Frame, Frame)>, Frame> {
match self {
Frame::Map(pairs) => Ok(pairs),
other => Err(other),
}
}
pub fn into_set(self) -> Result<Vec<Frame>, Frame> {
match self {
Frame::Set(items) => Ok(items),
other => Err(other),
}
}
pub fn is_null(&self) -> bool {
matches!(
self,
Frame::Null | Frame::BulkString(None) | Frame::Array(None)
)
}
pub fn is_error(&self) -> bool {
matches!(self, Frame::Error(_) | Frame::BlobError(_))
}
}
pub use crate::ParseError;
/// Parses a single frame from the start of `input`.
///
/// On success returns the frame together with the unconsumed tail of `input`
/// (a slice of the same buffer). Any error from the underlying parser,
/// including `ParseError::Incomplete`, is returned unchanged.
pub fn parse_frame(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
    parse_frame_inner(&input, 0).map(|(frame, used)| (frame, input.slice(used..)))
}
/// Locates a length-prefixed payload and checks its trailing CRLF.
///
/// `after_crlf` is the index of the first payload byte and `len` the declared
/// payload length. Returns `(start, end)` such that `buf[start..end]` is the
/// payload (empty when `len == 0`) and `buf[end..end + 2]` is the CRLF.
///
/// # Errors
/// * `BadLength` - `after_crlf + len` overflows `usize`.
/// * `Incomplete` - the payload or its CRLF is not fully present in `buf`.
/// * `InvalidFormat` - the two bytes after the payload are not `\r\n`.
#[inline(never)]
fn parse_blob_bounds(
    buf: &[u8],
    after_crlf: usize,
    len: usize,
) -> Result<(usize, usize), ParseError> {
    let start = after_crlf;
    let end = match start.checked_add(len) {
        Some(end) => end,
        None => return Err(ParseError::BadLength),
    };

    // Both terminator bytes must be inside the buffer before they are inspected.
    let terminator_present = end + 1 < buf.len();
    if !terminator_present {
        return Err(ParseError::Incomplete);
    }
    if buf[end..].starts_with(b"\r\n") {
        Ok((start, end))
    } else {
        Err(ParseError::InvalidFormat)
    }
}
/// Parses a `$` frame whose tag byte sits at `buf[pos]`.
///
/// The length line selects the result: `?` yields `StreamedStringHeader`,
/// `-1` yields the null bulk string, and a decimal length yields the payload
/// as a slice of `input`. Lengths above `MAX_BULK_STRING_SIZE` are rejected
/// with `BadLength`. The returned index points just past the frame.
fn parse_bulk_string(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
    let declared = match &buf[pos + 1..line_end] {
        b"?" => return Ok((Frame::StreamedStringHeader, after_crlf)),
        b"-1" => return Ok((Frame::BulkString(None), after_crlf)),
        digits => parse_usize(digits)?,
    };
    if declared > MAX_BULK_STRING_SIZE {
        return Err(ParseError::BadLength);
    }

    let (start, end) = parse_blob_bounds(buf, after_crlf, declared)?;
    let payload = if start == end {
        Bytes::new()
    } else {
        input.slice(start..end)
    };
    // `end + 2` skips the CRLF that terminates the payload.
    Ok((Frame::BulkString(Some(payload)), end + 2))
}
/// Parses a `,` frame whose tag byte sits at `buf[pos]`.
///
/// The exact spellings `inf`, `-inf` and `nan` are returned as
/// `SpecialFloat` slices of `input`. Any other line is parsed with
/// `str::parse::<f64>`; if that produces an infinity or NaN (from some other
/// accepted spelling) it is normalised to `SpecialFloat` holding `inf`,
/// `-inf` or `nan`; finite values become `Double`.
///
/// # Errors
/// `Utf8Error` when the line is not UTF-8, `InvalidFormat` when it is not a
/// float, and `Incomplete` when the line's CRLF has not arrived.
#[inline(never)]
fn parse_double_frame(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
    let raw = &buf[pos + 1..line_end];

    if matches!(raw, b"inf" | b"-inf" | b"nan") {
        let special = input.slice(pos + 1..line_end);
        return Ok((Frame::SpecialFloat(special), after_crlf));
    }

    let text = match core::str::from_utf8(raw) {
        Ok(text) => text,
        Err(_) => return Err(ParseError::Utf8Error),
    };
    let value: f64 = match text.parse() {
        Ok(value) => value,
        Err(_) => return Err(ParseError::InvalidFormat),
    };

    let frame = if value.is_nan() {
        Frame::SpecialFloat(Bytes::from("nan"))
    } else if value.is_infinite() {
        let spelling = if value.is_sign_negative() { "-inf" } else { "inf" };
        Frame::SpecialFloat(Bytes::from(spelling))
    } else {
        Frame::Double(value)
    };
    Ok((frame, after_crlf))
}
#[inline(never)]
fn parse_verbatim(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
let len_bytes = &buf[pos + 1..line_end];
if len_bytes == b"?" {
return Ok((Frame::StreamedVerbatimStringHeader, after_crlf));
}
if len_bytes == b"-1" {
return Err(ParseError::BadLength);
}
let len = parse_usize(len_bytes)?;
if len > MAX_BULK_STRING_SIZE {
return Err(ParseError::BadLength);
}
let data_start = after_crlf;
let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
if data_end + 1 >= buf.len() {
return Err(ParseError::Incomplete);
}
if buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
return Err(ParseError::InvalidFormat);
}
let sep = buf[data_start..data_end]
.iter()
.position(|&b| b == b':')
.ok_or(ParseError::InvalidFormat)?;
if sep != 3 {
return Err(ParseError::InvalidFormat);
}
let format = input.slice(data_start..data_start + sep);
let content = input.slice(data_start + sep + 1..data_end);
Ok((Frame::VerbatimString(format, content), data_end + 2))
}
/// Parses a `!` frame whose tag byte sits at `buf[pos]`.
///
/// A `?` length yields `StreamedBlobErrorHeader`; a decimal length yields the
/// payload as a slice of `input`. Unlike bulk strings there is no null form:
/// `-1` is rejected with `BadLength`, as is any length above
/// `MAX_BULK_STRING_SIZE`.
#[inline(never)]
fn parse_blob_error(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
    let declared = match &buf[pos + 1..line_end] {
        b"?" => return Ok((Frame::StreamedBlobErrorHeader, after_crlf)),
        b"-1" => return Err(ParseError::BadLength),
        digits => parse_usize(digits)?,
    };
    if declared > MAX_BULK_STRING_SIZE {
        return Err(ParseError::BadLength);
    }

    let (start, end) = parse_blob_bounds(buf, after_crlf, declared)?;
    let payload = if start == end {
        Bytes::new()
    } else {
        input.slice(start..end)
    };
    // `end + 2` skips the CRLF that terminates the payload.
    Ok((Frame::BlobError(payload), end + 2))
}
/// Parses an array (`*`), set (`~`) or push (`>`) frame whose tag byte sits at
/// `buf[pos]`.
///
/// A `?` count yields the matching streamed header, `*-1` yields the null
/// array, and a decimal count is followed by that many nested frames parsed
/// through `parse_frame_inner`. Returns the frame and the index just past it.
///
/// # Errors
/// `BadLength` for an invalid or oversized count, plus any error produced
/// while parsing an element (including `Incomplete` when elements are missing).
///
/// # Panics
/// Panics if `tag` is not one of `*`, `~`, `>`.
// NOTE(review): nesting depth is not bounded here; each nested collection
// recurses through `parse_frame_inner`. Confirm whether a depth limit is wanted.
#[inline(never)]
fn parse_collection(
    input: &Bytes,
    buf: &[u8],
    pos: usize,
    tag: u8,
) -> Result<(Frame, usize), ParseError> {
    // Shortest frame this parser accepts is a tag byte plus CRLF (e.g. `_\r\n`).
    const MIN_FRAME_BYTES: usize = 3;

    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
    let len_bytes = &buf[pos + 1..line_end];

    if len_bytes == b"?" {
        let header = match tag {
            b'*' => Frame::StreamedArrayHeader,
            b'~' => Frame::StreamedSetHeader,
            b'>' => Frame::StreamedPushHeader,
            _ => unreachable!(),
        };
        return Ok((header, after_crlf));
    }
    if tag == b'*' && len_bytes == b"-1" {
        return Ok((Frame::Array(None), after_crlf));
    }

    let count = parse_count(len_bytes)?;

    // The declared count comes straight from the peer. Reserve no more slots
    // than the bytes actually present could encode, so a tiny header cannot
    // force a huge up-front allocation.
    let available = buf.len().saturating_sub(after_crlf);
    let mut items = Vec::with_capacity(count.min(available / MIN_FRAME_BYTES));

    let mut cursor = after_crlf;
    for _ in 0..count {
        let (item, next) = parse_frame_inner(input, cursor)?;
        items.push(item);
        cursor = next;
    }

    let frame = match tag {
        b'*' => Frame::Array(Some(items)),
        b'~' => Frame::Set(items),
        b'>' => Frame::Push(items),
        _ => unreachable!(),
    };
    Ok((frame, cursor))
}
/// Parses a map (`%`) or attribute (`|`) frame whose tag byte sits at
/// `buf[pos]`.
///
/// A `?` count yields the matching streamed header. Otherwise the count is the
/// number of key/value pairs, each parsed as two consecutive nested frames.
/// Any `tag` other than `%` is treated as an attribute. Returns the frame and
/// the index just past it.
///
/// # Errors
/// `BadLength` for an invalid or oversized count, plus any error produced
/// while parsing a key or value (including `Incomplete`).
// NOTE(review): nesting depth is not bounded here; nested frames recurse
// through `parse_frame_inner`. Confirm whether a depth limit is wanted.
#[inline(never)]
fn parse_pairs(
    input: &Bytes,
    buf: &[u8],
    pos: usize,
    tag: u8,
) -> Result<(Frame, usize), ParseError> {
    // A pair is two frames, and the shortest frame is a tag byte plus CRLF.
    const MIN_PAIR_BYTES: usize = 6;

    let is_map = tag == b'%';
    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
    let len_bytes = &buf[pos + 1..line_end];

    if len_bytes == b"?" {
        let header = if is_map {
            Frame::StreamedMapHeader
        } else {
            Frame::StreamedAttributeHeader
        };
        return Ok((header, after_crlf));
    }

    let count = parse_count(len_bytes)?;

    // The declared count comes straight from the peer. Reserve no more slots
    // than the bytes actually present could encode, so a tiny header cannot
    // force a huge up-front allocation.
    let available = buf.len().saturating_sub(after_crlf);
    let mut pairs = Vec::with_capacity(count.min(available / MIN_PAIR_BYTES));

    let mut cursor = after_crlf;
    for _ in 0..count {
        let (key, after_key) = parse_frame_inner(input, cursor)?;
        let (val, after_val) = parse_frame_inner(input, after_key)?;
        pairs.push((key, val));
        cursor = after_val;
    }

    let frame = if is_map {
        Frame::Map(pairs)
    } else {
        Frame::Attribute(pairs)
    };
    Ok((frame, cursor))
}
/// Parses a `;` streamed-string chunk whose tag byte sits at `buf[pos]`.
///
/// The length line must be a decimal number no larger than
/// `MAX_BULK_STRING_SIZE` (otherwise `BadLength`); the payload is returned as
/// a slice of `input`. A zero-length chunk is accepted only when the length
/// line is followed by a second CRLF (`;0\r\n\r\n`), because the payload
/// terminator is always required.
// NOTE(review): the RESP3 specification ends a streamed string with a bare
// `;0\r\n`; confirm that requiring the extra CRLF here is intentional.
#[inline(never)]
fn parse_streamed_chunk(
    input: &Bytes,
    buf: &[u8],
    pos: usize,
) -> Result<(Frame, usize), ParseError> {
    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
    let declared = parse_usize(&buf[pos + 1..line_end])?;
    if declared > MAX_BULK_STRING_SIZE {
        return Err(ParseError::BadLength);
    }

    let (start, end) = parse_blob_bounds(buf, after_crlf, declared)?;
    let payload = if start == end {
        Bytes::new()
    } else {
        input.slice(start..end)
    };
    // `end + 2` skips the CRLF that terminates the payload.
    Ok((Frame::StreamedStringChunk(payload), end + 2))
}
pub(crate) fn parse_frame_inner(input: &Bytes, pos: usize) -> Result<(Frame, usize), ParseError> {
let buf = input.as_ref();
if pos >= buf.len() {
return Err(ParseError::Incomplete);
}
let tag = buf[pos];
match tag {
b'+' => {
let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
Ok((
Frame::SimpleString(input.slice(pos + 1..line_end)),
after_crlf,
))
}
b'-' => {
let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
Ok((Frame::Error(input.slice(pos + 1..line_end)), after_crlf))
}
b':' => {
let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
let v = parse_i64(&buf[pos + 1..line_end])?;
Ok((Frame::Integer(v), after_crlf))
}
b'#' => {
let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
match &buf[pos + 1..line_end] {
b"t" => Ok((Frame::Boolean(true), after_crlf)),
b"f" => Ok((Frame::Boolean(false), after_crlf)),
_ => Err(ParseError::InvalidBoolean),
}
}
b'(' => {
let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
Ok((Frame::BigNumber(input.slice(pos + 1..line_end)), after_crlf))
}
b'_' => {
if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
Ok((Frame::Null, pos + 3))
} else {
Err(ParseError::Incomplete)
}
}
b'.' => {
if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
Ok((Frame::StreamTerminator, pos + 3))
} else {
Err(ParseError::Incomplete)
}
}
b'$' => parse_bulk_string(input, buf, pos),
b',' => parse_double_frame(input, buf, pos),
b'=' => parse_verbatim(input, buf, pos),
b'!' => parse_blob_error(input, buf, pos),
b';' => parse_streamed_chunk(input, buf, pos),
b'*' | b'~' | b'>' => parse_collection(input, buf, pos, tag),
b'%' | b'|' => parse_pairs(input, buf, pos, tag),
_ => Err(ParseError::InvalidTag(tag)),
}
}
// Compiled only with the `unsafe-internals` feature; the module body is loaded
// from `resp3_unchecked.rs` and its `parse_frame_unchecked` is re-exported.
#[cfg(feature = "unsafe-internals")]
#[path = "resp3_unchecked.rs"]
mod unchecked;
#[cfg(feature = "unsafe-internals")]
pub use unchecked::parse_frame_unchecked;
// Compiled only with the `codec` feature; the module body is loaded from
// `resp3_codec.rs` and its `Codec` type is re-exported.
#[cfg(feature = "codec")]
#[path = "resp3_codec.rs"]
mod codec_impl;
#[cfg(feature = "codec")]
pub use codec_impl::Codec;
/// Parses one frame and, if it is a streamed header, the whole sequence it
/// introduces.
///
/// * `$?` - collects `;` chunks until an empty chunk and returns
///   `StreamedString`.
/// * `*?`, `~?`, `>?` - collects frames until a `.` terminator and returns
///   `StreamedArray` / `StreamedSet` / `StreamedPush`.
/// * `%?`, `|?` - collects key/value frame pairs until a `.` terminator and
///   returns `StreamedMap` / `StreamedAttribute`.
/// * Anything else, including the `!?` and `=?` headers, is returned as
///   parsed, without consuming further frames.
///
/// The second tuple element is the unconsumed tail of `input`.
///
/// # Errors
/// * `Incomplete` - `input` is empty or ends before the sequence terminates.
/// * `InvalidFormat` - a non-chunk frame appears inside a streamed string, or
///   a `.` terminator appears where a map/attribute value is expected.
/// * Any other error reported while parsing an individual frame.
pub fn parse_streaming_sequence(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
    if input.is_empty() {
        return Err(ParseError::Incomplete);
    }
    let (header, rest) = parse_frame(input)?;
    match header {
        Frame::StreamedStringHeader => {
            let (chunks, rest) = collect_streamed_chunks(rest)?;
            Ok((Frame::StreamedString(chunks), rest))
        }
        Frame::StreamedArrayHeader => {
            let (items, rest) = collect_streamed_items(rest)?;
            Ok((Frame::StreamedArray(items), rest))
        }
        Frame::StreamedSetHeader => {
            let (items, rest) = collect_streamed_items(rest)?;
            Ok((Frame::StreamedSet(items), rest))
        }
        Frame::StreamedPushHeader => {
            let (items, rest) = collect_streamed_items(rest)?;
            Ok((Frame::StreamedPush(items), rest))
        }
        Frame::StreamedMapHeader => {
            let (pairs, rest) = collect_streamed_pairs(rest)?;
            Ok((Frame::StreamedMap(pairs), rest))
        }
        Frame::StreamedAttributeHeader => {
            let (pairs, rest) = collect_streamed_pairs(rest)?;
            Ok((Frame::StreamedAttribute(pairs), rest))
        }
        // Ordinary frames and the blob-error / verbatim streamed headers are
        // passed through untouched.
        other => Ok((other, rest)),
    }
}

/// Collects `StreamedStringChunk` payloads until an empty chunk ends the
/// stream; the empty chunk itself is not stored. Any other frame kind is
/// rejected with `InvalidFormat`.
fn collect_streamed_chunks(mut rest: Bytes) -> Result<(Vec<Bytes>, Bytes), ParseError> {
    let mut chunks = Vec::new();
    loop {
        let (frame, tail) = parse_frame(rest)?;
        rest = tail;
        match frame {
            Frame::StreamedStringChunk(chunk) if chunk.is_empty() => return Ok((chunks, rest)),
            Frame::StreamedStringChunk(chunk) => chunks.push(chunk),
            _ => return Err(ParseError::InvalidFormat),
        }
    }
}

/// Collects frames until a `StreamTerminator`, which is consumed but not stored.
fn collect_streamed_items(mut rest: Bytes) -> Result<(Vec<Frame>, Bytes), ParseError> {
    let mut items = Vec::new();
    loop {
        let (frame, tail) = parse_frame(rest)?;
        rest = tail;
        if matches!(frame, Frame::StreamTerminator) {
            return Ok((items, rest));
        }
        items.push(frame);
    }
}

/// Collects key/value frame pairs until a `StreamTerminator` appears in key
/// position. A terminator in value position is rejected with `InvalidFormat`.
fn collect_streamed_pairs(mut rest: Bytes) -> Result<(Vec<(Frame, Frame)>, Bytes), ParseError> {
    let mut pairs = Vec::new();
    loop {
        let (key, after_key) = parse_frame(rest)?;
        if matches!(key, Frame::StreamTerminator) {
            return Ok((pairs, after_key));
        }
        let (value, after_value) = parse_frame(after_key)?;
        if matches!(value, Frame::StreamTerminator) {
            return Err(ParseError::InvalidFormat);
        }
        rest = after_value;
        pairs.push((key, value));
    }
}
/// Finds the first `\r\n` at or after index `from`.
///
/// Returns `(index of '\r', index just past '\n')`, or
/// `ParseError::Incomplete` when no complete CRLF is present (including when
/// `from` lies beyond the end of `buf`).
#[inline]
fn find_crlf(buf: &[u8], from: usize) -> Result<(usize, usize), ParseError> {
    buf.get(from..)
        .and_then(|tail| tail.windows(2).position(|pair| pair == b"\r\n"))
        .map(|offset| (from + offset, from + offset + 2))
        .ok_or(ParseError::Incomplete)
}
/// Parses an unsigned decimal number made only of ASCII digits.
///
/// Leading zeros are accepted. An empty slice, any non-digit byte (signs
/// included) or a value that overflows `usize` yields `ParseError::BadLength`.
#[inline]
fn parse_usize(buf: &[u8]) -> Result<usize, ParseError> {
    if buf.is_empty() {
        return Err(ParseError::BadLength);
    }
    buf.iter()
        .try_fold(0usize, |acc, &byte| {
            if !byte.is_ascii_digit() {
                return None;
            }
            acc.checked_mul(10)?.checked_add((byte - b'0') as usize)
        })
        .ok_or(ParseError::BadLength)
}
/// Parses a signed decimal integer covering the full `i64` range.
///
/// The digits may be preceded by a single optional sign, `-` or `+`, as the
/// RESP integer grammar allows.
///
/// # Errors
/// * `InvalidFormat` - empty input, a sign with no digits, or any non-digit byte.
/// * `Overflow` - the value does not fit in an `i64`.
#[inline]
fn parse_i64(buf: &[u8]) -> Result<i64, ParseError> {
    let (neg, digits) = match buf.split_first() {
        Some((&b'-', rest)) => (true, rest),
        Some((&b'+', rest)) => (false, rest),
        Some(_) => (false, buf),
        None => return Err(ParseError::InvalidFormat),
    };
    if digits.is_empty() {
        return Err(ParseError::InvalidFormat);
    }

    let last = digits.len() - 1;
    let mut v: i64 = 0;
    for (i, &d) in digits.iter().enumerate() {
        if !d.is_ascii_digit() {
            return Err(ParseError::InvalidFormat);
        }
        let digit = i64::from(d - b'0');
        // `i64::MIN` has no positive counterpart, so it cannot be built by
        // accumulating the magnitude and negating; recognise it directly.
        if neg && v == i64::MAX / 10 && digit == 8 && i == last {
            return Ok(i64::MIN);
        }
        if v > i64::MAX / 10 || (v == i64::MAX / 10 && digit > i64::MAX % 10) {
            return Err(ParseError::Overflow);
        }
        v = v * 10 + digit;
    }
    Ok(if neg { -v } else { v })
}
/// Parses a collection element count.
///
/// Accepts whatever `parse_usize` accepts, then additionally rejects values
/// greater than `MAX_COLLECTION_SIZE` with `ParseError::BadLength`.
#[inline]
fn parse_count(buf: &[u8]) -> Result<usize, ParseError> {
    match parse_usize(buf)? {
        n if n <= MAX_COLLECTION_SIZE => Ok(n),
        _ => Err(ParseError::BadLength),
    }
}
/// Serializes `frame` (and, recursively, any nested frames) into a freshly
/// allocated, immutable byte buffer.
pub fn frame_to_bytes(frame: &Frame) -> Bytes {
    let mut encoded = BytesMut::new();
    serialize_frame(frame, &mut encoded);
    encoded.freeze()
}
fn serialize_frame(frame: &Frame, buf: &mut BytesMut) {
match frame {
Frame::SimpleString(s) => {
buf.put_u8(b'+');
buf.extend_from_slice(s);
buf.extend_from_slice(b"\r\n");
}
Frame::Error(e) => {
buf.put_u8(b'-');
buf.extend_from_slice(e);
buf.extend_from_slice(b"\r\n");
}
Frame::Integer(i) => {
buf.put_u8(b':');
let s = i.to_string();
buf.extend_from_slice(s.as_bytes());
buf.extend_from_slice(b"\r\n");
}
Frame::BulkString(opt) => {
buf.put_u8(b'$');
match opt {
Some(data) => {
let len = data.len().to_string();
buf.extend_from_slice(len.as_bytes());
buf.extend_from_slice(b"\r\n");
buf.extend_from_slice(data);
buf.extend_from_slice(b"\r\n");
}
None => {
buf.extend_from_slice(b"-1\r\n");
}
}
}
Frame::BlobError(data) => {
buf.put_u8(b'!');
let len = data.len().to_string();
buf.extend_from_slice(len.as_bytes());
buf.extend_from_slice(b"\r\n");
buf.extend_from_slice(data);
buf.extend_from_slice(b"\r\n");
}
Frame::StreamedStringHeader => {
buf.extend_from_slice(b"$?\r\n");
}
Frame::StreamedBlobErrorHeader => {
buf.extend_from_slice(b"!?\r\n");
}
Frame::StreamedVerbatimStringHeader => {
buf.extend_from_slice(b"=?\r\n");
}
Frame::StreamedArrayHeader => {
buf.extend_from_slice(b"*?\r\n");
}
Frame::StreamedSetHeader => {
buf.extend_from_slice(b"~?\r\n");
}
Frame::StreamedMapHeader => {
buf.extend_from_slice(b"%?\r\n");
}
Frame::StreamedAttributeHeader => {
buf.extend_from_slice(b"|?\r\n");
}
Frame::StreamedPushHeader => {
buf.extend_from_slice(b">?\r\n");
}
Frame::StreamedStringChunk(data) => {
buf.put_u8(b';');
let len = data.len().to_string();
buf.extend_from_slice(len.as_bytes());
buf.extend_from_slice(b"\r\n");
buf.extend_from_slice(data);
buf.extend_from_slice(b"\r\n");
}
Frame::StreamedString(chunks) => {
buf.extend_from_slice(b"$?\r\n");
for chunk in chunks {
buf.put_u8(b';');
let len = chunk.len().to_string();
buf.extend_from_slice(len.as_bytes());
buf.extend_from_slice(b"\r\n");
buf.extend_from_slice(chunk);
buf.extend_from_slice(b"\r\n");
}
buf.extend_from_slice(b";0\r\n\r\n");
}
Frame::StreamedArray(items) => {
buf.extend_from_slice(b"*?\r\n");
for item in items {
serialize_frame(item, buf);
}
buf.extend_from_slice(b".\r\n");
}
Frame::StreamedSet(items) => {
buf.extend_from_slice(b"~?\r\n");
for item in items {
serialize_frame(item, buf);
}
buf.extend_from_slice(b".\r\n");
}
Frame::StreamedMap(pairs) => {
buf.extend_from_slice(b"%?\r\n");
for (key, value) in pairs {
serialize_frame(key, buf);
serialize_frame(value, buf);
}
buf.extend_from_slice(b".\r\n");
}
Frame::StreamedAttribute(pairs) => {
buf.extend_from_slice(b"|?\r\n");
for (key, value) in pairs {
serialize_frame(key, buf);
serialize_frame(value, buf);
}
buf.extend_from_slice(b".\r\n");
}
Frame::StreamedPush(items) => {
buf.extend_from_slice(b">?\r\n");
for item in items {
serialize_frame(item, buf);
}
buf.extend_from_slice(b".\r\n");
}
Frame::StreamTerminator => {
buf.extend_from_slice(b".\r\n");
}
Frame::Null => {
buf.extend_from_slice(b"_\r\n");
}
Frame::Double(d) => {
buf.put_u8(b',');
let s = d.to_string();
buf.extend_from_slice(s.as_bytes());
buf.extend_from_slice(b"\r\n");
}
Frame::SpecialFloat(f) => {
buf.put_u8(b',');
buf.extend_from_slice(f);
buf.extend_from_slice(b"\r\n");
}
Frame::Boolean(b) => {
buf.extend_from_slice(if *b { b"#t\r\n" } else { b"#f\r\n" });
}
Frame::BigNumber(n) => {
buf.put_u8(b'(');
buf.extend_from_slice(n);
buf.extend_from_slice(b"\r\n");
}
Frame::VerbatimString(format, content) => {
buf.put_u8(b'=');
let total_len = format.len() + 1 + content.len(); let len = total_len.to_string();
buf.extend_from_slice(len.as_bytes());
buf.extend_from_slice(b"\r\n");
buf.extend_from_slice(format);
buf.put_u8(b':');
buf.extend_from_slice(content);
buf.extend_from_slice(b"\r\n");
}
Frame::Array(opt) => {
buf.put_u8(b'*');
match opt {
Some(items) => {
let len = items.len().to_string();
buf.extend_from_slice(len.as_bytes());
buf.extend_from_slice(b"\r\n");
for item in items {
serialize_frame(item, buf);
}
}
None => {
buf.extend_from_slice(b"-1\r\n");
}
}
}
Frame::Set(items) => {
buf.put_u8(b'~');
let len = items.len().to_string();
buf.extend_from_slice(len.as_bytes());
buf.extend_from_slice(b"\r\n");
for item in items {
serialize_frame(item, buf);
}
}
Frame::Map(pairs) => {
buf.put_u8(b'%');
let len = pairs.len().to_string();
buf.extend_from_slice(len.as_bytes());
buf.extend_from_slice(b"\r\n");
for (key, value) in pairs {
serialize_frame(key, buf);
serialize_frame(value, buf);
}
}
Frame::Attribute(pairs) => {
buf.put_u8(b'|');
let len = pairs.len().to_string();
buf.extend_from_slice(len.as_bytes());
buf.extend_from_slice(b"\r\n");
for (key, value) in pairs {
serialize_frame(key, buf);
serialize_frame(value, buf);
}
}
Frame::Push(items) => {
buf.put_u8(b'>');
let len = items.len().to_string();
buf.extend_from_slice(len.as_bytes());
buf.extend_from_slice(b"\r\n");
for item in items {
serialize_frame(item, buf);
}
}
}
}
#[cfg(test)]
mod tests {
use super::{Frame, ParseError, Parser, frame_to_bytes, parse_frame, parse_streaming_sequence};
use bytes::Bytes;
#[test]
fn test_parse_frame_simple_string() {
let input = Bytes::from("+HELLO\r\nWORLD");
let (frame, rest) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::SimpleString(Bytes::from("HELLO")));
assert_eq!(rest, Bytes::from("WORLD"));
}
#[test]
fn test_parse_frame_blob_error() {
let input = Bytes::from("!5\r\nERROR\r\nREST");
let (frame, rest) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::BlobError(Bytes::from("ERROR")));
assert_eq!(rest, Bytes::from("REST"));
}
#[test]
fn test_parse_frame_error() {
let input = Bytes::from("-ERR fail\r\nLEFT");
let (frame, rest) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::Error(Bytes::from("ERR fail")));
assert_eq!(rest, Bytes::from("LEFT"));
}
#[test]
fn test_parse_frame_integer() {
let input = Bytes::from(":42\r\nTAIL");
let (frame, rest) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::Integer(42));
assert_eq!(rest, Bytes::from("TAIL"));
}
#[test]
fn test_parse_frame_bulk_string() {
let input = Bytes::from("$3\r\nfoo\r\nREST");
let (frame, rest) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::BulkString(Some(Bytes::from("foo"))));
assert_eq!(rest, Bytes::from("REST"));
let null_input = Bytes::from("$-1\r\nAFTER");
let (frame, rest) = parse_frame(null_input.clone()).unwrap();
assert_eq!(frame, Frame::BulkString(None));
assert_eq!(rest, Bytes::from("AFTER"));
}
#[test]
fn test_parse_frame_null() {
let input = Bytes::from("_\r\nLEFT");
let (frame, rest) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::Null);
assert_eq!(rest, Bytes::from("LEFT"));
}
#[test]
fn test_parse_frame_double_and_special_float() {
let input = Bytes::from(",3.5\r\nNEXT");
let (frame, rest) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::Double(3.5));
assert_eq!(rest, Bytes::from("NEXT"));
let input_inf = Bytes::from(",inf\r\nTAIL");
let (frame, rest) = parse_frame(input_inf.clone()).unwrap();
assert_eq!(frame, Frame::SpecialFloat(Bytes::from("inf")));
assert_eq!(rest, Bytes::from("TAIL"));
}
#[test]
fn test_parse_frame_boolean() {
let input_true = Bytes::from("#t\r\nXYZ");
let (frame, rest) = parse_frame(input_true.clone()).unwrap();
assert_eq!(frame, Frame::Boolean(true));
assert_eq!(rest, Bytes::from("XYZ"));
let input_false = Bytes::from("#f\r\nDONE");
let (frame, rest) = parse_frame(input_false.clone()).unwrap();
assert_eq!(frame, Frame::Boolean(false));
assert_eq!(rest, Bytes::from("DONE"));
}
#[test]
fn test_parse_frame_big_number() {
let input = Bytes::from("(123456789\r\nEND");
let (frame, rest) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::BigNumber(Bytes::from("123456789")));
assert_eq!(rest, Bytes::from("END"));
}
#[test]
fn test_parse_frame_verbatim_string() {
let input = Bytes::from("=12\r\ntxt:hi there\r\nAFTER");
let (frame, rest) = parse_frame(input.clone()).unwrap();
assert_eq!(
frame,
Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hi there")) );
assert_eq!(rest, Bytes::from("AFTER"));
}
#[test]
fn test_parse_frame_array_set_push_map_attribute() {
let input = Bytes::from("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\nTAIL");
let (frame, rest) = parse_frame(input.clone()).unwrap();
assert_eq!(
frame,
Frame::Array(Some(vec![
Frame::BulkString(Some(Bytes::from("foo"))),
Frame::BulkString(Some(Bytes::from("bar")))
]))
);
assert_eq!(rest, Bytes::from("TAIL"));
let input_null = Bytes::from("*-1\r\nEND");
let (frame, rest) = parse_frame(input_null.clone()).unwrap();
assert_eq!(frame, Frame::Array(None));
assert_eq!(rest, Bytes::from("END"));
let input_set = Bytes::from("~2\r\n+foo\r\n+bar\r\nTAIL");
let (frame, rest) = parse_frame(input_set.clone()).unwrap();
assert_eq!(
frame,
Frame::Set(vec![
Frame::SimpleString(Bytes::from("foo")),
Frame::SimpleString(Bytes::from("bar")),
])
);
assert_eq!(rest, Bytes::from("TAIL"));
let input_map = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\nTRAIL");
let (frame, rest) = parse_frame(input_map.clone()).unwrap();
assert_eq!(
frame,
Frame::Map(vec![
(
Frame::SimpleString(Bytes::from("key1")),
Frame::SimpleString(Bytes::from("val1"))
),
(
Frame::SimpleString(Bytes::from("key2")),
Frame::SimpleString(Bytes::from("val2"))
),
])
);
assert_eq!(rest, Bytes::from("TRAIL"));
let input_attr = Bytes::from("|1\r\n+meta\r\n+data\r\nAFTER");
let (frame, rest) = parse_frame(input_attr.clone()).unwrap();
assert_eq!(
frame,
Frame::Attribute(vec![(
Frame::SimpleString(Bytes::from("meta")),
Frame::SimpleString(Bytes::from("data"))
),])
);
assert_eq!(rest, Bytes::from("AFTER"));
let input_push = Bytes::from(">2\r\n+type\r\n:1\r\nNEXT");
let (frame, rest) = parse_frame(input_push.clone()).unwrap();
assert_eq!(
frame,
Frame::Push(vec![
Frame::SimpleString(Bytes::from("type")),
Frame::Integer(1),
])
);
assert_eq!(rest, Bytes::from("NEXT"));
}
#[test]
fn test_parse_frame_empty_input() {
assert!(parse_frame(Bytes::new()).is_err());
}
#[test]
fn test_parse_frame_invalid_tag() {
let input = Bytes::from("X123\r\n");
assert!(parse_frame(input).is_err());
}
#[test]
fn test_parse_frame_malformed_bulk_length() {
let input = Bytes::from("$x\r\nfoo\r\n");
assert!(parse_frame(input).is_err());
}
#[test]
fn test_parse_frame_zero_length_bulk() {
let input = Bytes::from("$0\r\n\r\nTAIL");
let (frame, rest) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::BulkString(Some(Bytes::from(""))));
assert_eq!(rest, Bytes::from("TAIL"));
}
#[test]
fn test_parse_frame_zero_length_blob_error() {
let input = Bytes::from("!0\r\n\r\nREST");
let (frame, rest) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::BlobError(Bytes::new()));
assert_eq!(rest, Bytes::from("REST"));
}
#[test]
fn test_parse_frame_missing_crlf() {
let input = Bytes::from(":42\nTAIL");
assert!(parse_frame(input).is_err());
}
#[test]
fn test_parse_frame_unicode_simple_string() {
let input = Bytes::from("+こんにちは\r\nEND");
let (frame, rest) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::SimpleString(Bytes::from("こんにちは")));
assert_eq!(rest, Bytes::from("END"));
}
#[test]
fn test_parse_frame_chained_frames() {
let combined = Bytes::from("+OK\r\n:1\r\nfoo");
let (f1, rem) = parse_frame(combined.clone()).unwrap();
assert_eq!(f1, Frame::SimpleString(Bytes::from("OK")));
let (f2, rem2) = parse_frame(rem).unwrap();
assert_eq!(f2, Frame::Integer(1));
assert_eq!(rem2, Bytes::from("foo"));
}
#[test]
fn test_parse_frame_empty_array() {
let input = Bytes::from("*0\r\nTAIL");
let (frame, rest) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::Array(Some(vec![])));
assert_eq!(rest, Bytes::from("TAIL"));
}
#[test]
fn test_parse_frame_partial_array_data() {
let input = Bytes::from("*2\r\n+OK\r\n");
assert!(parse_frame(input).is_err());
}
#[test]
fn test_parse_frame_streamed_string() {
let input = Bytes::from("$?\r\n$5\r\nhello\r\n$0\r\n\r\nREST");
let (frame, rem) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::StreamedStringHeader);
let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
assert_eq!(chunk, Frame::BulkString(Some(Bytes::from("hello"))));
let (terminator, rest) = parse_frame(rem2.clone()).unwrap();
assert_eq!(terminator, Frame::BulkString(Some(Bytes::from(""))));
assert_eq!(rest, Bytes::from("REST"));
}
#[test]
fn test_parse_frame_streamed_blob_error() {
let input = Bytes::from("!?\r\n!5\r\nERROR\r\nREST");
let (frame, rem) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::StreamedBlobErrorHeader);
let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
assert_eq!(chunk, Frame::BlobError(Bytes::from("ERROR")));
assert_eq!(rem2, Bytes::from("REST"));
}
#[test]
fn test_parse_frame_streamed_verbatim_string() {
let input = Bytes::from("=?\r\n=9\r\ntxt:hello\r\nTAIL");
let (frame, rem) = parse_frame(input.clone()).unwrap();
assert_eq!(frame, Frame::StreamedVerbatimStringHeader);
let (chunk, rest) = parse_frame(rem.clone()).unwrap();
assert_eq!(
chunk,
Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hello")) );
assert_eq!(rest, Bytes::from("TAIL"));
}
#[test]
fn test_parse_frame_streamed_array() {
let input = Bytes::from("*?\r\n+one\r\n+two\r\n*0\r\nEND");
let (header, rem) = parse_frame(input.clone()).unwrap();
assert_eq!(header, Frame::StreamedArrayHeader);
let (item1, rem2) = parse_frame(rem.clone()).unwrap();
assert_eq!(item1, Frame::SimpleString(Bytes::from("one")));
let (item2, rem3) = parse_frame(rem2.clone()).unwrap();
assert_eq!(item2, Frame::SimpleString(Bytes::from("two")));
let (terminator, rest) = parse_frame(rem3.clone()).unwrap();
assert_eq!(terminator, Frame::Array(Some(vec![])));
assert_eq!(rest, Bytes::from("END"));
}
#[test]
fn test_parse_frame_streamed_set_map_attr_push() {
let input_set = Bytes::from("~?\r\n+foo\r\n+bar\r\n~0\r\nTAIL");
let (h_set, rem0) = parse_frame(input_set.clone()).unwrap();
assert_eq!(h_set, Frame::StreamedSetHeader);
let (s1, rem1) = parse_frame(rem0.clone()).unwrap();
assert_eq!(s1, Frame::SimpleString(Bytes::from("foo")));
let (s2, rem2) = parse_frame(rem1.clone()).unwrap();
assert_eq!(s2, Frame::SimpleString(Bytes::from("bar")));
let (term_set, rest_set) = parse_frame(rem2.clone()).unwrap();
assert_eq!(term_set, Frame::Set(vec![]));
assert_eq!(rest_set, Bytes::from("TAIL"));
let input_map = Bytes::from("%?\r\n+key\r\n+val\r\n%0\r\nNEXT");
let (h_map, rem_map) = parse_frame(input_map.clone()).unwrap();
assert_eq!(h_map, Frame::StreamedMapHeader);
let (k, rem_map2) = parse_frame(rem_map.clone()).unwrap();
assert_eq!(k, Frame::SimpleString(Bytes::from("key")));
let (v, rem_map3) = parse_frame(rem_map2.clone()).unwrap();
assert_eq!(v, Frame::SimpleString(Bytes::from("val")));
let (term_map, rest_map4) = parse_frame(rem_map3.clone()).unwrap();
assert_eq!(term_map, Frame::Map(vec![]));
assert_eq!(rest_map4, Bytes::from("NEXT"));
let input_attr = Bytes::from("|?\r\n+meta\r\n+info\r\n|0\r\nMORE");
let (h_attr, rem_attr) = parse_frame(input_attr.clone()).unwrap();
assert_eq!(h_attr, Frame::StreamedAttributeHeader);
let (a1, rem_attr2) = parse_frame(rem_attr.clone()).unwrap();
assert_eq!(a1, Frame::SimpleString(Bytes::from("meta")));
let (a2, rem_attr3) = parse_frame(rem_attr2.clone()).unwrap();
assert_eq!(a2, Frame::SimpleString(Bytes::from("info")));
let (term_attr, rest_attr) = parse_frame(rem_attr3.clone()).unwrap();
assert_eq!(term_attr, Frame::Attribute(vec![]));
assert_eq!(rest_attr, Bytes::from("MORE"));
let input_push = Bytes::from(">?\r\n:1\r\n:2\r\n>0\r\nEND");
let (h_push, rem_push) = parse_frame(input_push.clone()).unwrap();
assert_eq!(h_push, Frame::StreamedPushHeader);
let (p1, rem_push2) = parse_frame(rem_push.clone()).unwrap();
assert_eq!(p1, Frame::Integer(1));
let (p2, rem_push3) = parse_frame(rem_push2.clone()).unwrap();
assert_eq!(p2, Frame::Integer(2));
let (term_push, rest_push) = parse_frame(rem_push3.clone()).unwrap();
assert_eq!(term_push, Frame::Push(vec![]));
assert_eq!(rest_push, Bytes::from("END"));
}
/// `.\r\n` parses to `Frame::StreamTerminator`; the bytes after it are returned untouched.
#[test]
fn test_parse_frame_stream_terminator() {
    let wire = Bytes::from(".\r\nREST");
    let (parsed, leftover) = parse_frame(wire).unwrap();
    assert_eq!(parsed, Frame::StreamTerminator);
    assert_eq!(leftover, Bytes::from_static(b"REST"));
}
/// A blob error announcing length `-1` is rejected with `ParseError::BadLength`.
#[test]
fn test_parse_frame_null_blob_error_rejected() {
    let outcome = parse_frame(Bytes::from("!-1\r\nTAIL"));
    assert_eq!(outcome, Err(ParseError::BadLength));
}
/// A verbatim string announcing length `-1` is rejected with `ParseError::BadLength`.
#[test]
fn test_parse_frame_null_verbatim_rejected() {
    let outcome = parse_frame(Bytes::from("=-1\r\nTAIL"));
    assert_eq!(outcome, Err(ParseError::BadLength));
}
/// The format tag of a verbatim string must be exactly three bytes followed by `:`.
#[test]
fn test_verbatim_string_format_must_be_3_bytes() {
    // One-byte tag, four-byte tag, and no tag at all: each is an invalid format.
    let malformed = ["=6\r\nx:data\r\n", "=9\r\ntxtx:data\r\n", "=5\r\n:data\r\n"];
    for wire in malformed {
        assert_eq!(
            parse_frame(Bytes::from(wire)),
            Err(ParseError::InvalidFormat)
        );
    }

    // A well-formed three-byte tag is split into (format, payload).
    let (parsed, _) = parse_frame(Bytes::from("=8\r\ntxt:data\r\n")).unwrap();
    let expected = Frame::VerbatimString(Bytes::from("txt"), Bytes::from("data"));
    assert_eq!(parsed, expected);
}
/// `,nan` is kept verbatim as a `Frame::SpecialFloat`, with the trailing bytes returned.
#[test]
fn test_parse_frame_special_float_nan() {
    let wire = Bytes::from(",nan\r\nTAIL");
    let (parsed, leftover) = parse_frame(wire).unwrap();
    assert_eq!(parsed, Frame::SpecialFloat(Bytes::from_static(b"nan")));
    assert_eq!(leftover, Bytes::from_static(b"TAIL"));
}
/// `(0` parses to a `Frame::BigNumber` holding the digit text `0`.
#[test]
fn test_parse_frame_big_number_zero() {
    let wire = Bytes::from("(0\r\nEND");
    let (parsed, leftover) = parse_frame(wire).unwrap();
    assert_eq!(parsed, Frame::BigNumber(Bytes::from_static(b"0")));
    assert_eq!(leftover, Bytes::from_static(b"END"));
}
/// Zero-length collections (and the `*-1` null array) parse to their empty/null frame
/// and leave the trailing bytes untouched.
#[test]
fn test_parse_frame_collection_empty() {
    // (wire input, expected frame, expected remainder)
    let cases = [
        (">0\r\nTAIL", Frame::Push(vec![]), "TAIL"),
        ("|0\r\nAFTER", Frame::Attribute(vec![]), "AFTER"),
        ("%0\r\nEND", Frame::Map(vec![]), "END"),
        ("~0\r\nDONE", Frame::Set(vec![]), "DONE"),
        ("*-1\r\nFIN", Frame::Array(None), "FIN"),
    ];
    for (wire, expected, tail) in cases {
        let (parsed, leftover) = parse_frame(Bytes::from(wire)).unwrap();
        assert_eq!(parsed, expected);
        assert_eq!(leftover, Bytes::from(tail));
    }
}
/// A simple string survives parse -> serialize -> parse byte-for-byte.
#[test]
fn test_roundtrip_simple_string() {
    let wire = Bytes::from("+hello\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
/// A simple error survives parse -> serialize -> parse byte-for-byte.
#[test]
fn test_roundtrip_error() {
    let wire = Bytes::from("-ERR error message\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
/// An integer survives parse -> serialize -> parse byte-for-byte.
#[test]
fn test_roundtrip_integer() {
    let wire = Bytes::from(":12345\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
/// Both a populated bulk string and the `$-1` null bulk string round-trip byte-for-byte.
#[test]
fn test_roundtrip_bulk_string() {
    for text in ["$5\r\nhello\r\n", "$-1\r\n"] {
        let wire = Bytes::from(text);
        let (decoded, _) = parse_frame(wire.clone()).unwrap();
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);
        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
/// A blob error survives parse -> serialize -> parse byte-for-byte.
#[test]
fn test_roundtrip_blob_error() {
    let wire = Bytes::from("!5\r\nerror\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
/// The `_` null frame survives parse -> serialize -> parse byte-for-byte.
#[test]
fn test_roundtrip_null() {
    let wire = Bytes::from("_\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
/// A double re-parses to an equal frame after serialization.
///
/// Only frame equality is checked here; the serialized text is not compared
/// against the input.
#[test]
fn test_roundtrip_double() {
    let (decoded, _) = parse_frame(Bytes::from(",3.14159\r\n")).unwrap();
    let encoded = frame_to_bytes(&decoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
/// The special float `inf` survives parse -> serialize -> parse byte-for-byte.
#[test]
fn test_roundtrip_special_float() {
    let wire = Bytes::from(",inf\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
/// Both boolean encodings (`#t`, `#f`) round-trip byte-for-byte.
#[test]
fn test_roundtrip_boolean() {
    for text in ["#t\r\n", "#f\r\n"] {
        let wire = Bytes::from(text);
        let (decoded, _) = parse_frame(wire.clone()).unwrap();
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);
        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
/// A big number (here, 20 digits) survives parse -> serialize -> parse byte-for-byte.
#[test]
fn test_roundtrip_big_number() {
    let wire = Bytes::from("(12345678901234567890\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
/// A verbatim string survives parse -> serialize -> parse byte-for-byte.
#[test]
fn test_roundtrip_verbatim_string() {
    let wire = Bytes::from("=10\r\ntxt:hello!\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
/// Both a two-element array and the `*-1` null array round-trip byte-for-byte.
#[test]
fn test_roundtrip_array() {
    for text in ["*2\r\n+hello\r\n:123\r\n", "*-1\r\n"] {
        let wire = Bytes::from(text);
        let (decoded, _) = parse_frame(wire.clone()).unwrap();
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);
        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
/// A two-element set survives parse -> serialize -> parse byte-for-byte.
#[test]
fn test_roundtrip_set() {
    let wire = Bytes::from("~2\r\n+one\r\n+two\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
/// A two-entry map survives parse -> serialize -> parse byte-for-byte.
#[test]
fn test_roundtrip_map() {
    let wire = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
/// A single-entry attribute frame survives parse -> serialize -> parse byte-for-byte.
#[test]
fn test_roundtrip_attribute() {
    let wire = Bytes::from("|1\r\n+key\r\n+val\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
/// A two-element push frame survives parse -> serialize -> parse byte-for-byte.
#[test]
fn test_roundtrip_push() {
    let wire = Bytes::from(">2\r\n+msg\r\n+data\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
/// Every streaming header (and the `.` terminator) parses to its dedicated frame
/// variant and serializes back to the identical bytes.
#[test]
fn test_roundtrip_streaming_headers() {
    // (wire text, frame it must decode to)
    let table = [
        ("$?\r\n", Frame::StreamedStringHeader),
        ("!?\r\n", Frame::StreamedBlobErrorHeader),
        ("=?\r\n", Frame::StreamedVerbatimStringHeader),
        ("*?\r\n", Frame::StreamedArrayHeader),
        ("~?\r\n", Frame::StreamedSetHeader),
        ("%?\r\n", Frame::StreamedMapHeader),
        ("|?\r\n", Frame::StreamedAttributeHeader),
        (">?\r\n", Frame::StreamedPushHeader),
        (".\r\n", Frame::StreamTerminator),
    ];
    for (text, expected) in table {
        let wire = Bytes::from(text);
        let (decoded, _) = parse_frame(wire.clone()).unwrap();
        assert_eq!(decoded, expected);
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);
        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
/// Individual `;<len>` string chunks, including the zero-length one, parse to
/// `Frame::StreamedStringChunk`, consume the whole input, and round-trip byte-for-byte.
#[test]
fn test_roundtrip_streaming_chunks() {
    // (wire text, payload the chunk must carry)
    let table: [(&'static str, &'static str); 5] = [
        (";4\r\nHell\r\n", "Hell"),
        (";5\r\no wor\r\n", "o wor"),
        (";1\r\nd\r\n", "d"),
        (";0\r\n\r\n", ""),
        (";11\r\nHello World\r\n", "Hello World"),
    ];
    for (text, payload) in table {
        let wire = Bytes::from(text);
        let expected = Frame::StreamedStringChunk(Bytes::from(payload));
        let (decoded, leftover) = parse_frame(wire.clone()).unwrap();
        assert_eq!(decoded, expected);
        assert!(leftover.is_empty());
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);
        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
/// Edge cases for a single `;<len>` streamed-string chunk: truncated payloads,
/// malformed or negative lengths, and a payload made of arbitrary binary bytes.
#[test]
fn test_streaming_chunks_edge_cases() {
    // Payload shorter than announced, or missing its trailing CRLF: need more data.
    for wire in [";4\r\nHel", ";4\r\nHell", ";5\r\nHell\r\n", ";0\r\n"] {
        assert!(matches!(
            parse_frame(Bytes::from(wire)),
            Err(ParseError::Incomplete)
        ));
    }

    // Non-numeric and negative lengths are rejected outright.
    for wire in [";abc\r\ndata\r\n", ";-1\r\ndata\r\n"] {
        assert!(matches!(
            parse_frame(Bytes::from(wire)),
            Err(ParseError::BadLength)
        ));
    }

    // Chunk payloads are opaque bytes; NUL and 0xFF must pass through unchanged.
    let payload: &'static [u8] = b"\x00\x01\x02\x03\xFF";
    let mut wire = Vec::new();
    wire.extend_from_slice(b";5\r\n");
    wire.extend_from_slice(payload);
    wire.extend_from_slice(b"\r\n");
    let (frame, _) = parse_frame(Bytes::from(wire)).unwrap();
    // Strict comparison: a wrong variant must fail the test instead of being
    // skipped by a non-matching `if let`.
    assert_eq!(
        frame,
        Frame::StreamedStringChunk(Bytes::from_static(payload))
    );
}
/// Aggregated streamed frames serialize to a streaming sequence that
/// `parse_streaming_sequence` reassembles into an equal frame.
#[test]
fn test_roundtrip_streaming_sequences() {
    // Shorthand for a simple-string element.
    let simple = |text: &'static str| Frame::SimpleString(Bytes::from(text));

    // Serialize `frame`, optionally pin the exact wire bytes, then parse it back.
    let check = |frame: Frame, expected_wire: Option<&'static str>| {
        let wire = frame_to_bytes(&frame);
        if let Some(text) = expected_wire {
            assert_eq!(wire, Bytes::from(text));
        }
        let (parsed, _) = parse_streaming_sequence(wire).unwrap();
        assert_eq!(parsed, frame);
    };

    check(
        Frame::StreamedString(vec![
            Bytes::from("Hell"),
            Bytes::from("o wor"),
            Bytes::from("ld"),
        ]),
        Some("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;2\r\nld\r\n;0\r\n\r\n"),
    );
    check(
        Frame::StreamedArray(vec![
            simple("hello"),
            Frame::Integer(42),
            Frame::Boolean(true),
        ]),
        None,
    );
    check(
        Frame::StreamedMap(vec![
            (simple("key1"), simple("val1")),
            (simple("key2"), Frame::Integer(123)),
        ]),
        None,
    );
    // An empty streamed string is just the header plus the zero-length end chunk.
    check(Frame::StreamedString(vec![]), Some("$?\r\n;0\r\n\r\n"));
    check(
        Frame::StreamedSet(vec![
            simple("apple"),
            simple("banana"),
            Frame::Integer(42),
        ]),
        None,
    );
    check(
        Frame::StreamedAttribute(vec![
            (simple("trace-id"), simple("abc123")),
            (simple("span-id"), simple("def456")),
        ]),
        None,
    );
    check(
        Frame::StreamedPush(vec![
            simple("pubsub"),
            simple("channel1"),
            simple("message data"),
        ]),
        None,
    );
    check(Frame::StreamedArray(vec![]), None);
    check(Frame::StreamedSet(vec![]), None);
}
/// Edge cases for `parse_streaming_sequence`: truncated streams, malformed chunks,
/// nested collections, odd map element counts, and non-streaming input.
#[test]
fn test_streaming_sequences_edge_cases() {
    // Streamed string without its terminating zero-length chunk.
    let data = Bytes::from("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n");
    assert!(matches!(
        parse_streaming_sequence(data),
        Err(ParseError::Incomplete)
    ));

    // Non-numeric chunk length inside a streamed string.
    let data = Bytes::from("$?\r\n;abc\r\nHell\r\n;0\r\n");
    assert!(matches!(
        parse_streaming_sequence(data),
        Err(ParseError::BadLength)
    ));

    // Streamed array without its `.` terminator.
    let data = Bytes::from("*?\r\n+hello\r\n:42\r\n");
    assert!(matches!(
        parse_streaming_sequence(data),
        Err(ParseError::Incomplete)
    ));

    // A fixed-length array nested inside a streamed array counts as one element.
    let data = Bytes::from("*?\r\n+hello\r\n*2\r\n:1\r\n:2\r\n.\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    // Match exhaustively so a wrong variant fails the test instead of silently
    // skipping the element checks.
    match frame {
        Frame::StreamedArray(items) => {
            assert_eq!(items.len(), 2);
            assert!(matches!(items[0], Frame::SimpleString(_)));
            assert!(matches!(items[1], Frame::Array(_)));
        }
        other => panic!("expected StreamedArray, got {other:?}"),
    }

    // Header immediately followed by the terminator: an empty streamed array.
    let data = Bytes::from("*?\r\n.\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    assert_eq!(frame, Frame::StreamedArray(vec![]));

    // A streamed map with a key but no value is malformed.
    let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+orphan\r\n.\r\n");
    assert!(matches!(
        parse_streaming_sequence(data),
        Err(ParseError::InvalidFormat)
    ));

    // Non-streaming input is returned as the plain frame.
    let data = Bytes::from("+simple\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    assert!(matches!(frame, Frame::SimpleString(_)));

    // An absurdly large chunk length must fail; either error is accepted.
    let data = Bytes::from(";999999999999999999\r\ndata\r\n");
    match parse_frame(data) {
        Err(ParseError::BadLength) | Err(ParseError::Incomplete) => {}
        Err(e) => panic!("Got unexpected error type: {e:?}"),
        Ok(_) => panic!("Large chunk size should fail"),
    }

    // A non-chunk frame inside a streamed string is malformed.
    let data = Bytes::from("$?\r\n+invalid\r\n;0\r\n");
    assert!(matches!(
        parse_streaming_sequence(data),
        Err(ParseError::InvalidFormat)
    ));

    // `.corrupted` is expected to surface as Incomplete, per the assertion below.
    let data = Bytes::from("*?\r\n+hello\r\n.corrupted\r\n");
    assert!(matches!(
        parse_streaming_sequence(data),
        Err(ParseError::Incomplete)
    ));

    // Empty input.
    assert!(matches!(
        parse_streaming_sequence(Bytes::new()),
        Err(ParseError::Incomplete)
    ));

    // Stream cut off in the middle of a nested bulk string.
    let data = Bytes::from("*?\r\n+hello\r\n$5\r\nwo");
    assert!(matches!(
        parse_streaming_sequence(data),
        Err(ParseError::Incomplete)
    ));
}
/// An array containing a map, a set and an attribute re-parses to an equal frame
/// after serialization (frame equality only; wire bytes are not compared).
#[test]
fn test_roundtrip_nested_structures() {
    let wire = Bytes::from(
        "*3\r\n+hello\r\n%2\r\n+key1\r\n:123\r\n+key2\r\n~1\r\n+item\r\n|1\r\n+meta\r\n+data\r\n",
    );
    let (decoded, _) = parse_frame(wire).unwrap();
    let encoded = frame_to_bytes(&decoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
/// `$0` still needs its payload-terminating CRLF: missing is Incomplete, wrong is InvalidFormat.
#[test]
fn test_zero_length_bulk_string_requires_trailing_crlf() {
    // Complete form: empty payload, then the remainder.
    let (parsed, leftover) = parse_frame(Bytes::from("$0\r\n\r\nTAIL")).unwrap();
    assert_eq!(parsed, Frame::BulkString(Some(Bytes::new())));
    assert_eq!(leftover, Bytes::from_static(b"TAIL"));

    // Terminator absent or only half present.
    for truncated in ["$0\r\n", "$0\r\n\r"] {
        assert_eq!(
            parse_frame(Bytes::from(truncated)),
            Err(ParseError::Incomplete)
        );
    }

    // Two bytes present where the CRLF should be, but they are not CRLF.
    let outcome = parse_frame(Bytes::from("$0\r\nXY"));
    assert_eq!(outcome, Err(ParseError::InvalidFormat));
}
/// `;0` still needs its payload-terminating CRLF: missing is Incomplete, wrong is InvalidFormat.
#[test]
fn test_zero_length_streamed_chunk_requires_trailing_crlf() {
    let (parsed, leftover) = parse_frame(Bytes::from(";0\r\n\r\nTAIL")).unwrap();
    assert_eq!(parsed, Frame::StreamedStringChunk(Bytes::new()));
    assert_eq!(leftover, Bytes::from_static(b"TAIL"));

    let truncated = parse_frame(Bytes::from(";0\r\n"));
    assert_eq!(truncated, Err(ParseError::Incomplete));

    let bad_terminator = parse_frame(Bytes::from(";0\r\nXY"));
    assert_eq!(bad_terminator, Err(ParseError::InvalidFormat));
}
/// `i64::MAX + 1` is reported as `ParseError::Overflow`, while both `i64` extremes parse.
#[test]
fn test_integer_overflow_returns_overflow_error() {
    let too_big = parse_frame(Bytes::from(":9223372036854775808\r\n"));
    assert_eq!(too_big, Err(ParseError::Overflow));

    let extremes = [
        (":9223372036854775807\r\n", i64::MAX),
        (":-9223372036854775808\r\n", i64::MIN),
    ];
    for (wire, value) in extremes {
        let (parsed, _) = parse_frame(Bytes::from(wire)).unwrap();
        assert_eq!(parsed, Frame::Integer(value));
    }
}
/// `Parser::next_frame` surfaces a hard parse error (here an unknown tag byte) to the caller.
#[test]
fn test_parser_propagates_errors() {
    let mut parser = Parser::new();
    parser.feed(Bytes::from("XINVALID\r\n"));
    assert_eq!(parser.next_frame(), Err(ParseError::InvalidTag(b'X')));
}
/// A partial frame is not an error for `Parser`: `next_frame` yields `Ok(None)`.
#[test]
fn test_parser_returns_ok_none_for_incomplete() {
    let mut parser = Parser::new();
    parser.feed(Bytes::from("+HELL"));
    assert_eq!(parser.next_frame(), Ok(None));
}
/// `i64::MIN - 1` must not parse; the specific error variant is not pinned here.
#[test]
fn test_integer_negative_overflow() {
    let outcome = parse_frame(Bytes::from(":-9223372036854775809\r\n"));
    assert!(outcome.is_err());
}
/// After a bulk-string payload, fewer than two trailing bytes means Incomplete;
/// two bytes that are not CRLF mean InvalidFormat.
#[test]
fn test_nonempty_bulk_malformed_terminator() {
    let cases = [
        ("$3\r\nfoo", ParseError::Incomplete),
        ("$3\r\nfooX", ParseError::Incomplete),
        ("$3\r\nfooXY", ParseError::InvalidFormat),
    ];
    for (wire, expected) in cases {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
/// Blob-error payloads need a CRLF terminator: absent is Incomplete, wrong is InvalidFormat.
#[test]
fn test_blob_error_malformed_terminator() {
    let cases = [
        ("!3\r\nerr", ParseError::Incomplete),
        ("!3\r\nerrXY", ParseError::InvalidFormat),
    ];
    for (wire, expected) in cases {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
/// Verbatim-string payloads need a CRLF terminator: absent is Incomplete, wrong is InvalidFormat.
#[test]
fn test_verbatim_string_malformed_terminator() {
    let cases = [
        ("=8\r\ntxt:data", ParseError::Incomplete),
        ("=8\r\ntxt:dataXY", ParseError::InvalidFormat),
    ];
    for (wire, expected) in cases {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
/// Streamed-chunk payloads need a CRLF terminator: absent is Incomplete, wrong is InvalidFormat.
#[test]
fn test_streamed_chunk_malformed_terminator() {
    let cases = [
        (";3\r\nabc", ParseError::Incomplete),
        (";3\r\nabcXY", ParseError::InvalidFormat),
    ];
    for (wire, expected) in cases {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
/// A bulk string announcing 536870913 bytes (512 MiB + 1, one past the value of
/// `MAX_BULK_STRING_SIZE`) is rejected from the header alone.
#[test]
fn test_bulk_string_size_limit() {
    let oversized = parse_frame(Bytes::from("$536870913\r\n"));
    assert_eq!(oversized, Err(ParseError::BadLength));
}
/// A blob error announcing 536870913 bytes (512 MiB + 1) is rejected from the header alone.
#[test]
fn test_blob_error_size_limit() {
    let oversized = parse_frame(Bytes::from("!536870913\r\n"));
    assert_eq!(oversized, Err(ParseError::BadLength));
}
/// A verbatim string announcing 536870913 bytes (512 MiB + 1) is rejected from the header alone.
#[test]
fn test_verbatim_string_size_limit() {
    let oversized = parse_frame(Bytes::from("=536870913\r\n"));
    assert_eq!(oversized, Err(ParseError::BadLength));
}
/// A streamed chunk announcing 536870913 bytes (512 MiB + 1) is rejected from the header alone.
#[test]
fn test_streamed_chunk_size_limit() {
    let oversized = parse_frame(Bytes::from(";536870913\r\n"));
    assert_eq!(oversized, Err(ParseError::BadLength));
}
/// Non-numeric text after the `,` tag is an invalid double.
#[test]
fn test_invalid_double() {
    let outcome = parse_frame(Bytes::from(",foo\r\n"));
    assert_eq!(outcome, Err(ParseError::InvalidFormat));
}
/// An empty boolean body and the spelled-out `true` are both rejected as invalid booleans.
#[test]
fn test_invalid_boolean() {
    for wire in ["#\r\n", "#true\r\n"] {
        assert_eq!(
            parse_frame(Bytes::from(wire)),
            Err(ParseError::InvalidBoolean)
        );
    }
}
/// After a hard parse error the parser holds no buffered bytes.
#[test]
fn test_parser_clears_buffer_on_error() {
    let mut parser = Parser::new();
    parser.feed(Bytes::from("X\r\n"));

    let outcome = parser.next_frame();
    assert_eq!(outcome, Err(ParseError::InvalidTag(b'X')));
    assert_eq!(parser.buffered_bytes(), 0);
}
/// A parser that hit a hard error can be fed fresh data and parse it normally.
#[test]
fn test_parser_recovers_after_error() {
    let mut parser = Parser::new();

    // Poison the stream with an unknown tag; the buffer is empty afterwards.
    parser.feed(Bytes::from("X\r\n"));
    assert!(parser.next_frame().is_err());
    assert_eq!(parser.buffered_bytes(), 0);

    // The next valid frame is decoded as if nothing had happened.
    parser.feed(Bytes::from("+OK\r\n"));
    let recovered = parser.next_frame();
    assert_eq!(
        recovered,
        Ok(Some(Frame::SimpleString(Bytes::from("OK"))))
    );
}
/// A `~?` stream terminated by `.` is collected into `Frame::StreamedSet`, consuming all input.
#[test]
fn test_streaming_set_roundtrip() {
    let wire = Bytes::from("~?\r\n+a\r\n+b\r\n+c\r\n.\r\n");
    let (parsed, leftover) = parse_streaming_sequence(wire).unwrap();

    let members: Vec<Frame> = ["a", "b", "c"]
        .iter()
        .map(|text| Frame::SimpleString(Bytes::from(*text)))
        .collect();
    assert_eq!(parsed, Frame::StreamedSet(members));
    assert!(leftover.is_empty());
}
/// A `|?` stream terminated by `.` is collected into key/value pairs of
/// `Frame::StreamedAttribute`, consuming all input.
#[test]
fn test_streaming_attribute_roundtrip() {
    let wire = Bytes::from("|?\r\n+key\r\n+val\r\n.\r\n");
    let (parsed, leftover) = parse_streaming_sequence(wire).unwrap();

    let key = Frame::SimpleString(Bytes::from("key"));
    let value = Frame::SimpleString(Bytes::from("val"));
    assert_eq!(parsed, Frame::StreamedAttribute(vec![(key, value)]));
    assert!(leftover.is_empty());
}
/// A `>?` stream terminated by `.` is collected into `Frame::StreamedPush`, consuming all input.
#[test]
fn test_streaming_push_roundtrip() {
    let wire = Bytes::from(">?\r\n+pubsub\r\n+channel\r\n+message\r\n.\r\n");
    let (parsed, leftover) = parse_streaming_sequence(wire).unwrap();

    let elements: Vec<Frame> = ["pubsub", "channel", "message"]
        .iter()
        .map(|text| Frame::SimpleString(Bytes::from(*text)))
        .collect();
    assert_eq!(parsed, Frame::StreamedPush(elements));
    assert!(leftover.is_empty());
}
/// A streaming header followed directly by its end marker yields the empty aggregate frame.
#[test]
fn test_empty_streaming_containers() {
    // (wire text, expected empty aggregate)
    let cases = [
        ("$?\r\n;0\r\n\r\n", Frame::StreamedString(vec![])),
        ("*?\r\n.\r\n", Frame::StreamedArray(vec![])),
        ("~?\r\n.\r\n", Frame::StreamedSet(vec![])),
        ("%?\r\n.\r\n", Frame::StreamedMap(vec![])),
    ];
    for (wire, expected) in cases {
        let (parsed, _) = parse_streaming_sequence(Bytes::from(wire)).unwrap();
        assert_eq!(parsed, expected);
    }
}
/// A streamed attribute with a dangling key (odd element count) is an invalid format.
#[test]
fn test_streaming_attribute_odd_elements_errors() {
    let wire = Bytes::from("|?\r\n+key\r\n+val\r\n+orphan\r\n.\r\n");
    let outcome = parse_streaming_sequence(wire);
    assert!(matches!(outcome, Err(ParseError::InvalidFormat)));
}
/// `!?` is returned as a bare header frame; the bytes that follow are left unconsumed.
#[test]
fn test_streaming_blob_error_header_passthrough() {
    let wire = Bytes::from("!?\r\n!5\r\nERROR\r\n");
    let (parsed, leftover) = parse_streaming_sequence(wire).unwrap();
    assert_eq!(parsed, Frame::StreamedBlobErrorHeader);
    assert!(!leftover.is_empty());
}
/// `=?` is returned as a bare header frame; the bytes that follow are left unconsumed.
#[test]
fn test_streaming_verbatim_header_passthrough() {
    let wire = Bytes::from("=?\r\n=9\r\ntxt:hello\r\n");
    let (parsed, leftover) = parse_streaming_sequence(wire).unwrap();
    assert_eq!(parsed, Frame::StreamedVerbatimStringHeader);
    assert!(!leftover.is_empty());
}
/// `Frame::as_bytes` exposes the payload of byte-carrying variants and is `None` otherwise.
#[test]
fn frame_as_bytes() {
    let ok = Bytes::from("OK");
    assert_eq!(Frame::SimpleString(ok.clone()).as_bytes(), Some(&ok));

    let err = Bytes::from("ERR");
    assert_eq!(Frame::BlobError(err.clone()).as_bytes(), Some(&err));

    let digits = Bytes::from("123");
    assert_eq!(Frame::BigNumber(digits.clone()).as_bytes(), Some(&digits));

    // Null bulk string and non-byte variants carry no payload.
    for frame in [Frame::BulkString(None), Frame::Null, Frame::Boolean(true)] {
        assert_eq!(frame.as_bytes(), None);
    }
}
/// `Frame::as_str` yields the payload as text and is `None` for non-UTF-8 bytes.
#[test]
fn frame_as_str() {
    let text = Frame::SimpleString(Bytes::from("OK"));
    assert_eq!(text.as_str(), Some("OK"));

    // 0xFF alone is not valid UTF-8.
    let binary = Frame::BulkString(Some(Bytes::from_static(&[0xFF])));
    assert_eq!(binary.as_str(), None);
}
/// Scalar accessors return the value for the matching variant only.
#[test]
fn frame_scalar_accessors() {
    let int = Frame::Integer(42);
    let dbl = Frame::Double(3.5);
    let flag = Frame::Boolean(true);

    assert_eq!(int.as_integer(), Some(42));
    assert_eq!(dbl.as_double(), Some(3.5));
    assert_eq!(flag.as_boolean(), Some(true));

    // No cross-type coercion between integers and doubles.
    assert_eq!(int.as_double(), None);
    assert_eq!(dbl.as_integer(), None);
}
/// Collection accessors borrow the elements of the matching variant; a null array gives `None`.
#[test]
fn frame_collection_accessors() {
    let one = [Frame::Integer(1)];

    let array = Frame::Array(Some(vec![Frame::Integer(1)]));
    assert_eq!(array.as_array(), Some(&one[..]));
    assert_eq!(Frame::Array(None).as_array(), None);

    let set = Frame::Set(vec![Frame::Integer(1)]);
    assert_eq!(set.as_set(), Some(&one[..]));

    let key = Frame::SimpleString(Bytes::from("k"));
    let map = Frame::Map(vec![(key, Frame::Integer(1))]);
    assert!(map.as_map().is_some());

    let push = Frame::Push(vec![Frame::Integer(1)]);
    assert_eq!(push.as_push(), Some(&one[..]));
}
/// `as_verbatim_string` returns `(format, content)` for verbatim strings and `None` otherwise.
#[test]
fn frame_verbatim_string_accessor() {
    let format = Bytes::from("txt");
    let body = Bytes::from("hello");
    let frame = Frame::VerbatimString(format.clone(), body.clone());

    let (got_format, got_body) = frame.as_verbatim_string().unwrap();
    assert_eq!(got_format, &format);
    assert_eq!(got_body, &body);

    assert_eq!(Frame::Null.as_verbatim_string(), None);
}
/// Consuming `into_*` conversions succeed for populated variants and fail for null ones.
#[test]
fn frame_into_conversions() {
    let array = Frame::Array(Some(vec![Frame::Integer(1)]));
    assert_eq!(array.into_array(), Ok(vec![Frame::Integer(1)]));
    assert!(Frame::Array(None).into_array().is_err());

    let bulk = Frame::BulkString(Some(Bytes::from("x")));
    assert_eq!(bulk.into_bulk_string(), Ok(Bytes::from("x")));
    assert!(Frame::BulkString(None).into_bulk_string().is_err());

    let map = Frame::Map(vec![(Frame::Integer(1), Frame::Integer(2))]);
    assert!(map.into_map().is_ok());

    let set = Frame::Set(vec![Frame::Integer(1)]);
    assert!(set.into_set().is_ok());
}
/// `is_null` is true for `Null`, the null bulk string and the null array — and
/// false for empty or zero values.
#[test]
fn frame_is_null() {
    for null_like in [Frame::Null, Frame::BulkString(None), Frame::Array(None)] {
        assert!(null_like.is_null());
    }
    for present in [Frame::BulkString(Some(Bytes::new())), Frame::Integer(0)] {
        assert!(!present.is_null());
    }
}
/// `is_error` is true for simple and blob errors, and false for a simple string.
#[test]
fn frame_is_error() {
    let message = Bytes::from("ERR");
    assert!(Frame::Error(message.clone()).is_error());
    assert!(Frame::BlobError(message).is_error());

    let ok = Frame::SimpleString(Bytes::from("OK"));
    assert!(!ok.is_error());
}
}