Skip to main content

resp_rs/
resp3.rs

1//! Zero-copy RESP3 parser.
2//!
3//! Parses RESP3 frames using `bytes::Bytes` for efficient, zero-copy operation.
4//! Supports all RESP3 data types, including fixed-length and streaming variants.
5//!
6//! # Performance
7//!
8//! RESP3 parsing is roughly 3x slower than RESP2 for simple types due to the
9//! larger type tag match. The gap narrows for collection-heavy workloads. For
10//! complete buffers, call [`parse_frame`] directly rather than using [`Parser`]
11//! (see [crate-level performance docs](crate#performance)).
12//!
13//! # Protocol permissiveness
14//!
15//! - **Simple strings and errors** are treated as raw bytes, not validated UTF-8.
16//!   The parser accepts any byte sequence that does not contain `\r` or `\n`.
17//! - **Double parsing** accepts case-insensitive and non-canonical float spellings
18//!   (e.g., `INF`, `Infinity`, `NAN`) via Rust's `f64::parse`, then normalizes
19//!   them to canonical [`Frame::SpecialFloat`] values (`inf`, `-inf`, `nan`).
20//!   This means roundtrip is semantic (value-preserving) but not lexical
21//!   (byte-preserving) for non-canonical inputs.
22//! - **Streaming support** for blob errors and verbatim strings is limited:
23//!   `parse_streaming_sequence` passes through their streaming headers
24//!   (`!?\r\n`, `=?\r\n`) without accumulation, since RESP3 does not define
25//!   a chunk format for these types. Use low-level `parse_frame` to handle
26//!   these headers manually if needed.
27
28use alloc::string::ToString;
29use alloc::vec::Vec;
30
31use bytes::{BufMut, Bytes, BytesMut};
32
33/// Maximum reasonable size for collections to prevent DoS attacks.
34const MAX_COLLECTION_SIZE: usize = 10_000_000;
35
36/// Maximum reasonable size for bulk string/blob/chunk payloads (512 MB).
37const MAX_BULK_STRING_SIZE: usize = 512 * 1024 * 1024;
38
39/// A streaming parser for RESP3 frames.
40///
41/// This parser allows feeding data in chunks and extracting frames as they become available.
42/// It maintains an internal buffer of accumulated data and attempts to parse frames from it.
43#[derive(Default, Debug)]
44pub struct Parser {
45    buffer: BytesMut,
46}
47
48impl Parser {
49    /// Creates a new parser with an empty buffer.
50    pub fn new() -> Self {
51        Self {
52            buffer: BytesMut::new(),
53        }
54    }
55
56    /// Feeds a chunk of data into the parser.
57    ///
58    /// The data is appended to the internal buffer.
59    pub fn feed(&mut self, data: Bytes) {
60        self.buffer.extend_from_slice(&data);
61    }
62
63    /// Attempts to extract the next complete frame from the buffer.
64    ///
65    /// Returns `Ok(None)` if there is not enough data to parse a complete frame.
66    /// Returns `Ok(Some(frame))` on success, consuming the parsed bytes.
67    /// Returns `Err` on protocol errors, clearing the buffer.
68    pub fn next_frame(&mut self) -> Result<Option<Frame>, ParseError> {
69        if self.buffer.is_empty() {
70            return Ok(None);
71        }
72
73        let bytes = self.buffer.split().freeze();
74
75        match parse_frame_inner(&bytes, 0) {
76            Ok((frame, consumed)) => {
77                if consumed < bytes.len() {
78                    self.buffer.unsplit(BytesMut::from(&bytes[consumed..]));
79                }
80                Ok(Some(frame))
81            }
82            Err(ParseError::Incomplete) => {
83                self.buffer.unsplit(bytes.into());
84                Ok(None)
85            }
86            Err(e) => {
87                // Buffer was emptied by split() above; intentionally not restored
88                // on hard errors so the parser doesn't re-parse corrupt data.
89                Err(e)
90            }
91        }
92    }
93
94    /// Returns the number of bytes currently in the buffer.
95    pub fn buffered_bytes(&self) -> usize {
96        self.buffer.len()
97    }
98
99    /// Clears the internal buffer.
100    pub fn clear(&mut self) {
101        self.buffer.clear();
102    }
103}
104
105// --- Zero-copy Frame enum and parser using bytes::Bytes ---
106/// A parsed RESP3 frame.
107///
108/// Each variant corresponds to one of the RESP3 types, including both fixed-length
109/// and streaming headers for bulk strings, arrays, sets, maps, attributes, and pushes.
110#[derive(Debug, Clone, PartialEq)]
111pub enum Frame {
112    /// Simple string: +&lt;string&gt;\r\n
113    SimpleString(Bytes),
114    /// Simple error: -&lt;error&gt;\r\n
115    Error(Bytes),
116    /// Integer: :&lt;number&gt;\r\n
117    Integer(i64),
118    /// Blob string: $&lt;length&gt;\r\n&lt;bytes&gt;\r\n
119    // BulkString(Option<Vec<u8>>),
120    BulkString(Option<Bytes>),
121    /// Blob error: !&lt;length&gt;\r\n&lt;bytes&gt;\r\n
122    BlobError(Bytes),
123    /// Streaming blob string header: $?\r\n
124    StreamedStringHeader,
125    /// Streaming blob error header: !?\r\n
126    StreamedBlobErrorHeader,
127    /// Streaming verbatim string header: =?\r\n
128    StreamedVerbatimStringHeader,
129    /// Streaming array header: *?\r\n
130    StreamedArrayHeader,
131    /// Streaming set header: ~?\r\n
132    StreamedSetHeader,
133    /// Streaming map header: %?\r\n
134    StreamedMapHeader,
135    /// Streaming attribute header: |?\r\n
136    StreamedAttributeHeader,
137    /// Streaming push header: >?\r\n
138    StreamedPushHeader,
139    /// Streaming string chunk: ;length\r\ndata\r\n
140    ///
141    /// Represents an individual chunk in a streaming string sequence.
142    /// These chunks are parsed from `;{length}\r\n{data}\r\n` format.
143    /// A zero-length chunk (`;0\r\n`) indicates the end of the stream.
144    ///
145    /// # Example
146    /// ```
147    /// use resp_rs::resp3::Frame;
148    /// use bytes::Bytes;
149    ///
150    /// // Chunk containing "Hello"
151    /// // Wire format: ;5\r\nHello\r\n
152    /// Frame::StreamedStringChunk(Bytes::from("Hello"));
153    /// ```
154    StreamedStringChunk(Bytes),
155
156    /// Accumulated streaming string data from multiple chunks
157    ///
158    /// Created by `parse_streaming_sequence()` when parsing a complete
159    /// streaming string sequence (`$?\r\n` + chunks + `;0\r\n`).
160    /// Contains all chunks in order, allowing reconstruction of the full string.
161    ///
162    /// # Example
163    /// ```
164    /// use resp_rs::resp3::Frame;
165    /// use bytes::Bytes;
166    ///
167    /// // Represents "Hello world" from chunks ["Hello ", "world"]
168    /// Frame::StreamedString(vec![
169    ///     Bytes::from("Hello "),
170    ///     Bytes::from("world")
171    /// ]);
172    /// ```
173    StreamedString(Vec<Bytes>),
174
175    /// Accumulated streaming array data from multiple chunks
176    ///
177    /// Created when parsing a streaming array sequence (`*?\r\n` + frames + `.\r\n`).
178    /// Contains all frames that were streamed as part of the array.
179    ///
180    /// # Example
181    /// ```
182    /// use resp_rs::resp3::Frame;
183    /// use bytes::Bytes;
184    ///
185    /// // Array with mixed types
186    /// Frame::StreamedArray(vec![
187    ///     Frame::SimpleString(Bytes::from("hello")),
188    ///     Frame::Integer(42),
189    ///     Frame::Boolean(true)
190    /// ]);
191    /// ```
192    StreamedArray(Vec<Frame>),
193
194    /// Accumulated streaming set data from multiple chunks
195    ///
196    /// Created when parsing a streaming set sequence (`~?\r\n` + frames + `.\r\n`).
197    /// Contains all unique elements that were streamed as part of the set.
198    StreamedSet(Vec<Frame>),
199
200    /// Accumulated streaming map data from multiple chunks
201    ///
202    /// Created when parsing a streaming map sequence (`%?\r\n` + key-value pairs + `.\r\n`).
203    /// Contains all key-value pairs that were streamed as part of the map.
204    ///
205    /// # Example
206    /// ```
207    /// use resp_rs::resp3::Frame;
208    /// use bytes::Bytes;
209    ///
210    /// Frame::StreamedMap(vec![
211    ///     (Frame::SimpleString(Bytes::from("name")), Frame::SimpleString(Bytes::from("Alice"))),
212    ///     (Frame::SimpleString(Bytes::from("age")), Frame::Integer(25))
213    /// ]);
214    /// ```
215    StreamedMap(Vec<(Frame, Frame)>),
216
217    /// Accumulated streaming attribute data from multiple chunks
218    ///
219    /// Created when parsing a streaming attribute sequence (`|?\r\n` + key-value pairs + `.\r\n`).
220    /// Attributes provide out-of-band metadata that doesn't affect the main data structure.
221    StreamedAttribute(Vec<(Frame, Frame)>),
222
223    /// Accumulated streaming push data from multiple chunks
224    ///
225    /// Created when parsing a streaming push sequence (`>?\r\n` + frames + `.\r\n`).
226    /// Push messages are server-initiated communications (e.g., pub/sub messages).
227    ///
228    /// # Example
229    /// ```
230    /// use resp_rs::resp3::Frame;
231    /// use bytes::Bytes;
232    ///
233    /// // Pub/sub message
234    /// Frame::StreamedPush(vec![
235    ///     Frame::SimpleString(Bytes::from("pubsub")),
236    ///     Frame::SimpleString(Bytes::from("channel1")),
237    ///     Frame::SimpleString(Bytes::from("message content"))
238    /// ]);
239    /// ```
240    StreamedPush(Vec<Frame>),
241    /// End-of-stream terminator for all chunked sequences: .\r\n
242    StreamTerminator,
243    /// Null: _\r\n
244    Null,
245    /// Double: ,&lt;float&gt;\r\n
246    Double(f64),
247    /// Special Float: ,inf\r\n, -inf\r\n, nan\r\n
248    SpecialFloat(Bytes),
249    /// Boolean: #t\r\n or #f\r\n
250    Boolean(bool),
251    /// Big number: (&lt;number&gt;\r\n
252    BigNumber(Bytes),
253    /// Verbatim string: =format:content\r\n
254    // VerbatimString { format: String, content: String },
255    VerbatimString(Bytes, Bytes),
256    /// Array: *&lt;count&gt;\r\n... (or streaming header *?\r\n)
257    Array(Option<Vec<Frame>>),
258    /// Set: ~&lt;count&gt;\r\n... (or streaming header ~?\r\n)
259    Set(Vec<Frame>),
260    /// Map: %&lt;count&gt;\r\n... (or streaming header %?\r\n)
261    Map(Vec<(Frame, Frame)>),
262    /// Attribute: |&lt;count&gt;\r\n... (or streaming header |?\r\n)
263    Attribute(Vec<(Frame, Frame)>),
264    /// Push: > &lt;count&gt;\r\n... (or streaming header >?\r\n)
265    Push(Vec<Frame>),
266}
267
268impl Frame {
269    /// Returns the bytes if this is a string-like frame (`SimpleString`, `Error`,
270    /// `BulkString`, `BlobError`, or `BigNumber`).
271    ///
272    /// For `BulkString(None)` (null), returns `None`.
273    pub fn as_bytes(&self) -> Option<&Bytes> {
274        match self {
275            Frame::SimpleString(b)
276            | Frame::Error(b)
277            | Frame::BlobError(b)
278            | Frame::BigNumber(b) => Some(b),
279            Frame::BulkString(opt) => opt.as_ref(),
280            _ => None,
281        }
282    }
283
284    /// Returns the string data as a UTF-8 `&str`, if this is a string-like frame
285    /// and contains valid UTF-8.
286    pub fn as_str(&self) -> Option<&str> {
287        self.as_bytes().and_then(|b| core::str::from_utf8(b).ok())
288    }
289
290    /// Returns the integer value if this is an `Integer` frame.
291    pub fn as_integer(&self) -> Option<i64> {
292        match self {
293            Frame::Integer(v) => Some(*v),
294            _ => None,
295        }
296    }
297
298    /// Returns the double value if this is a `Double` frame.
299    pub fn as_double(&self) -> Option<f64> {
300        match self {
301            Frame::Double(v) => Some(*v),
302            _ => None,
303        }
304    }
305
306    /// Returns the boolean value if this is a `Boolean` frame.
307    pub fn as_boolean(&self) -> Option<bool> {
308        match self {
309            Frame::Boolean(v) => Some(*v),
310            _ => None,
311        }
312    }
313
314    /// Returns a reference to the array items if this is an `Array`.
315    ///
316    /// For `Array(None)` (null), returns `None`.
317    pub fn as_array(&self) -> Option<&[Frame]> {
318        match self {
319            Frame::Array(Some(items)) => Some(items),
320            _ => None,
321        }
322    }
323
324    /// Returns a reference to the set items if this is a `Set`.
325    pub fn as_set(&self) -> Option<&[Frame]> {
326        match self {
327            Frame::Set(items) => Some(items),
328            _ => None,
329        }
330    }
331
332    /// Returns a reference to the map pairs if this is a `Map`.
333    pub fn as_map(&self) -> Option<&[(Frame, Frame)]> {
334        match self {
335            Frame::Map(pairs) => Some(pairs),
336            _ => None,
337        }
338    }
339
340    /// Returns a reference to the push items if this is a `Push`.
341    pub fn as_push(&self) -> Option<&[Frame]> {
342        match self {
343            Frame::Push(items) => Some(items),
344            _ => None,
345        }
346    }
347
348    /// Returns the verbatim string format and content if this is a `VerbatimString`.
349    pub fn as_verbatim_string(&self) -> Option<(&Bytes, &Bytes)> {
350        match self {
351            Frame::VerbatimString(format, content) => Some((format, content)),
352            _ => None,
353        }
354    }
355
356    /// Consumes the frame and returns the array items.
357    ///
358    /// Returns `Err(self)` if this is not a non-null `Array`.
359    pub fn into_array(self) -> Result<Vec<Frame>, Frame> {
360        match self {
361            Frame::Array(Some(items)) => Ok(items),
362            other => Err(other),
363        }
364    }
365
366    /// Consumes the frame and returns the bulk string bytes.
367    ///
368    /// Returns `Err(self)` if this is not a non-null `BulkString`.
369    pub fn into_bulk_string(self) -> Result<Bytes, Frame> {
370        match self {
371            Frame::BulkString(Some(b)) => Ok(b),
372            other => Err(other),
373        }
374    }
375
376    /// Consumes the frame and returns the map pairs.
377    ///
378    /// Returns `Err(self)` if this is not a `Map`.
379    pub fn into_map(self) -> Result<Vec<(Frame, Frame)>, Frame> {
380        match self {
381            Frame::Map(pairs) => Ok(pairs),
382            other => Err(other),
383        }
384    }
385
386    /// Consumes the frame and returns the set items.
387    ///
388    /// Returns `Err(self)` if this is not a `Set`.
389    pub fn into_set(self) -> Result<Vec<Frame>, Frame> {
390        match self {
391            Frame::Set(items) => Ok(items),
392            other => Err(other),
393        }
394    }
395
396    /// Returns `true` if this is `Null` or a null bulk string/array.
397    pub fn is_null(&self) -> bool {
398        matches!(
399            self,
400            Frame::Null | Frame::BulkString(None) | Frame::Array(None)
401        )
402    }
403
404    /// Returns `true` if this is an `Error` or `BlobError` frame.
405    pub fn is_error(&self) -> bool {
406        matches!(self, Frame::Error(_) | Frame::BlobError(_))
407    }
408}
409
410pub use crate::ParseError;
411
412/// Parse a single RESP3 frame from the provided `Bytes`.
413///
414/// Returns the parsed `Frame` and the remaining unconsumed `Bytes`, or a `ParseError` on failure.
415pub fn parse_frame(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
416    let (frame, consumed) = parse_frame_inner(&input, 0)?;
417    Ok((frame, input.slice(consumed..)))
418}
419
420/// Parse a length-prefixed blob: shared logic for bulk string, blob error, and
421/// streamed string chunks. Returns `(data_start, data_end, after_crlf)` on success.
422#[inline(never)]
423fn parse_blob_bounds(
424    buf: &[u8],
425    after_crlf: usize,
426    len: usize,
427) -> Result<(usize, usize), ParseError> {
428    if len == 0 {
429        if after_crlf + 1 >= buf.len() {
430            return Err(ParseError::Incomplete);
431        }
432        if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
433            return Ok((after_crlf, after_crlf));
434        } else {
435            return Err(ParseError::InvalidFormat);
436        }
437    }
438    let data_start = after_crlf;
439    let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
440    if data_end + 1 >= buf.len() {
441        return Err(ParseError::Incomplete);
442    }
443    if buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
444        return Err(ParseError::InvalidFormat);
445    }
446    Ok((data_start, data_end))
447}
448
449/// Parse a bulk string frame (`$`).
450fn parse_bulk_string(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
451    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
452    let len_bytes = &buf[pos + 1..line_end];
453    if len_bytes == b"?" {
454        return Ok((Frame::StreamedStringHeader, after_crlf));
455    }
456    if len_bytes == b"-1" {
457        return Ok((Frame::BulkString(None), after_crlf));
458    }
459    let len = parse_usize(len_bytes)?;
460    if len > MAX_BULK_STRING_SIZE {
461        return Err(ParseError::BadLength);
462    }
463    let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
464    if data_start == data_end {
465        Ok((Frame::BulkString(Some(Bytes::new())), after_crlf + 2))
466    } else {
467        Ok((
468            Frame::BulkString(Some(input.slice(data_start..data_end))),
469            data_end + 2,
470        ))
471    }
472}
473
474/// Parse a double frame (`,`).
475#[inline(never)]
476fn parse_double_frame(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
477    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
478    let line_bytes = &buf[pos + 1..line_end];
479    if line_bytes == b"inf" || line_bytes == b"-inf" || line_bytes == b"nan" {
480        return Ok((
481            Frame::SpecialFloat(input.slice(pos + 1..line_end)),
482            after_crlf,
483        ));
484    }
485    let s = core::str::from_utf8(line_bytes).map_err(|_| ParseError::Utf8Error)?;
486    let v = s.parse::<f64>().map_err(|_| ParseError::InvalidFormat)?;
487    if v.is_infinite() || v.is_nan() {
488        let canonical = if v.is_nan() {
489            "nan"
490        } else if v.is_sign_negative() {
491            "-inf"
492        } else {
493            "inf"
494        };
495        return Ok((Frame::SpecialFloat(Bytes::from(canonical)), after_crlf));
496    }
497    Ok((Frame::Double(v), after_crlf))
498}
499
500/// Parse a verbatim string frame (`=`).
501#[inline(never)]
502fn parse_verbatim(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
503    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
504    let len_bytes = &buf[pos + 1..line_end];
505    if len_bytes == b"?" {
506        return Ok((Frame::StreamedVerbatimStringHeader, after_crlf));
507    }
508    if len_bytes == b"-1" {
509        return Err(ParseError::BadLength);
510    }
511    let len = parse_usize(len_bytes)?;
512    if len > MAX_BULK_STRING_SIZE {
513        return Err(ParseError::BadLength);
514    }
515    let data_start = after_crlf;
516    let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
517    if data_end + 1 >= buf.len() {
518        return Err(ParseError::Incomplete);
519    }
520    if buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
521        return Err(ParseError::InvalidFormat);
522    }
523    let sep = buf[data_start..data_end]
524        .iter()
525        .position(|&b| b == b':')
526        .ok_or(ParseError::InvalidFormat)?;
527    if sep != 3 {
528        return Err(ParseError::InvalidFormat);
529    }
530    let format = input.slice(data_start..data_start + sep);
531    let content = input.slice(data_start + sep + 1..data_end);
532    Ok((Frame::VerbatimString(format, content), data_end + 2))
533}
534
535/// Parse a blob error frame (`!`).
536#[inline(never)]
537fn parse_blob_error(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
538    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
539    let len_bytes = &buf[pos + 1..line_end];
540    if len_bytes == b"?" {
541        return Ok((Frame::StreamedBlobErrorHeader, after_crlf));
542    }
543    if len_bytes == b"-1" {
544        return Err(ParseError::BadLength);
545    }
546    let len = parse_usize(len_bytes)?;
547    if len > MAX_BULK_STRING_SIZE {
548        return Err(ParseError::BadLength);
549    }
550    let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
551    if data_start == data_end {
552        Ok((Frame::BlobError(Bytes::new()), after_crlf + 2))
553    } else {
554        Ok((
555            Frame::BlobError(input.slice(data_start..data_end)),
556            data_end + 2,
557        ))
558    }
559}
560
561/// Parse a collection (array, set, push) with element count.
562#[inline(never)]
563fn parse_collection(
564    input: &Bytes,
565    buf: &[u8],
566    pos: usize,
567    tag: u8,
568) -> Result<(Frame, usize), ParseError> {
569    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
570    let len_bytes = &buf[pos + 1..line_end];
571
572    // Streaming headers
573    if len_bytes == b"?" {
574        return match tag {
575            b'*' => Ok((Frame::StreamedArrayHeader, after_crlf)),
576            b'~' => Ok((Frame::StreamedSetHeader, after_crlf)),
577            b'>' => Ok((Frame::StreamedPushHeader, after_crlf)),
578            _ => unreachable!(),
579        };
580    }
581    // Null array
582    if tag == b'*' && len_bytes == b"-1" {
583        return Ok((Frame::Array(None), after_crlf));
584    }
585    let count = parse_count(len_bytes)?;
586    if count == 0 {
587        return match tag {
588            b'*' => Ok((Frame::Array(Some(Vec::new())), after_crlf)),
589            b'~' => Ok((Frame::Set(Vec::new()), after_crlf)),
590            b'>' => Ok((Frame::Push(Vec::new()), after_crlf)),
591            _ => unreachable!(),
592        };
593    }
594    let mut cursor = after_crlf;
595    let mut items = Vec::with_capacity(count);
596    for _ in 0..count {
597        let (item, next) = parse_frame_inner(input, cursor)?;
598        items.push(item);
599        cursor = next;
600    }
601    match tag {
602        b'*' => Ok((Frame::Array(Some(items)), cursor)),
603        b'~' => Ok((Frame::Set(items), cursor)),
604        b'>' => Ok((Frame::Push(items), cursor)),
605        _ => unreachable!(),
606    }
607}
608
609/// Parse a map or attribute (`%` / `|`).
610#[inline(never)]
611fn parse_pairs(
612    input: &Bytes,
613    buf: &[u8],
614    pos: usize,
615    tag: u8,
616) -> Result<(Frame, usize), ParseError> {
617    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
618    let len_bytes = &buf[pos + 1..line_end];
619    if len_bytes == b"?" {
620        return if tag == b'%' {
621            Ok((Frame::StreamedMapHeader, after_crlf))
622        } else {
623            Ok((Frame::StreamedAttributeHeader, after_crlf))
624        };
625    }
626    let count = parse_count(len_bytes)?;
627    let mut cursor = after_crlf;
628    let mut pairs = Vec::with_capacity(count);
629    for _ in 0..count {
630        let (key, next1) = parse_frame_inner(input, cursor)?;
631        let (val, next2) = parse_frame_inner(input, next1)?;
632        pairs.push((key, val));
633        cursor = next2;
634    }
635    if tag == b'%' {
636        Ok((Frame::Map(pairs), cursor))
637    } else {
638        Ok((Frame::Attribute(pairs), cursor))
639    }
640}
641
642/// Parse a streamed string chunk (`;`).
643#[inline(never)]
644fn parse_streamed_chunk(
645    input: &Bytes,
646    buf: &[u8],
647    pos: usize,
648) -> Result<(Frame, usize), ParseError> {
649    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
650    let len = parse_usize(&buf[pos + 1..line_end])?;
651    if len > MAX_BULK_STRING_SIZE {
652        return Err(ParseError::BadLength);
653    }
654    let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
655    if data_start == data_end {
656        Ok((Frame::StreamedStringChunk(Bytes::new()), after_crlf + 2))
657    } else {
658        Ok((
659            Frame::StreamedStringChunk(input.slice(data_start..data_end)),
660            data_end + 2,
661        ))
662    }
663}
664
665/// Offset-based internal parser. The match body is kept minimal to reduce
666/// instruction-cache pressure; heavy arms are extracted into `#[inline(never)]`
667/// helpers so the hot dispatch stays small.
668pub(crate) fn parse_frame_inner(input: &Bytes, pos: usize) -> Result<(Frame, usize), ParseError> {
669    let buf = input.as_ref();
670    if pos >= buf.len() {
671        return Err(ParseError::Incomplete);
672    }
673
674    let tag = buf[pos];
675
676    match tag {
677        // Line-based types (small, stay inline)
678        b'+' => {
679            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
680            Ok((
681                Frame::SimpleString(input.slice(pos + 1..line_end)),
682                after_crlf,
683            ))
684        }
685        b'-' => {
686            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
687            Ok((Frame::Error(input.slice(pos + 1..line_end)), after_crlf))
688        }
689        b':' => {
690            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
691            let v = parse_i64(&buf[pos + 1..line_end])?;
692            Ok((Frame::Integer(v), after_crlf))
693        }
694        b'#' => {
695            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
696            match &buf[pos + 1..line_end] {
697                b"t" => Ok((Frame::Boolean(true), after_crlf)),
698                b"f" => Ok((Frame::Boolean(false), after_crlf)),
699                _ => Err(ParseError::InvalidBoolean),
700            }
701        }
702        b'(' => {
703            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
704            Ok((Frame::BigNumber(input.slice(pos + 1..line_end)), after_crlf))
705        }
706        b'_' => {
707            if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
708                Ok((Frame::Null, pos + 3))
709            } else {
710                Err(ParseError::Incomplete)
711            }
712        }
713        b'.' => {
714            if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
715                Ok((Frame::StreamTerminator, pos + 3))
716            } else {
717                Err(ParseError::Incomplete)
718            }
719        }
720
721        // Length-prefixed types (extracted to reduce icache pressure)
722        b'$' => parse_bulk_string(input, buf, pos),
723        b',' => parse_double_frame(input, buf, pos),
724        b'=' => parse_verbatim(input, buf, pos),
725        b'!' => parse_blob_error(input, buf, pos),
726        b';' => parse_streamed_chunk(input, buf, pos),
727
728        // Collections (extracted)
729        b'*' | b'~' | b'>' => parse_collection(input, buf, pos, tag),
730        b'%' | b'|' => parse_pairs(input, buf, pos, tag),
731
732        _ => Err(ParseError::InvalidTag(tag)),
733    }
734}
735
736#[cfg(feature = "unsafe-internals")]
737#[path = "resp3_unchecked.rs"]
738mod unchecked;
739#[cfg(feature = "unsafe-internals")]
740pub use unchecked::parse_frame_unchecked;
741
742#[cfg(feature = "codec")]
743#[path = "resp3_codec.rs"]
744mod codec_impl;
745#[cfg(feature = "codec")]
746pub use codec_impl::Codec;
747
748/// Parse a complete RESP3 streaming sequence, accumulating chunks until termination.
749///
750/// This function handles RESP3 streaming sequences that begin with streaming headers
751/// (`$?`, `*?`, `~?`, `%?`, `|?`, `>?`) and accumulates the following data until
752/// the appropriate terminator is encountered.
753///
754/// # Streaming Types Supported
755///
756/// - **Streaming Strings**: `$?\r\n` followed by chunks terminated with `;0\r\n`
757/// - **Streaming Arrays**: `*?\r\n` followed by frames terminated with `.\r\n`
758/// - **Streaming Sets**: `~?\r\n` followed by frames terminated with `.\r\n`
759/// - **Streaming Maps**: `%?\r\n` followed by key-value pairs terminated with `.\r\n`
760/// - **Streaming Attributes**: `|?\r\n` followed by key-value pairs terminated with `.\r\n`
761/// - **Streaming Push**: `>?\r\n` followed by frames terminated with `.\r\n`
762///
763/// # Examples
764///
765/// ## Streaming String
766/// ```rust
767/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
768/// use bytes::Bytes;
769///
770/// let data = Bytes::from("$?\r\n;4\r\nHell\r\n;6\r\no worl\r\n;1\r\nd\r\n;0\r\n\r\n");
771/// let (frame, rest) = parse_streaming_sequence(data).unwrap();
772///
773/// if let Frame::StreamedString(chunks) = frame {
774///     assert_eq!(chunks.len(), 3);
775///     let full_string: String = chunks.iter()
776///         .map(|chunk| core::str::from_utf8(chunk).unwrap())
777///         .collect::<Vec<_>>()
778///         .join("");
779///     assert_eq!(full_string, "Hello world");
780/// }
781/// assert!(rest.is_empty());
782/// ```
783///
784/// ## Streaming Array
785/// ```rust
786/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
787/// use bytes::Bytes;
788///
789/// let data = Bytes::from("*?\r\n+hello\r\n:42\r\n#t\r\n.\r\n");
790/// let (frame, _) = parse_streaming_sequence(data).unwrap();
791///
792/// if let Frame::StreamedArray(items) = frame {
793///     assert_eq!(items.len(), 3);
794///     // items[0] = SimpleString("hello")
795///     // items[1] = Integer(42)
796///     // items[2] = Boolean(true)
797/// }
798/// ```
799///
800/// ## Streaming Map
801/// ```rust
802/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
803/// use bytes::Bytes;
804///
805/// let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+key2\r\n:123\r\n.\r\n");
806/// let (frame, _) = parse_streaming_sequence(data).unwrap();
807///
808/// if let Frame::StreamedMap(pairs) = frame {
809///     assert_eq!(pairs.len(), 2);
810///     // pairs[0] = (SimpleString("key1"), SimpleString("val1"))
811///     // pairs[1] = (SimpleString("key2"), Integer(123))
812/// }
813/// ```
814///
815/// # Errors
816///
817/// Returns `ParseError::Incomplete` if the stream is not complete or if required
818/// terminators are missing. Returns `ParseError::InvalidFormat` for malformed
819/// chunk data or unexpected frame types within streaming sequences.
820///
821/// # Notes
822///
823/// - For non-streaming frames, this function simply returns the parsed frame
824/// - Streaming string chunks are accumulated in order
825/// - All other streaming types accumulate complete frames until termination
826/// - Zero-copy parsing is used where possible to minimize allocations
827pub fn parse_streaming_sequence(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
828    if input.is_empty() {
829        return Err(ParseError::Incomplete);
830    }
831
832    let (header, mut rest) = parse_frame(input)?;
833
834    match header {
835        Frame::StreamedStringHeader => {
836            // Parse streaming string chunks until zero-length chunk
837            let mut chunks = Vec::new();
838
839            loop {
840                let (frame, new_rest) = parse_frame(rest)?;
841                rest = new_rest;
842
843                match frame {
844                    Frame::StreamedStringChunk(chunk) => {
845                        if chunk.is_empty() {
846                            // Zero-length chunk indicates end of stream
847                            break;
848                        }
849                        chunks.push(chunk);
850                    }
851                    _ => {
852                        return Err(ParseError::InvalidFormat);
853                    }
854                }
855            }
856
857            Ok((Frame::StreamedString(chunks), rest))
858        }
859        Frame::StreamedBlobErrorHeader | Frame::StreamedVerbatimStringHeader => {
860            // RESP3 does not define a streaming chunk format for blob errors
861            // or verbatim strings. Return the header as-is for low-level consumers.
862            Ok((header, rest))
863        }
864        Frame::StreamedArrayHeader => {
865            // Parse streaming array items until terminator
866            let mut items = Vec::new();
867
868            loop {
869                let (frame, new_rest) = parse_frame(rest)?;
870                rest = new_rest;
871
872                match frame {
873                    Frame::StreamTerminator => {
874                        break;
875                    }
876                    item => {
877                        items.push(item);
878                    }
879                }
880            }
881
882            Ok((Frame::StreamedArray(items), rest))
883        }
884        Frame::StreamedSetHeader => {
885            // Parse streaming set items until terminator
886            let mut items = Vec::new();
887
888            loop {
889                let (frame, new_rest) = parse_frame(rest)?;
890                rest = new_rest;
891
892                match frame {
893                    Frame::StreamTerminator => {
894                        break;
895                    }
896                    item => {
897                        items.push(item);
898                    }
899                }
900            }
901
902            Ok((Frame::StreamedSet(items), rest))
903        }
904        Frame::StreamedMapHeader => {
905            // Parse streaming map pairs until terminator
906            let mut pairs = Vec::new();
907
908            loop {
909                let (frame, new_rest) = parse_frame(rest)?;
910                rest = new_rest;
911
912                match frame {
913                    Frame::StreamTerminator => {
914                        break;
915                    }
916                    key => {
917                        let (value, newer_rest) = parse_frame(rest)?;
918                        if matches!(value, Frame::StreamTerminator) {
919                            return Err(ParseError::InvalidFormat);
920                        }
921                        rest = newer_rest;
922                        pairs.push((key, value));
923                    }
924                }
925            }
926
927            Ok((Frame::StreamedMap(pairs), rest))
928        }
929        Frame::StreamedAttributeHeader => {
930            // Parse streaming attribute pairs until terminator
931            let mut pairs = Vec::new();
932
933            loop {
934                let (frame, new_rest) = parse_frame(rest)?;
935                rest = new_rest;
936
937                match frame {
938                    Frame::StreamTerminator => {
939                        break;
940                    }
941                    key => {
942                        let (value, newer_rest) = parse_frame(rest)?;
943                        if matches!(value, Frame::StreamTerminator) {
944                            return Err(ParseError::InvalidFormat);
945                        }
946                        rest = newer_rest;
947                        pairs.push((key, value));
948                    }
949                }
950            }
951
952            Ok((Frame::StreamedAttribute(pairs), rest))
953        }
954        Frame::StreamedPushHeader => {
955            // Parse streaming push items until terminator
956            let mut items = Vec::new();
957
958            loop {
959                let (frame, new_rest) = parse_frame(rest)?;
960                rest = new_rest;
961
962                match frame {
963                    Frame::StreamTerminator => {
964                        break;
965                    }
966                    item => {
967                        items.push(item);
968                    }
969                }
970            }
971
972            Ok((Frame::StreamedPush(items), rest))
973        }
974        _ => {
975            // Not a streaming sequence, just return the original frame
976            Ok((header, rest))
977        }
978    }
979}
980
981/// Find `\r\n` in `buf` starting at `from`. Returns `(line_end, after_crlf)` where
982/// `line_end` is the position of `\r` and `after_crlf` is the position after `\n`.
983#[inline]
984fn find_crlf(buf: &[u8], from: usize) -> Result<(usize, usize), ParseError> {
985    let mut i = from;
986    let len = buf.len();
987    while i + 1 < len {
988        if buf[i] == b'\r' && buf[i + 1] == b'\n' {
989            return Ok((i, i + 2));
990        }
991        i += 1;
992    }
993    Err(ParseError::Incomplete)
994}
995
996/// Parse a `usize` directly from ASCII digit bytes, no UTF-8 validation needed.
997#[inline]
998fn parse_usize(buf: &[u8]) -> Result<usize, ParseError> {
999    if buf.is_empty() {
1000        return Err(ParseError::BadLength);
1001    }
1002    let mut v: usize = 0;
1003    for &b in buf {
1004        if !b.is_ascii_digit() {
1005            return Err(ParseError::BadLength);
1006        }
1007        v = v.checked_mul(10).ok_or(ParseError::BadLength)?;
1008        v = v
1009            .checked_add((b - b'0') as usize)
1010            .ok_or(ParseError::BadLength)?;
1011    }
1012    Ok(v)
1013}
1014
1015/// Parse an `i64` directly from ASCII bytes (optional leading `-`), no UTF-8 validation.
1016#[inline]
1017fn parse_i64(buf: &[u8]) -> Result<i64, ParseError> {
1018    if buf.is_empty() {
1019        return Err(ParseError::InvalidFormat);
1020    }
1021    let (neg, digits) = if buf[0] == b'-' {
1022        (true, &buf[1..])
1023    } else {
1024        (false, buf)
1025    };
1026    if digits.is_empty() {
1027        return Err(ParseError::InvalidFormat);
1028    }
1029    let mut v: i64 = 0;
1030    for (i, &d) in digits.iter().enumerate() {
1031        if !d.is_ascii_digit() {
1032            return Err(ParseError::InvalidFormat);
1033        }
1034        let digit = (d - b'0') as i64;
1035        if neg && v == i64::MAX / 10 && digit == 8 && i == digits.len() - 1 {
1036            return Ok(i64::MIN);
1037        }
1038        if v > i64::MAX / 10 || (v == i64::MAX / 10 && digit > i64::MAX % 10) {
1039            return Err(ParseError::Overflow);
1040        }
1041        v = v * 10 + digit;
1042    }
1043    if neg { Ok(-v) } else { Ok(v) }
1044}
1045
1046/// Parse a collection count (usize) with MAX_COLLECTION_SIZE check.
1047#[inline]
1048fn parse_count(buf: &[u8]) -> Result<usize, ParseError> {
1049    let count = parse_usize(buf)?;
1050    if count > MAX_COLLECTION_SIZE {
1051        return Err(ParseError::BadLength);
1052    }
1053    Ok(count)
1054}
1055
1056/// Converts a Frame to its RESP3 byte representation.
1057///
1058/// This function serializes a Frame into the corresponding RESP3 protocol bytes.
1059pub fn frame_to_bytes(frame: &Frame) -> Bytes {
1060    let mut buf = BytesMut::new();
1061    serialize_frame(frame, &mut buf);
1062    buf.freeze()
1063}
1064
1065fn serialize_frame(frame: &Frame, buf: &mut BytesMut) {
1066    match frame {
1067        Frame::SimpleString(s) => {
1068            buf.put_u8(b'+');
1069            buf.extend_from_slice(s);
1070            buf.extend_from_slice(b"\r\n");
1071        }
1072        Frame::Error(e) => {
1073            buf.put_u8(b'-');
1074            buf.extend_from_slice(e);
1075            buf.extend_from_slice(b"\r\n");
1076        }
1077        Frame::Integer(i) => {
1078            buf.put_u8(b':');
1079            let s = i.to_string();
1080            buf.extend_from_slice(s.as_bytes());
1081            buf.extend_from_slice(b"\r\n");
1082        }
1083        Frame::BulkString(opt) => {
1084            buf.put_u8(b'$');
1085            match opt {
1086                Some(data) => {
1087                    let len = data.len().to_string();
1088                    buf.extend_from_slice(len.as_bytes());
1089                    buf.extend_from_slice(b"\r\n");
1090                    buf.extend_from_slice(data);
1091                    buf.extend_from_slice(b"\r\n");
1092                }
1093                None => {
1094                    buf.extend_from_slice(b"-1\r\n");
1095                }
1096            }
1097        }
1098        Frame::BlobError(data) => {
1099            buf.put_u8(b'!');
1100            let len = data.len().to_string();
1101            buf.extend_from_slice(len.as_bytes());
1102            buf.extend_from_slice(b"\r\n");
1103            buf.extend_from_slice(data);
1104            buf.extend_from_slice(b"\r\n");
1105        }
1106        Frame::StreamedStringHeader => {
1107            buf.extend_from_slice(b"$?\r\n");
1108        }
1109        Frame::StreamedBlobErrorHeader => {
1110            buf.extend_from_slice(b"!?\r\n");
1111        }
1112        Frame::StreamedVerbatimStringHeader => {
1113            buf.extend_from_slice(b"=?\r\n");
1114        }
1115        Frame::StreamedArrayHeader => {
1116            buf.extend_from_slice(b"*?\r\n");
1117        }
1118        Frame::StreamedSetHeader => {
1119            buf.extend_from_slice(b"~?\r\n");
1120        }
1121        Frame::StreamedMapHeader => {
1122            buf.extend_from_slice(b"%?\r\n");
1123        }
1124        Frame::StreamedAttributeHeader => {
1125            buf.extend_from_slice(b"|?\r\n");
1126        }
1127        Frame::StreamedPushHeader => {
1128            buf.extend_from_slice(b">?\r\n");
1129        }
1130        Frame::StreamedStringChunk(data) => {
1131            buf.put_u8(b';');
1132            let len = data.len().to_string();
1133            buf.extend_from_slice(len.as_bytes());
1134            buf.extend_from_slice(b"\r\n");
1135            buf.extend_from_slice(data);
1136            buf.extend_from_slice(b"\r\n");
1137        }
1138        Frame::StreamedString(chunks) => {
1139            // Serialize as streaming string sequence: $?\r\n + chunks + terminator
1140            buf.extend_from_slice(b"$?\r\n");
1141            for chunk in chunks {
1142                buf.put_u8(b';');
1143                let len = chunk.len().to_string();
1144                buf.extend_from_slice(len.as_bytes());
1145                buf.extend_from_slice(b"\r\n");
1146                buf.extend_from_slice(chunk);
1147                buf.extend_from_slice(b"\r\n");
1148            }
1149            buf.extend_from_slice(b";0\r\n\r\n");
1150        }
1151        Frame::StreamedArray(items) => {
1152            buf.extend_from_slice(b"*?\r\n");
1153            for item in items {
1154                serialize_frame(item, buf);
1155            }
1156            buf.extend_from_slice(b".\r\n");
1157        }
1158        Frame::StreamedSet(items) => {
1159            buf.extend_from_slice(b"~?\r\n");
1160            for item in items {
1161                serialize_frame(item, buf);
1162            }
1163            buf.extend_from_slice(b".\r\n");
1164        }
1165        Frame::StreamedMap(pairs) => {
1166            buf.extend_from_slice(b"%?\r\n");
1167            for (key, value) in pairs {
1168                serialize_frame(key, buf);
1169                serialize_frame(value, buf);
1170            }
1171            buf.extend_from_slice(b".\r\n");
1172        }
1173        Frame::StreamedAttribute(pairs) => {
1174            buf.extend_from_slice(b"|?\r\n");
1175            for (key, value) in pairs {
1176                serialize_frame(key, buf);
1177                serialize_frame(value, buf);
1178            }
1179            buf.extend_from_slice(b".\r\n");
1180        }
1181        Frame::StreamedPush(items) => {
1182            buf.extend_from_slice(b">?\r\n");
1183            for item in items {
1184                serialize_frame(item, buf);
1185            }
1186            buf.extend_from_slice(b".\r\n");
1187        }
1188        Frame::StreamTerminator => {
1189            buf.extend_from_slice(b".\r\n");
1190        }
1191        Frame::Null => {
1192            buf.extend_from_slice(b"_\r\n");
1193        }
1194        Frame::Double(d) => {
1195            buf.put_u8(b',');
1196            let s = d.to_string();
1197            buf.extend_from_slice(s.as_bytes());
1198            buf.extend_from_slice(b"\r\n");
1199        }
1200        Frame::SpecialFloat(f) => {
1201            buf.put_u8(b',');
1202            buf.extend_from_slice(f);
1203            buf.extend_from_slice(b"\r\n");
1204        }
1205        Frame::Boolean(b) => {
1206            buf.extend_from_slice(if *b { b"#t\r\n" } else { b"#f\r\n" });
1207        }
1208        Frame::BigNumber(n) => {
1209            buf.put_u8(b'(');
1210            buf.extend_from_slice(n);
1211            buf.extend_from_slice(b"\r\n");
1212        }
1213        Frame::VerbatimString(format, content) => {
1214            buf.put_u8(b'=');
1215            let total_len = format.len() + 1 + content.len(); // +1 for the colon
1216            let len = total_len.to_string();
1217            buf.extend_from_slice(len.as_bytes());
1218            buf.extend_from_slice(b"\r\n");
1219            buf.extend_from_slice(format);
1220            buf.put_u8(b':');
1221            buf.extend_from_slice(content);
1222            buf.extend_from_slice(b"\r\n");
1223        }
1224        Frame::Array(opt) => {
1225            buf.put_u8(b'*');
1226            match opt {
1227                Some(items) => {
1228                    let len = items.len().to_string();
1229                    buf.extend_from_slice(len.as_bytes());
1230                    buf.extend_from_slice(b"\r\n");
1231                    for item in items {
1232                        serialize_frame(item, buf);
1233                    }
1234                }
1235                None => {
1236                    buf.extend_from_slice(b"-1\r\n");
1237                }
1238            }
1239        }
1240        Frame::Set(items) => {
1241            buf.put_u8(b'~');
1242            let len = items.len().to_string();
1243            buf.extend_from_slice(len.as_bytes());
1244            buf.extend_from_slice(b"\r\n");
1245            for item in items {
1246                serialize_frame(item, buf);
1247            }
1248        }
1249        Frame::Map(pairs) => {
1250            buf.put_u8(b'%');
1251            let len = pairs.len().to_string();
1252            buf.extend_from_slice(len.as_bytes());
1253            buf.extend_from_slice(b"\r\n");
1254            for (key, value) in pairs {
1255                serialize_frame(key, buf);
1256                serialize_frame(value, buf);
1257            }
1258        }
1259        Frame::Attribute(pairs) => {
1260            buf.put_u8(b'|');
1261            let len = pairs.len().to_string();
1262            buf.extend_from_slice(len.as_bytes());
1263            buf.extend_from_slice(b"\r\n");
1264            for (key, value) in pairs {
1265                serialize_frame(key, buf);
1266                serialize_frame(value, buf);
1267            }
1268        }
1269        Frame::Push(items) => {
1270            buf.put_u8(b'>');
1271            let len = items.len().to_string();
1272            buf.extend_from_slice(len.as_bytes());
1273            buf.extend_from_slice(b"\r\n");
1274            for item in items {
1275                serialize_frame(item, buf);
1276            }
1277        }
1278    }
1279}
1280
1281#[cfg(test)]
1282mod tests {
1283    use super::{Frame, ParseError, Parser, frame_to_bytes, parse_frame, parse_streaming_sequence};
1284    use bytes::Bytes;
1285
1286    #[test]
1287    fn test_parse_frame_simple_string() {
1288        let input = Bytes::from("+HELLO\r\nWORLD");
1289        let (frame, rest) = parse_frame(input.clone()).unwrap();
1290        assert_eq!(frame, Frame::SimpleString(Bytes::from("HELLO")));
1291        assert_eq!(rest, Bytes::from("WORLD"));
1292    }
1293
1294    #[test]
1295    fn test_parse_frame_blob_error() {
1296        let input = Bytes::from("!5\r\nERROR\r\nREST");
1297        let (frame, rest) = parse_frame(input.clone()).unwrap();
1298        assert_eq!(frame, Frame::BlobError(Bytes::from("ERROR")));
1299        assert_eq!(rest, Bytes::from("REST"));
1300    }
1301
1302    #[test]
1303    fn test_parse_frame_error() {
1304        let input = Bytes::from("-ERR fail\r\nLEFT");
1305        let (frame, rest) = parse_frame(input.clone()).unwrap();
1306        assert_eq!(frame, Frame::Error(Bytes::from("ERR fail")));
1307        assert_eq!(rest, Bytes::from("LEFT"));
1308    }
1309
1310    #[test]
1311    fn test_parse_frame_integer() {
1312        let input = Bytes::from(":42\r\nTAIL");
1313        let (frame, rest) = parse_frame(input.clone()).unwrap();
1314        assert_eq!(frame, Frame::Integer(42));
1315        assert_eq!(rest, Bytes::from("TAIL"));
1316    }
1317
1318    #[test]
1319    fn test_parse_frame_bulk_string() {
1320        let input = Bytes::from("$3\r\nfoo\r\nREST");
1321        let (frame, rest) = parse_frame(input.clone()).unwrap();
1322        assert_eq!(frame, Frame::BulkString(Some(Bytes::from("foo"))));
1323        assert_eq!(rest, Bytes::from("REST"));
1324        let null_input = Bytes::from("$-1\r\nAFTER");
1325        let (frame, rest) = parse_frame(null_input.clone()).unwrap();
1326        assert_eq!(frame, Frame::BulkString(None));
1327        assert_eq!(rest, Bytes::from("AFTER"));
1328    }
1329
1330    #[test]
1331    fn test_parse_frame_null() {
1332        let input = Bytes::from("_\r\nLEFT");
1333        let (frame, rest) = parse_frame(input.clone()).unwrap();
1334        assert_eq!(frame, Frame::Null);
1335        assert_eq!(rest, Bytes::from("LEFT"));
1336    }
1337
1338    #[test]
1339    fn test_parse_frame_double_and_special_float() {
1340        let input = Bytes::from(",3.5\r\nNEXT");
1341        let (frame, rest) = parse_frame(input.clone()).unwrap();
1342        assert_eq!(frame, Frame::Double(3.5));
1343        assert_eq!(rest, Bytes::from("NEXT"));
1344        let input_inf = Bytes::from(",inf\r\nTAIL");
1345        let (frame, rest) = parse_frame(input_inf.clone()).unwrap();
1346        assert_eq!(frame, Frame::SpecialFloat(Bytes::from("inf")));
1347        assert_eq!(rest, Bytes::from("TAIL"));
1348    }
1349
1350    #[test]
1351    fn test_parse_frame_boolean() {
1352        let input_true = Bytes::from("#t\r\nXYZ");
1353        let (frame, rest) = parse_frame(input_true.clone()).unwrap();
1354        assert_eq!(frame, Frame::Boolean(true));
1355        assert_eq!(rest, Bytes::from("XYZ"));
1356        let input_false = Bytes::from("#f\r\nDONE");
1357        let (frame, rest) = parse_frame(input_false.clone()).unwrap();
1358        assert_eq!(frame, Frame::Boolean(false));
1359        assert_eq!(rest, Bytes::from("DONE"));
1360    }
1361
1362    #[test]
1363    fn test_parse_frame_big_number() {
1364        let input = Bytes::from("(123456789\r\nEND");
1365        let (frame, rest) = parse_frame(input.clone()).unwrap();
1366        assert_eq!(frame, Frame::BigNumber(Bytes::from("123456789")));
1367        assert_eq!(rest, Bytes::from("END"));
1368    }
1369
1370    #[test]
1371    fn test_parse_frame_verbatim_string() {
1372        let input = Bytes::from("=12\r\ntxt:hi there\r\nAFTER");
1373        let (frame, rest) = parse_frame(input.clone()).unwrap();
1374        assert_eq!(
1375            frame,
1376            Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hi there")) // Frame::VerbatimString {
1377                                                                               //     format: "txt".to_string(),
1378                                                                               //     content: "hi there".to_string()
1379                                                                               // }
1380        );
1381        assert_eq!(rest, Bytes::from("AFTER"));
1382    }
1383
1384    #[test]
1385    fn test_parse_frame_array_set_push_map_attribute() {
1386        // Array of two bulk strings
1387        let input = Bytes::from("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\nTAIL");
1388        let (frame, rest) = parse_frame(input.clone()).unwrap();
1389        assert_eq!(
1390            frame,
1391            Frame::Array(Some(vec![
1392                Frame::BulkString(Some(Bytes::from("foo"))),
1393                Frame::BulkString(Some(Bytes::from("bar")))
1394            ]))
1395        );
1396        assert_eq!(rest, Bytes::from("TAIL"));
1397        // Null array
1398        let input_null = Bytes::from("*-1\r\nEND");
1399        let (frame, rest) = parse_frame(input_null.clone()).unwrap();
1400        assert_eq!(frame, Frame::Array(None));
1401        assert_eq!(rest, Bytes::from("END"));
1402        // Set of two simple strings
1403        let input_set = Bytes::from("~2\r\n+foo\r\n+bar\r\nTAIL");
1404        let (frame, rest) = parse_frame(input_set.clone()).unwrap();
1405        assert_eq!(
1406            frame,
1407            Frame::Set(vec![
1408                Frame::SimpleString(Bytes::from("foo")),
1409                Frame::SimpleString(Bytes::from("bar")),
1410            ])
1411        );
1412        assert_eq!(rest, Bytes::from("TAIL"));
1413        // Map of two key-value pairs
1414        let input_map = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\nTRAIL");
1415        let (frame, rest) = parse_frame(input_map.clone()).unwrap();
1416        assert_eq!(
1417            frame,
1418            Frame::Map(vec![
1419                (
1420                    Frame::SimpleString(Bytes::from("key1")),
1421                    Frame::SimpleString(Bytes::from("val1"))
1422                ),
1423                (
1424                    Frame::SimpleString(Bytes::from("key2")),
1425                    Frame::SimpleString(Bytes::from("val2"))
1426                ),
1427            ])
1428        );
1429        assert_eq!(rest, Bytes::from("TRAIL"));
1430        // Attribute
1431        let input_attr = Bytes::from("|1\r\n+meta\r\n+data\r\nAFTER");
1432        let (frame, rest) = parse_frame(input_attr.clone()).unwrap();
1433        assert_eq!(
1434            frame,
1435            Frame::Attribute(vec![(
1436                Frame::SimpleString(Bytes::from("meta")),
1437                Frame::SimpleString(Bytes::from("data"))
1438            ),])
1439        );
1440        assert_eq!(rest, Bytes::from("AFTER"));
1441        // Push
1442        let input_push = Bytes::from(">2\r\n+type\r\n:1\r\nNEXT");
1443        let (frame, rest) = parse_frame(input_push.clone()).unwrap();
1444        assert_eq!(
1445            frame,
1446            Frame::Push(vec![
1447                Frame::SimpleString(Bytes::from("type")),
1448                Frame::Integer(1),
1449            ])
1450        );
1451        assert_eq!(rest, Bytes::from("NEXT"));
1452    }
1453
1454    #[test]
1455    fn test_parse_frame_empty_input() {
1456        assert!(parse_frame(Bytes::new()).is_err());
1457    }
1458
1459    #[test]
1460    fn test_parse_frame_invalid_tag() {
1461        let input = Bytes::from("X123\r\n");
1462        assert!(parse_frame(input).is_err());
1463    }
1464
1465    #[test]
1466    fn test_parse_frame_malformed_bulk_length() {
1467        let input = Bytes::from("$x\r\nfoo\r\n");
1468        assert!(parse_frame(input).is_err());
1469    }
1470
1471    #[test]
1472    fn test_parse_frame_zero_length_bulk() {
1473        let input = Bytes::from("$0\r\n\r\nTAIL");
1474        let (frame, rest) = parse_frame(input.clone()).unwrap();
1475        assert_eq!(frame, Frame::BulkString(Some(Bytes::from(""))));
1476        assert_eq!(rest, Bytes::from("TAIL"));
1477    }
1478
1479    #[test]
1480    fn test_parse_frame_zero_length_blob_error() {
1481        let input = Bytes::from("!0\r\n\r\nREST");
1482        let (frame, rest) = parse_frame(input.clone()).unwrap();
1483        assert_eq!(frame, Frame::BlobError(Bytes::new()));
1484        assert_eq!(rest, Bytes::from("REST"));
1485    }
1486
1487    #[test]
1488    fn test_parse_frame_missing_crlf() {
1489        let input = Bytes::from(":42\nTAIL");
1490        assert!(parse_frame(input).is_err());
1491    }
1492
1493    #[test]
1494    fn test_parse_frame_unicode_simple_string() {
1495        let input = Bytes::from("+こんにちは\r\nEND");
1496        let (frame, rest) = parse_frame(input.clone()).unwrap();
1497        assert_eq!(frame, Frame::SimpleString(Bytes::from("こんにちは")));
1498        assert_eq!(rest, Bytes::from("END"));
1499    }
1500
1501    #[test]
1502    fn test_parse_frame_chained_frames() {
1503        let combined = Bytes::from("+OK\r\n:1\r\nfoo");
1504        let (f1, rem) = parse_frame(combined.clone()).unwrap();
1505        assert_eq!(f1, Frame::SimpleString(Bytes::from("OK")));
1506        let (f2, rem2) = parse_frame(rem).unwrap();
1507        assert_eq!(f2, Frame::Integer(1));
1508        assert_eq!(rem2, Bytes::from("foo"));
1509    }
1510
1511    #[test]
1512    fn test_parse_frame_empty_array() {
1513        let input = Bytes::from("*0\r\nTAIL");
1514        let (frame, rest) = parse_frame(input.clone()).unwrap();
1515        assert_eq!(frame, Frame::Array(Some(vec![])));
1516        assert_eq!(rest, Bytes::from("TAIL"));
1517    }
1518
1519    #[test]
1520    fn test_parse_frame_partial_array_data() {
1521        let input = Bytes::from("*2\r\n+OK\r\n");
1522        assert!(parse_frame(input).is_err());
1523    }
1524
1525    #[test]
1526    fn test_parse_frame_streamed_string() {
1527        let input = Bytes::from("$?\r\n$5\r\nhello\r\n$0\r\n\r\nREST");
1528        let (frame, rem) = parse_frame(input.clone()).unwrap();
1529        assert_eq!(frame, Frame::StreamedStringHeader);
1530        let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1531        assert_eq!(chunk, Frame::BulkString(Some(Bytes::from("hello"))));
1532        let (terminator, rest) = parse_frame(rem2.clone()).unwrap();
1533        assert_eq!(terminator, Frame::BulkString(Some(Bytes::from(""))));
1534        assert_eq!(rest, Bytes::from("REST"));
1535    }
1536
1537    #[test]
1538    fn test_parse_frame_streamed_blob_error() {
1539        let input = Bytes::from("!?\r\n!5\r\nERROR\r\nREST");
1540        let (frame, rem) = parse_frame(input.clone()).unwrap();
1541        assert_eq!(frame, Frame::StreamedBlobErrorHeader);
1542        let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1543        assert_eq!(chunk, Frame::BlobError(Bytes::from("ERROR")));
1544        assert_eq!(rem2, Bytes::from("REST"));
1545    }
1546
1547    #[test]
1548    fn test_parse_frame_streamed_verbatim_string() {
1549        let input = Bytes::from("=?\r\n=9\r\ntxt:hello\r\nTAIL");
1550        let (frame, rem) = parse_frame(input.clone()).unwrap();
1551        assert_eq!(frame, Frame::StreamedVerbatimStringHeader);
1552        let (chunk, rest) = parse_frame(rem.clone()).unwrap();
1553        assert_eq!(
1554            chunk,
1555            Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hello")) // Frame::VerbatimString {
1556                                                                            //     format: "txt".to_string(),
1557                                                                            //     content: "hello".to_string()
1558                                                                            // }
1559        );
1560        assert_eq!(rest, Bytes::from("TAIL"));
1561    }
1562
1563    #[test]
1564    fn test_parse_frame_streamed_array() {
1565        let input = Bytes::from("*?\r\n+one\r\n+two\r\n*0\r\nEND");
1566        let (header, rem) = parse_frame(input.clone()).unwrap();
1567        assert_eq!(header, Frame::StreamedArrayHeader);
1568        let (item1, rem2) = parse_frame(rem.clone()).unwrap();
1569        assert_eq!(item1, Frame::SimpleString(Bytes::from("one")));
1570        let (item2, rem3) = parse_frame(rem2.clone()).unwrap();
1571        assert_eq!(item2, Frame::SimpleString(Bytes::from("two")));
1572        let (terminator, rest) = parse_frame(rem3.clone()).unwrap();
1573        assert_eq!(terminator, Frame::Array(Some(vec![])));
1574        assert_eq!(rest, Bytes::from("END"));
1575    }
1576
1577    #[test]
1578    fn test_parse_frame_streamed_set_map_attr_push() {
1579        // Set
1580        let input_set = Bytes::from("~?\r\n+foo\r\n+bar\r\n~0\r\nTAIL");
1581        let (h_set, rem0) = parse_frame(input_set.clone()).unwrap();
1582        assert_eq!(h_set, Frame::StreamedSetHeader);
1583        let (s1, rem1) = parse_frame(rem0.clone()).unwrap();
1584        assert_eq!(s1, Frame::SimpleString(Bytes::from("foo")));
1585        let (s2, rem2) = parse_frame(rem1.clone()).unwrap();
1586        assert_eq!(s2, Frame::SimpleString(Bytes::from("bar")));
1587        let (term_set, rest_set) = parse_frame(rem2.clone()).unwrap();
1588        assert_eq!(term_set, Frame::Set(vec![]));
1589        assert_eq!(rest_set, Bytes::from("TAIL"));
1590        // Map
1591        let input_map = Bytes::from("%?\r\n+key\r\n+val\r\n%0\r\nNEXT");
1592        let (h_map, rem_map) = parse_frame(input_map.clone()).unwrap();
1593        assert_eq!(h_map, Frame::StreamedMapHeader);
1594        let (k, rem_map2) = parse_frame(rem_map.clone()).unwrap();
1595        assert_eq!(k, Frame::SimpleString(Bytes::from("key")));
1596        let (v, rem_map3) = parse_frame(rem_map2.clone()).unwrap();
1597        assert_eq!(v, Frame::SimpleString(Bytes::from("val")));
1598        let (term_map, rest_map4) = parse_frame(rem_map3.clone()).unwrap();
1599        assert_eq!(term_map, Frame::Map(vec![]));
1600        assert_eq!(rest_map4, Bytes::from("NEXT"));
1601        // Attribute
1602        let input_attr = Bytes::from("|?\r\n+meta\r\n+info\r\n|0\r\nMORE");
1603        let (h_attr, rem_attr) = parse_frame(input_attr.clone()).unwrap();
1604        assert_eq!(h_attr, Frame::StreamedAttributeHeader);
1605        let (a1, rem_attr2) = parse_frame(rem_attr.clone()).unwrap();
1606        assert_eq!(a1, Frame::SimpleString(Bytes::from("meta")));
1607        let (a2, rem_attr3) = parse_frame(rem_attr2.clone()).unwrap();
1608        assert_eq!(a2, Frame::SimpleString(Bytes::from("info")));
1609        let (term_attr, rest_attr) = parse_frame(rem_attr3.clone()).unwrap();
1610        assert_eq!(term_attr, Frame::Attribute(vec![]));
1611        assert_eq!(rest_attr, Bytes::from("MORE"));
1612        // Push
1613        let input_push = Bytes::from(">?\r\n:1\r\n:2\r\n>0\r\nEND");
1614        let (h_push, rem_push) = parse_frame(input_push.clone()).unwrap();
1615        assert_eq!(h_push, Frame::StreamedPushHeader);
1616        let (p1, rem_push2) = parse_frame(rem_push.clone()).unwrap();
1617        assert_eq!(p1, Frame::Integer(1));
1618        let (p2, rem_push3) = parse_frame(rem_push2.clone()).unwrap();
1619        assert_eq!(p2, Frame::Integer(2));
1620        let (term_push, rest_push) = parse_frame(rem_push3.clone()).unwrap();
1621        assert_eq!(term_push, Frame::Push(vec![]));
1622        assert_eq!(rest_push, Bytes::from("END"));
1623    }
1624
1625    #[test]
1626    fn test_parse_frame_stream_terminator() {
1627        let input = Bytes::from(".\r\nREST");
1628        let (frame, rest) = parse_frame(input.clone()).unwrap();
1629        assert_eq!(frame, Frame::StreamTerminator);
1630        assert_eq!(rest, Bytes::from("REST"));
1631    }
1632
1633    #[test]
1634    fn test_parse_frame_null_blob_error_rejected() {
1635        let input = Bytes::from("!-1\r\nTAIL");
1636        assert_eq!(parse_frame(input), Err(ParseError::BadLength));
1637    }
1638
1639    #[test]
1640    fn test_parse_frame_null_verbatim_rejected() {
1641        let input = Bytes::from("=-1\r\nTAIL");
1642        assert_eq!(parse_frame(input), Err(ParseError::BadLength));
1643    }
1644
1645    #[test]
1646    fn test_verbatim_string_format_must_be_3_bytes() {
1647        // Too short (1 byte format)
1648        let input = Bytes::from("=6\r\nx:data\r\n");
1649        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
1650
1651        // Too long (4 byte format)
1652        let input = Bytes::from("=9\r\ntxtx:data\r\n");
1653        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
1654
1655        // Empty format (colon at start)
1656        let input = Bytes::from("=5\r\n:data\r\n");
1657        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
1658
1659        // Valid 3-byte format should still work
1660        let input = Bytes::from("=8\r\ntxt:data\r\n");
1661        let (frame, _) = parse_frame(input).unwrap();
1662        assert_eq!(
1663            frame,
1664            Frame::VerbatimString(Bytes::from("txt"), Bytes::from("data"))
1665        );
1666    }
1667
1668    #[test]
1669    fn test_parse_frame_special_float_nan() {
1670        let input = Bytes::from(",nan\r\nTAIL");
1671        let (frame, rest) = parse_frame(input.clone()).unwrap();
1672        assert_eq!(frame, Frame::SpecialFloat(Bytes::from("nan")));
1673        assert_eq!(rest, Bytes::from("TAIL"));
1674    }
1675
1676    #[test]
1677    fn test_parse_frame_big_number_zero() {
1678        let input = Bytes::from("(0\r\nEND");
1679        let (frame, rest) = parse_frame(input.clone()).unwrap();
1680        assert_eq!(frame, Frame::BigNumber(Bytes::from("0")));
1681        assert_eq!(rest, Bytes::from("END"));
1682    }
1683
1684    #[test]
1685    fn test_parse_frame_collection_empty() {
1686        let input_push = Bytes::from(">0\r\nTAIL");
1687        let (f_push, r_push) = parse_frame(input_push.clone()).unwrap();
1688        assert_eq!(f_push, Frame::Push(vec![]));
1689        assert_eq!(r_push, Bytes::from("TAIL"));
1690        let input_attr = Bytes::from("|0\r\nAFTER");
1691        let (f_attr, r_attr) = parse_frame(input_attr.clone()).unwrap();
1692        assert_eq!(f_attr, Frame::Attribute(vec![]));
1693        assert_eq!(r_attr, Bytes::from("AFTER"));
1694        let input_map = Bytes::from("%0\r\nEND");
1695        let (f_map, r_map) = parse_frame(input_map.clone()).unwrap();
1696        assert_eq!(f_map, Frame::Map(vec![]));
1697        assert_eq!(r_map, Bytes::from("END"));
1698        let input_set = Bytes::from("~0\r\nDONE");
1699        let (f_set, r_set) = parse_frame(input_set.clone()).unwrap();
1700        assert_eq!(f_set, Frame::Set(vec![]));
1701        assert_eq!(r_set, Bytes::from("DONE"));
1702        let input_arr = Bytes::from("*-1\r\nFIN");
1703        let (f_arr, r_arr) = parse_frame(input_arr.clone()).unwrap();
1704        assert_eq!(f_arr, Frame::Array(None));
1705        assert_eq!(r_arr, Bytes::from("FIN"));
1706    }
1707
1708    // Round-trip tests for serialization and parsing
1709
1710    #[test]
1711    fn test_roundtrip_simple_string() {
1712        let original = Bytes::from("+hello\r\n");
1713        let (frame, _) = parse_frame(original.clone()).unwrap();
1714        let serialized = frame_to_bytes(&frame);
1715        assert_eq!(original, serialized);
1716
1717        let (reparsed, _) = parse_frame(serialized).unwrap();
1718        assert_eq!(frame, reparsed);
1719    }
1720
1721    #[test]
1722    fn test_roundtrip_error() {
1723        let original = Bytes::from("-ERR error message\r\n");
1724        let (frame, _) = parse_frame(original.clone()).unwrap();
1725        let serialized = frame_to_bytes(&frame);
1726        assert_eq!(original, serialized);
1727
1728        let (reparsed, _) = parse_frame(serialized).unwrap();
1729        assert_eq!(frame, reparsed);
1730    }
1731
1732    #[test]
1733    fn test_roundtrip_integer() {
1734        let original = Bytes::from(":12345\r\n");
1735        let (frame, _) = parse_frame(original.clone()).unwrap();
1736        let serialized = frame_to_bytes(&frame);
1737        assert_eq!(original, serialized);
1738
1739        let (reparsed, _) = parse_frame(serialized).unwrap();
1740        assert_eq!(frame, reparsed);
1741    }
1742
1743    #[test]
1744    fn test_roundtrip_bulk_string() {
1745        let original = Bytes::from("$5\r\nhello\r\n");
1746        let (frame, _) = parse_frame(original.clone()).unwrap();
1747        let serialized = frame_to_bytes(&frame);
1748        assert_eq!(original, serialized);
1749
1750        let (reparsed, _) = parse_frame(serialized).unwrap();
1751        assert_eq!(frame, reparsed);
1752
1753        // Test null bulk string
1754        let original_null = Bytes::from("$-1\r\n");
1755        let (frame_null, _) = parse_frame(original_null.clone()).unwrap();
1756        let serialized_null = frame_to_bytes(&frame_null);
1757        assert_eq!(original_null, serialized_null);
1758
1759        let (reparsed_null, _) = parse_frame(serialized_null).unwrap();
1760        assert_eq!(frame_null, reparsed_null);
1761    }
1762
1763    #[test]
1764    fn test_roundtrip_blob_error() {
1765        let original = Bytes::from("!5\r\nerror\r\n");
1766        let (frame, _) = parse_frame(original.clone()).unwrap();
1767        let serialized = frame_to_bytes(&frame);
1768        assert_eq!(original, serialized);
1769
1770        let (reparsed, _) = parse_frame(serialized).unwrap();
1771        assert_eq!(frame, reparsed);
1772    }
1773
1774    #[test]
1775    fn test_roundtrip_null() {
1776        let original = Bytes::from("_\r\n");
1777        let (frame, _) = parse_frame(original.clone()).unwrap();
1778        let serialized = frame_to_bytes(&frame);
1779        assert_eq!(original, serialized);
1780
1781        let (reparsed, _) = parse_frame(serialized).unwrap();
1782        assert_eq!(frame, reparsed);
1783    }
1784
1785    #[test]
1786    fn test_roundtrip_double() {
1787        let original = Bytes::from(",3.14159\r\n");
1788        let (frame, _) = parse_frame(original.clone()).unwrap();
1789        let serialized = frame_to_bytes(&frame);
1790
1791        // Note: The exact string representation of floating point numbers might differ
1792        // between parsing and serializing due to formatting differences
1793        let (reparsed, _) = parse_frame(serialized).unwrap();
1794        assert_eq!(frame, reparsed);
1795    }
1796
1797    #[test]
1798    fn test_roundtrip_special_float() {
1799        let original = Bytes::from(",inf\r\n");
1800        let (frame, _) = parse_frame(original.clone()).unwrap();
1801        let serialized = frame_to_bytes(&frame);
1802        assert_eq!(original, serialized);
1803
1804        let (reparsed, _) = parse_frame(serialized).unwrap();
1805        assert_eq!(frame, reparsed);
1806    }
1807
1808    #[test]
1809    fn test_roundtrip_boolean() {
1810        let original_true = Bytes::from("#t\r\n");
1811        let (frame_true, _) = parse_frame(original_true.clone()).unwrap();
1812        let serialized_true = frame_to_bytes(&frame_true);
1813        assert_eq!(original_true, serialized_true);
1814
1815        let (reparsed_true, _) = parse_frame(serialized_true).unwrap();
1816        assert_eq!(frame_true, reparsed_true);
1817
1818        let original_false = Bytes::from("#f\r\n");
1819        let (frame_false, _) = parse_frame(original_false.clone()).unwrap();
1820        let serialized_false = frame_to_bytes(&frame_false);
1821        assert_eq!(original_false, serialized_false);
1822
1823        let (reparsed_false, _) = parse_frame(serialized_false).unwrap();
1824        assert_eq!(frame_false, reparsed_false);
1825    }
1826
1827    #[test]
1828    fn test_roundtrip_big_number() {
1829        let original = Bytes::from("(12345678901234567890\r\n");
1830        let (frame, _) = parse_frame(original.clone()).unwrap();
1831        let serialized = frame_to_bytes(&frame);
1832        assert_eq!(original, serialized);
1833
1834        let (reparsed, _) = parse_frame(serialized).unwrap();
1835        assert_eq!(frame, reparsed);
1836    }
1837
1838    #[test]
1839    fn test_roundtrip_verbatim_string() {
1840        let original = Bytes::from("=10\r\ntxt:hello!\r\n");
1841        let (frame, _) = parse_frame(original.clone()).unwrap();
1842        let serialized = frame_to_bytes(&frame);
1843        assert_eq!(original, serialized);
1844
1845        let (reparsed, _) = parse_frame(serialized).unwrap();
1846        assert_eq!(frame, reparsed);
1847    }
1848
1849    #[test]
1850    fn test_roundtrip_array() {
1851        let original = Bytes::from("*2\r\n+hello\r\n:123\r\n");
1852        let (frame, _) = parse_frame(original.clone()).unwrap();
1853        let serialized = frame_to_bytes(&frame);
1854        assert_eq!(original, serialized);
1855
1856        let (reparsed, _) = parse_frame(serialized).unwrap();
1857        assert_eq!(frame, reparsed);
1858
1859        // Test null array
1860        let original_null = Bytes::from("*-1\r\n");
1861        let (frame_null, _) = parse_frame(original_null.clone()).unwrap();
1862        let serialized_null = frame_to_bytes(&frame_null);
1863        assert_eq!(original_null, serialized_null);
1864
1865        let (reparsed_null, _) = parse_frame(serialized_null).unwrap();
1866        assert_eq!(frame_null, reparsed_null);
1867    }
1868
1869    #[test]
1870    fn test_roundtrip_set() {
1871        let original = Bytes::from("~2\r\n+one\r\n+two\r\n");
1872        let (frame, _) = parse_frame(original.clone()).unwrap();
1873        let serialized = frame_to_bytes(&frame);
1874        assert_eq!(original, serialized);
1875
1876        let (reparsed, _) = parse_frame(serialized).unwrap();
1877        assert_eq!(frame, reparsed);
1878    }
1879
1880    #[test]
1881    fn test_roundtrip_map() {
1882        let original = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\n");
1883        let (frame, _) = parse_frame(original.clone()).unwrap();
1884        let serialized = frame_to_bytes(&frame);
1885        assert_eq!(original, serialized);
1886
1887        let (reparsed, _) = parse_frame(serialized).unwrap();
1888        assert_eq!(frame, reparsed);
1889    }
1890
1891    #[test]
1892    fn test_roundtrip_attribute() {
1893        let original = Bytes::from("|1\r\n+key\r\n+val\r\n");
1894        let (frame, _) = parse_frame(original.clone()).unwrap();
1895        let serialized = frame_to_bytes(&frame);
1896        assert_eq!(original, serialized);
1897
1898        let (reparsed, _) = parse_frame(serialized).unwrap();
1899        assert_eq!(frame, reparsed);
1900    }
1901
1902    #[test]
1903    fn test_roundtrip_push() {
1904        let original = Bytes::from(">2\r\n+msg\r\n+data\r\n");
1905        let (frame, _) = parse_frame(original.clone()).unwrap();
1906        let serialized = frame_to_bytes(&frame);
1907        assert_eq!(original, serialized);
1908
1909        let (reparsed, _) = parse_frame(serialized).unwrap();
1910        assert_eq!(frame, reparsed);
1911    }
1912
1913    #[test]
1914    fn test_roundtrip_streaming_headers() {
1915        let headers = [
1916            ("$?\r\n", Frame::StreamedStringHeader),
1917            ("!?\r\n", Frame::StreamedBlobErrorHeader),
1918            ("=?\r\n", Frame::StreamedVerbatimStringHeader),
1919            ("*?\r\n", Frame::StreamedArrayHeader),
1920            ("~?\r\n", Frame::StreamedSetHeader),
1921            ("%?\r\n", Frame::StreamedMapHeader),
1922            ("|?\r\n", Frame::StreamedAttributeHeader),
1923            (">?\r\n", Frame::StreamedPushHeader),
1924            (".\r\n", Frame::StreamTerminator),
1925        ];
1926
1927        for (original_str, expected_frame) in headers {
1928            let original = Bytes::from(original_str);
1929            let (frame, _) = parse_frame(original.clone()).unwrap();
1930            assert_eq!(frame, expected_frame);
1931
1932            let serialized = frame_to_bytes(&frame);
1933            assert_eq!(original, serialized);
1934
1935            let (reparsed, _) = parse_frame(serialized).unwrap();
1936            assert_eq!(frame, reparsed);
1937        }
1938    }
1939
1940    #[test]
1941    fn test_roundtrip_streaming_chunks() {
1942        let chunks = [
1943            (
1944                ";4\r\nHell\r\n",
1945                Frame::StreamedStringChunk(Bytes::from("Hell")),
1946            ),
1947            (
1948                ";5\r\no wor\r\n",
1949                Frame::StreamedStringChunk(Bytes::from("o wor")),
1950            ),
1951            (";1\r\nd\r\n", Frame::StreamedStringChunk(Bytes::from("d"))),
1952            (";0\r\n\r\n", Frame::StreamedStringChunk(Bytes::new())),
1953            (
1954                ";11\r\nHello World\r\n",
1955                Frame::StreamedStringChunk(Bytes::from("Hello World")),
1956            ),
1957        ];
1958
1959        for (original_str, expected_frame) in chunks {
1960            let original = Bytes::from(original_str);
1961            let (frame, rest) = parse_frame(original.clone()).unwrap();
1962            assert_eq!(frame, expected_frame);
1963            assert!(rest.is_empty());
1964
1965            let serialized = frame_to_bytes(&frame);
1966            assert_eq!(original, serialized);
1967
1968            let (reparsed, _) = parse_frame(serialized).unwrap();
1969            assert_eq!(frame, reparsed);
1970        }
1971    }
1972
1973    #[test]
1974    fn test_streaming_chunks_edge_cases() {
1975        // Test incomplete chunk (missing data)
1976        let data = Bytes::from(";4\r\nHel");
1977        let result = parse_frame(data);
1978        assert!(matches!(result, Err(ParseError::Incomplete)));
1979
1980        // Test incomplete chunk (missing CRLF)
1981        let data = Bytes::from(";4\r\nHell");
1982        let result = parse_frame(data);
1983        assert!(matches!(result, Err(ParseError::Incomplete)));
1984
1985        // Test invalid length format
1986        let data = Bytes::from(";abc\r\ndata\r\n");
1987        let result = parse_frame(data);
1988        assert!(matches!(result, Err(ParseError::BadLength)));
1989
1990        // Test negative length
1991        let data = Bytes::from(";-1\r\ndata\r\n");
1992        let result = parse_frame(data);
1993        assert!(matches!(result, Err(ParseError::BadLength)));
1994
1995        // Test length mismatch (length says 5 but only 4 bytes)
1996        let data = Bytes::from(";5\r\nHell\r\n");
1997        let result = parse_frame(data);
1998        assert!(matches!(result, Err(ParseError::Incomplete)));
1999
2000        // Test zero-length chunk without trailing CRLF returns Incomplete
2001        let data = Bytes::from(";0\r\n");
2002        let result = parse_frame(data);
2003        assert!(matches!(result, Err(ParseError::Incomplete)));
2004
2005        // Test binary data in chunk
2006        let binary_data = b"\x00\x01\x02\x03\xFF";
2007        let mut chunk_data = Vec::new();
2008        chunk_data.extend_from_slice(b";5\r\n");
2009        chunk_data.extend_from_slice(binary_data);
2010        chunk_data.extend_from_slice(b"\r\n");
2011        let data = Bytes::from(chunk_data);
2012        let result = parse_frame(data);
2013        assert!(result.is_ok());
2014        let (frame, _) = result.unwrap();
2015        if let Frame::StreamedStringChunk(chunk) = frame {
2016            assert_eq!(chunk.as_ref(), binary_data);
2017        }
2018    }
2019
2020    #[test]
2021    fn test_roundtrip_streaming_sequences() {
2022        // Test streaming string roundtrip
2023        let streaming_string = Frame::StreamedString(vec![
2024            Bytes::from("Hell"),
2025            Bytes::from("o wor"),
2026            Bytes::from("ld"),
2027        ]);
2028        let serialized = frame_to_bytes(&streaming_string);
2029        let expected = "$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;2\r\nld\r\n;0\r\n\r\n";
2030        assert_eq!(serialized, Bytes::from(expected));
2031
2032        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
2033        assert_eq!(parsed, streaming_string);
2034
2035        // Test streaming array roundtrip
2036        let streaming_array = Frame::StreamedArray(vec![
2037            Frame::SimpleString(Bytes::from("hello")),
2038            Frame::Integer(42),
2039            Frame::Boolean(true),
2040        ]);
2041        let serialized = frame_to_bytes(&streaming_array);
2042        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
2043        assert_eq!(parsed, streaming_array);
2044
2045        // Test streaming map roundtrip
2046        let streaming_map = Frame::StreamedMap(vec![
2047            (
2048                Frame::SimpleString(Bytes::from("key1")),
2049                Frame::SimpleString(Bytes::from("val1")),
2050            ),
2051            (
2052                Frame::SimpleString(Bytes::from("key2")),
2053                Frame::Integer(123),
2054            ),
2055        ]);
2056        let serialized = frame_to_bytes(&streaming_map);
2057        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
2058        assert_eq!(parsed, streaming_map);
2059
2060        // Test empty streaming string
2061        let empty_streaming = Frame::StreamedString(vec![]);
2062        let serialized = frame_to_bytes(&empty_streaming);
2063        let expected = "$?\r\n;0\r\n\r\n";
2064        assert_eq!(serialized, Bytes::from(expected));
2065        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
2066        assert_eq!(parsed, empty_streaming);
2067
2068        // Test streaming set roundtrip
2069        let streaming_set = Frame::StreamedSet(vec![
2070            Frame::SimpleString(Bytes::from("apple")),
2071            Frame::SimpleString(Bytes::from("banana")),
2072            Frame::Integer(42),
2073        ]);
2074        let serialized = frame_to_bytes(&streaming_set);
2075        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
2076        assert_eq!(parsed, streaming_set);
2077
2078        // Test streaming attribute roundtrip
2079        let streaming_attribute = Frame::StreamedAttribute(vec![
2080            (
2081                Frame::SimpleString(Bytes::from("trace-id")),
2082                Frame::SimpleString(Bytes::from("abc123")),
2083            ),
2084            (
2085                Frame::SimpleString(Bytes::from("span-id")),
2086                Frame::SimpleString(Bytes::from("def456")),
2087            ),
2088        ]);
2089        let serialized = frame_to_bytes(&streaming_attribute);
2090        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
2091        assert_eq!(parsed, streaming_attribute);
2092
2093        // Test streaming push roundtrip
2094        let streaming_push = Frame::StreamedPush(vec![
2095            Frame::SimpleString(Bytes::from("pubsub")),
2096            Frame::SimpleString(Bytes::from("channel1")),
2097            Frame::SimpleString(Bytes::from("message data")),
2098        ]);
2099        let serialized = frame_to_bytes(&streaming_push);
2100        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
2101        assert_eq!(parsed, streaming_push);
2102
2103        // Test empty streaming containers
2104        let empty_array = Frame::StreamedArray(vec![]);
2105        let serialized = frame_to_bytes(&empty_array);
2106        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
2107        assert_eq!(parsed, empty_array);
2108
2109        let empty_set = Frame::StreamedSet(vec![]);
2110        let serialized = frame_to_bytes(&empty_set);
2111        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
2112        assert_eq!(parsed, empty_set);
2113    }
2114
2115    #[test]
2116    fn test_streaming_sequences_edge_cases() {
2117        // Test incomplete streaming string (missing zero-length terminator)
2118        let data = Bytes::from("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n");
2119        let result = parse_streaming_sequence(data);
2120        assert!(matches!(result, Err(ParseError::Incomplete)));
2121
2122        // Test malformed chunk in streaming sequence
2123        let data = Bytes::from("$?\r\n;abc\r\nHell\r\n;0\r\n");
2124        let result = parse_streaming_sequence(data);
2125        assert!(matches!(result, Err(ParseError::BadLength)));
2126
2127        // Test streaming array with incomplete terminator
2128        let data = Bytes::from("*?\r\n+hello\r\n:42\r\n");
2129        let result = parse_streaming_sequence(data);
2130        assert!(matches!(result, Err(ParseError::Incomplete)));
2131
2132        // Test mixed streaming and non-streaming content
2133        let data = Bytes::from("*?\r\n+hello\r\n*2\r\n:1\r\n:2\r\n.\r\n");
2134        let result = parse_streaming_sequence(data);
2135        assert!(result.is_ok());
2136        let (frame, _) = result.unwrap();
2137        if let Frame::StreamedArray(items) = frame {
2138            assert_eq!(items.len(), 2);
2139            assert!(matches!(items[0], Frame::SimpleString(_)));
2140            assert!(matches!(items[1], Frame::Array(_)));
2141        }
2142
2143        // Test empty streaming containers
2144        let data = Bytes::from("*?\r\n.\r\n");
2145        let result = parse_streaming_sequence(data);
2146        assert!(result.is_ok());
2147        let (frame, _) = result.unwrap();
2148        if let Frame::StreamedArray(items) = frame {
2149            assert!(items.is_empty());
2150        }
2151
2152        // Test streaming map with odd number of elements
2153        let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+orphan\r\n.\r\n");
2154        let result = parse_streaming_sequence(data);
2155        assert!(matches!(result, Err(ParseError::InvalidFormat)));
2156
2157        // Test non-streaming frame passed to parse_streaming_sequence
2158        let data = Bytes::from("+simple\r\n");
2159        let result = parse_streaming_sequence(data);
2160        assert!(result.is_ok());
2161        let (frame, _) = result.unwrap();
2162        assert!(matches!(frame, Frame::SimpleString(_)));
2163
2164        // Test extremely large chunk size (should fail gracefully)
2165        let data = Bytes::from(";999999999999999999\r\ndata\r\n");
2166        let result = parse_frame(data);
2167        // The parsing might succeed but fail later when trying to read the data
2168        // Let's check what actually happens and accept either BadLength or Incomplete
2169        match &result {
2170            Err(ParseError::BadLength) => {}  // Expected
2171            Err(ParseError::Incomplete) => {} // Also acceptable - might not have enough data for huge chunk
2172            Err(e) => panic!("Got unexpected error type: {e:?}"),
2173            Ok(_) => panic!("Large chunk size should fail"),
2174        }
2175
2176        // Test streaming string with non-chunk frame mixed in
2177        let data = Bytes::from("$?\r\n+invalid\r\n;0\r\n");
2178        let result = parse_streaming_sequence(data);
2179        assert!(matches!(result, Err(ParseError::InvalidFormat)));
2180
2181        // Test streaming sequence with corrupted terminator
2182        let data = Bytes::from("*?\r\n+hello\r\n.corrupted\r\n");
2183        let result = parse_streaming_sequence(data);
2184        assert!(matches!(result, Err(ParseError::Incomplete)));
2185
2186        // Test empty input to parse_streaming_sequence
2187        let data = Bytes::new();
2188        let result = parse_streaming_sequence(data);
2189        assert!(matches!(result, Err(ParseError::Incomplete)));
2190
2191        // Test streaming sequence with partial frame at end
2192        let data = Bytes::from("*?\r\n+hello\r\n$5\r\nwo");
2193        let result = parse_streaming_sequence(data);
2194        assert!(matches!(result, Err(ParseError::Incomplete)));
2195    }
2196
2197    #[test]
2198    fn test_roundtrip_nested_structures() {
2199        // Test a complex nested structure
2200        let original = Bytes::from(
2201            "*3\r\n+hello\r\n%2\r\n+key1\r\n:123\r\n+key2\r\n~1\r\n+item\r\n|1\r\n+meta\r\n+data\r\n",
2202        );
2203        let (frame, _) = parse_frame(original.clone()).unwrap();
2204        let serialized = frame_to_bytes(&frame);
2205
2206        let (reparsed, _) = parse_frame(serialized).unwrap();
2207        assert_eq!(frame, reparsed);
2208    }
2209
2210    #[test]
2211    fn test_zero_length_bulk_string_requires_trailing_crlf() {
2212        // Complete: $0\r\n\r\n
2213        let input = Bytes::from("$0\r\n\r\nTAIL");
2214        let (frame, rest) = parse_frame(input).unwrap();
2215        assert_eq!(frame, Frame::BulkString(Some(Bytes::new())));
2216        assert_eq!(rest, Bytes::from("TAIL"));
2217
2218        // Incomplete: $0\r\n with no trailing data
2219        let input = Bytes::from("$0\r\n");
2220        assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2221
2222        // Incomplete: $0\r\n with only one byte
2223        let input = Bytes::from("$0\r\n\r");
2224        assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2225
2226        // Invalid: $0\r\n followed by non-CRLF
2227        let input = Bytes::from("$0\r\nXY");
2228        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
2229    }
2230
2231    #[test]
2232    fn test_zero_length_streamed_chunk_requires_trailing_crlf() {
2233        // Complete: ;0\r\n\r\n
2234        let input = Bytes::from(";0\r\n\r\nTAIL");
2235        let (frame, rest) = parse_frame(input).unwrap();
2236        assert_eq!(frame, Frame::StreamedStringChunk(Bytes::new()));
2237        assert_eq!(rest, Bytes::from("TAIL"));
2238
2239        // Incomplete: ;0\r\n with no trailing data
2240        let input = Bytes::from(";0\r\n");
2241        assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2242
2243        // Invalid: ;0\r\n followed by non-CRLF
2244        let input = Bytes::from(";0\r\nXY");
2245        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
2246    }
2247
2248    #[test]
2249    fn test_integer_overflow_returns_overflow_error() {
2250        // One past i64::MAX
2251        let input = Bytes::from(":9223372036854775808\r\n");
2252        assert_eq!(parse_frame(input), Err(ParseError::Overflow));
2253
2254        // i64::MAX should succeed
2255        let input = Bytes::from(":9223372036854775807\r\n");
2256        let (frame, _) = parse_frame(input).unwrap();
2257        assert_eq!(frame, Frame::Integer(i64::MAX));
2258
2259        // i64::MIN should succeed
2260        let input = Bytes::from(":-9223372036854775808\r\n");
2261        let (frame, _) = parse_frame(input).unwrap();
2262        assert_eq!(frame, Frame::Integer(i64::MIN));
2263    }
2264
2265    #[test]
2266    fn test_parser_propagates_errors() {
2267        let mut parser = Parser::new();
2268        parser.feed(Bytes::from("XINVALID\r\n"));
2269        let result = parser.next_frame();
2270        assert!(result.is_err());
2271        assert_eq!(result.unwrap_err(), ParseError::InvalidTag(b'X'));
2272    }
2273
2274    #[test]
2275    fn test_parser_returns_ok_none_for_incomplete() {
2276        let mut parser = Parser::new();
2277        parser.feed(Bytes::from("+HELL"));
2278        assert_eq!(parser.next_frame().unwrap(), None);
2279    }
2280
2281    #[test]
2282    fn test_integer_negative_overflow() {
2283        // One past i64::MIN
2284        assert!(parse_frame(Bytes::from(":-9223372036854775809\r\n")).is_err());
2285    }
2286
2287    #[test]
2288    fn test_nonempty_bulk_malformed_terminator() {
2289        // Not enough data after payload
2290        assert_eq!(
2291            parse_frame(Bytes::from("$3\r\nfoo")),
2292            Err(ParseError::Incomplete)
2293        );
2294        // Only one byte after payload
2295        assert_eq!(
2296            parse_frame(Bytes::from("$3\r\nfooX")),
2297            Err(ParseError::Incomplete)
2298        );
2299        // Two bytes present but wrong
2300        assert_eq!(
2301            parse_frame(Bytes::from("$3\r\nfooXY")),
2302            Err(ParseError::InvalidFormat)
2303        );
2304    }
2305
2306    #[test]
2307    fn test_blob_error_malformed_terminator() {
2308        assert_eq!(
2309            parse_frame(Bytes::from("!3\r\nerr")),
2310            Err(ParseError::Incomplete)
2311        );
2312        assert_eq!(
2313            parse_frame(Bytes::from("!3\r\nerrXY")),
2314            Err(ParseError::InvalidFormat)
2315        );
2316    }
2317
2318    #[test]
2319    fn test_verbatim_string_malformed_terminator() {
2320        assert_eq!(
2321            parse_frame(Bytes::from("=8\r\ntxt:data")),
2322            Err(ParseError::Incomplete)
2323        );
2324        assert_eq!(
2325            parse_frame(Bytes::from("=8\r\ntxt:dataXY")),
2326            Err(ParseError::InvalidFormat)
2327        );
2328    }
2329
2330    #[test]
2331    fn test_streamed_chunk_malformed_terminator() {
2332        assert_eq!(
2333            parse_frame(Bytes::from(";3\r\nabc")),
2334            Err(ParseError::Incomplete)
2335        );
2336        assert_eq!(
2337            parse_frame(Bytes::from(";3\r\nabcXY")),
2338            Err(ParseError::InvalidFormat)
2339        );
2340    }
2341
2342    #[test]
2343    fn test_bulk_string_size_limit() {
2344        // Over MAX_BULK_STRING_SIZE (512 MB)
2345        assert_eq!(
2346            parse_frame(Bytes::from("$536870913\r\n")),
2347            Err(ParseError::BadLength)
2348        );
2349    }
2350
2351    #[test]
2352    fn test_blob_error_size_limit() {
2353        assert_eq!(
2354            parse_frame(Bytes::from("!536870913\r\n")),
2355            Err(ParseError::BadLength)
2356        );
2357    }
2358
2359    #[test]
2360    fn test_verbatim_string_size_limit() {
2361        assert_eq!(
2362            parse_frame(Bytes::from("=536870913\r\n")),
2363            Err(ParseError::BadLength)
2364        );
2365    }
2366
2367    #[test]
2368    fn test_streamed_chunk_size_limit() {
2369        assert_eq!(
2370            parse_frame(Bytes::from(";536870913\r\n")),
2371            Err(ParseError::BadLength)
2372        );
2373    }
2374
2375    #[test]
2376    fn test_invalid_double() {
2377        assert_eq!(
2378            parse_frame(Bytes::from(",foo\r\n")),
2379            Err(ParseError::InvalidFormat)
2380        );
2381    }
2382
2383    #[test]
2384    fn test_invalid_boolean() {
2385        assert_eq!(
2386            parse_frame(Bytes::from("#\r\n")),
2387            Err(ParseError::InvalidBoolean)
2388        );
2389        assert_eq!(
2390            parse_frame(Bytes::from("#true\r\n")),
2391            Err(ParseError::InvalidBoolean)
2392        );
2393    }
2394
2395    #[test]
2396    fn test_parser_clears_buffer_on_error() {
2397        let mut parser = Parser::new();
2398        parser.feed(Bytes::from("X\r\n"));
2399        assert_eq!(parser.next_frame(), Err(ParseError::InvalidTag(b'X')));
2400        assert_eq!(parser.buffered_bytes(), 0);
2401    }
2402
2403    #[test]
2404    fn test_parser_recovers_after_error() {
2405        let mut parser = Parser::new();
2406        parser.feed(Bytes::from("X\r\n"));
2407        assert!(parser.next_frame().is_err());
2408        assert_eq!(parser.buffered_bytes(), 0);
2409
2410        parser.feed(Bytes::from("+OK\r\n"));
2411        let frame = parser.next_frame().unwrap().unwrap();
2412        assert_eq!(frame, Frame::SimpleString(Bytes::from("OK")));
2413    }
2414
2415    #[test]
2416    fn test_streaming_set_roundtrip() {
2417        let data = Bytes::from("~?\r\n+a\r\n+b\r\n+c\r\n.\r\n");
2418        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2419        assert_eq!(
2420            frame,
2421            Frame::StreamedSet(vec![
2422                Frame::SimpleString(Bytes::from("a")),
2423                Frame::SimpleString(Bytes::from("b")),
2424                Frame::SimpleString(Bytes::from("c")),
2425            ])
2426        );
2427        assert!(rest.is_empty());
2428    }
2429
2430    #[test]
2431    fn test_streaming_attribute_roundtrip() {
2432        let data = Bytes::from("|?\r\n+key\r\n+val\r\n.\r\n");
2433        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2434        assert_eq!(
2435            frame,
2436            Frame::StreamedAttribute(vec![(
2437                Frame::SimpleString(Bytes::from("key")),
2438                Frame::SimpleString(Bytes::from("val")),
2439            )])
2440        );
2441        assert!(rest.is_empty());
2442    }
2443
2444    #[test]
2445    fn test_streaming_push_roundtrip() {
2446        let data = Bytes::from(">?\r\n+pubsub\r\n+channel\r\n+message\r\n.\r\n");
2447        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2448        assert_eq!(
2449            frame,
2450            Frame::StreamedPush(vec![
2451                Frame::SimpleString(Bytes::from("pubsub")),
2452                Frame::SimpleString(Bytes::from("channel")),
2453                Frame::SimpleString(Bytes::from("message")),
2454            ])
2455        );
2456        assert!(rest.is_empty());
2457    }
2458
2459    #[test]
2460    fn test_empty_streaming_containers() {
2461        // Empty streaming string
2462        let data = Bytes::from("$?\r\n;0\r\n\r\n");
2463        let (frame, _) = parse_streaming_sequence(data).unwrap();
2464        assert_eq!(frame, Frame::StreamedString(vec![]));
2465
2466        // Empty streaming array
2467        let data = Bytes::from("*?\r\n.\r\n");
2468        let (frame, _) = parse_streaming_sequence(data).unwrap();
2469        assert_eq!(frame, Frame::StreamedArray(vec![]));
2470
2471        // Empty streaming set
2472        let data = Bytes::from("~?\r\n.\r\n");
2473        let (frame, _) = parse_streaming_sequence(data).unwrap();
2474        assert_eq!(frame, Frame::StreamedSet(vec![]));
2475
2476        // Empty streaming map
2477        let data = Bytes::from("%?\r\n.\r\n");
2478        let (frame, _) = parse_streaming_sequence(data).unwrap();
2479        assert_eq!(frame, Frame::StreamedMap(vec![]));
2480    }
2481
2482    #[test]
2483    fn test_streaming_attribute_odd_elements_errors() {
2484        let data = Bytes::from("|?\r\n+key\r\n+val\r\n+orphan\r\n.\r\n");
2485        let result = parse_streaming_sequence(data);
2486        assert!(matches!(result, Err(ParseError::InvalidFormat)));
2487    }
2488
2489    #[test]
2490    fn test_streaming_blob_error_header_passthrough() {
2491        // Blob error streaming is not supported; header is passed through
2492        let data = Bytes::from("!?\r\n!5\r\nERROR\r\n");
2493        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2494        assert_eq!(frame, Frame::StreamedBlobErrorHeader);
2495        // Rest contains the subsequent data
2496        assert!(!rest.is_empty());
2497    }
2498
2499    #[test]
2500    fn test_streaming_verbatim_header_passthrough() {
2501        // Verbatim string streaming is not supported; header is passed through
2502        let data = Bytes::from("=?\r\n=9\r\ntxt:hello\r\n");
2503        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2504        assert_eq!(frame, Frame::StreamedVerbatimStringHeader);
2505        assert!(!rest.is_empty());
2506    }
2507
2508    #[test]
2509    fn frame_as_bytes() {
2510        assert_eq!(
2511            Frame::SimpleString(Bytes::from("OK")).as_bytes(),
2512            Some(&Bytes::from("OK"))
2513        );
2514        assert_eq!(
2515            Frame::BlobError(Bytes::from("ERR")).as_bytes(),
2516            Some(&Bytes::from("ERR"))
2517        );
2518        assert_eq!(
2519            Frame::BigNumber(Bytes::from("123")).as_bytes(),
2520            Some(&Bytes::from("123"))
2521        );
2522        assert_eq!(Frame::BulkString(None).as_bytes(), None);
2523        assert_eq!(Frame::Null.as_bytes(), None);
2524        assert_eq!(Frame::Boolean(true).as_bytes(), None);
2525    }
2526
2527    #[test]
2528    fn frame_as_str() {
2529        assert_eq!(Frame::SimpleString(Bytes::from("OK")).as_str(), Some("OK"));
2530        assert_eq!(
2531            Frame::BulkString(Some(Bytes::from_static(&[0xFF]))).as_str(),
2532            None
2533        );
2534    }
2535
2536    #[test]
2537    fn frame_scalar_accessors() {
2538        assert_eq!(Frame::Integer(42).as_integer(), Some(42));
2539        assert_eq!(Frame::Double(3.5).as_double(), Some(3.5));
2540        assert_eq!(Frame::Boolean(true).as_boolean(), Some(true));
2541        // Wrong type returns None
2542        assert_eq!(Frame::Integer(42).as_double(), None);
2543        assert_eq!(Frame::Double(3.5).as_integer(), None);
2544    }
2545
2546    #[test]
2547    fn frame_collection_accessors() {
2548        let arr = Frame::Array(Some(vec![Frame::Integer(1)]));
2549        assert_eq!(arr.as_array(), Some([Frame::Integer(1)].as_slice()));
2550        assert_eq!(Frame::Array(None).as_array(), None);
2551
2552        let set = Frame::Set(vec![Frame::Integer(1)]);
2553        assert_eq!(set.as_set(), Some([Frame::Integer(1)].as_slice()));
2554
2555        let map = Frame::Map(vec![(
2556            Frame::SimpleString(Bytes::from("k")),
2557            Frame::Integer(1),
2558        )]);
2559        assert!(map.as_map().is_some());
2560
2561        let push = Frame::Push(vec![Frame::Integer(1)]);
2562        assert_eq!(push.as_push(), Some([Frame::Integer(1)].as_slice()));
2563    }
2564
2565    #[test]
2566    fn frame_verbatim_string_accessor() {
2567        let v = Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hello"));
2568        let (fmt, content) = v.as_verbatim_string().unwrap();
2569        assert_eq!(fmt, &Bytes::from("txt"));
2570        assert_eq!(content, &Bytes::from("hello"));
2571        assert_eq!(Frame::Null.as_verbatim_string(), None);
2572    }
2573
2574    #[test]
2575    fn frame_into_conversions() {
2576        assert_eq!(
2577            Frame::Array(Some(vec![Frame::Integer(1)])).into_array(),
2578            Ok(vec![Frame::Integer(1)])
2579        );
2580        assert!(Frame::Array(None).into_array().is_err());
2581
2582        assert_eq!(
2583            Frame::BulkString(Some(Bytes::from("x"))).into_bulk_string(),
2584            Ok(Bytes::from("x"))
2585        );
2586        assert!(Frame::BulkString(None).into_bulk_string().is_err());
2587
2588        let map = Frame::Map(vec![(Frame::Integer(1), Frame::Integer(2))]);
2589        assert!(map.into_map().is_ok());
2590
2591        let set = Frame::Set(vec![Frame::Integer(1)]);
2592        assert!(set.into_set().is_ok());
2593    }
2594
2595    #[test]
2596    fn frame_is_null() {
2597        assert!(Frame::Null.is_null());
2598        assert!(Frame::BulkString(None).is_null());
2599        assert!(Frame::Array(None).is_null());
2600        assert!(!Frame::BulkString(Some(Bytes::new())).is_null());
2601        assert!(!Frame::Integer(0).is_null());
2602    }
2603
2604    #[test]
2605    fn frame_is_error() {
2606        assert!(Frame::Error(Bytes::from("ERR")).is_error());
2607        assert!(Frame::BlobError(Bytes::from("ERR")).is_error());
2608        assert!(!Frame::SimpleString(Bytes::from("OK")).is_error());
2609    }
2610}