Skip to main content

resp_rs/
resp3.rs

1//! Zero-copy RESP3 parser.
2//!
3//! Parses RESP3 frames using `bytes::Bytes` for efficient, zero-copy operation.
4//! Supports all RESP3 data types, including fixed-length and streaming variants.
5//!
6//! # Performance
7//!
8//! RESP3 parsing is roughly 3x slower than RESP2 for simple types due to the
9//! larger type tag match. The gap narrows for collection-heavy workloads. For
10//! complete buffers, call [`parse_frame`] directly rather than using [`Parser`]
11//! (see [crate-level performance docs](crate#performance)).
12//!
13//! # Protocol permissiveness
14//!
15//! - **Simple strings and errors** are treated as raw bytes, not validated UTF-8.
16//!   The parser accepts any byte sequence that does not contain `\r` or `\n`.
17//! - **Double parsing** accepts case-insensitive and non-canonical float spellings
18//!   (e.g., `INF`, `Infinity`, `NAN`) via Rust's `f64::parse`, then normalizes
19//!   them to canonical [`Frame::SpecialFloat`] values (`inf`, `-inf`, `nan`).
20//!   This means roundtrip is semantic (value-preserving) but not lexical
21//!   (byte-preserving) for non-canonical inputs.
22//! - **Streaming support** for blob errors and verbatim strings is limited:
23//!   `parse_streaming_sequence` passes through their streaming headers
24//!   (`!?\r\n`, `=?\r\n`) without accumulation, since RESP3 does not define
25//!   a chunk format for these types. Use low-level `parse_frame` to handle
26//!   these headers manually if needed.
27
28use bytes::{BufMut, Bytes, BytesMut};
29
/// Upper bound on the element count a collection header may declare.
///
/// Headers announcing more elements than this are rejected, which keeps a
/// hostile peer from advertising absurdly large collections.
const MAX_COLLECTION_SIZE: usize = 10_000_000;

/// Upper bound, in bytes, on a single bulk string / blob / chunk payload (512 MiB).
const MAX_BULK_STRING_SIZE: usize = 512 * 1024 * 1024;
35
36/// A streaming parser for RESP3 frames.
37///
38/// This parser allows feeding data in chunks and extracting frames as they become available.
39/// It maintains an internal buffer of accumulated data and attempts to parse frames from it.
40#[derive(Default, Debug)]
41pub struct Parser {
42    buffer: BytesMut,
43}
44
45impl Parser {
46    /// Creates a new parser with an empty buffer.
47    pub fn new() -> Self {
48        Self {
49            buffer: BytesMut::new(),
50        }
51    }
52
53    /// Feeds a chunk of data into the parser.
54    ///
55    /// The data is appended to the internal buffer.
56    pub fn feed(&mut self, data: Bytes) {
57        self.buffer.extend_from_slice(&data);
58    }
59
60    /// Attempts to extract the next complete frame from the buffer.
61    ///
62    /// Returns `Ok(None)` if there is not enough data to parse a complete frame.
63    /// Returns `Ok(Some(frame))` on success, consuming the parsed bytes.
64    /// Returns `Err` on protocol errors, clearing the buffer.
65    pub fn next_frame(&mut self) -> Result<Option<Frame>, ParseError> {
66        if self.buffer.is_empty() {
67            return Ok(None);
68        }
69
70        let bytes = self.buffer.split().freeze();
71
72        match parse_frame_inner(&bytes, 0) {
73            Ok((frame, consumed)) => {
74                if consumed < bytes.len() {
75                    self.buffer.unsplit(BytesMut::from(&bytes[consumed..]));
76                }
77                Ok(Some(frame))
78            }
79            Err(ParseError::Incomplete) => {
80                self.buffer.unsplit(bytes.into());
81                Ok(None)
82            }
83            Err(e) => {
84                // Buffer was emptied by split() above; intentionally not restored
85                // on hard errors so the parser doesn't re-parse corrupt data.
86                Err(e)
87            }
88        }
89    }
90
91    /// Returns the number of bytes currently in the buffer.
92    pub fn buffered_bytes(&self) -> usize {
93        self.buffer.len()
94    }
95
96    /// Clears the internal buffer.
97    pub fn clear(&mut self) {
98        self.buffer.clear();
99    }
100}
101
102// --- Zero-copy Frame enum and parser using bytes::Bytes ---
103/// A parsed RESP3 frame.
104///
105/// Each variant corresponds to one of the RESP3 types, including both fixed-length
106/// and streaming headers for bulk strings, arrays, sets, maps, attributes, and pushes.
107#[derive(Debug, Clone, PartialEq)]
108pub enum Frame {
109    /// Simple string: +&lt;string&gt;\r\n
110    SimpleString(Bytes),
111    /// Simple error: -&lt;error&gt;\r\n
112    Error(Bytes),
113    /// Integer: :&lt;number&gt;\r\n
114    Integer(i64),
115    /// Blob string: $&lt;length&gt;\r\n&lt;bytes&gt;\r\n
116    // BulkString(Option<Vec<u8>>),
117    BulkString(Option<Bytes>),
118    /// Blob error: !&lt;length&gt;\r\n&lt;bytes&gt;\r\n
119    BlobError(Bytes),
120    /// Streaming blob string header: $?\r\n
121    StreamedStringHeader,
122    /// Streaming blob error header: !?\r\n
123    StreamedBlobErrorHeader,
124    /// Streaming verbatim string header: =?\r\n
125    StreamedVerbatimStringHeader,
126    /// Streaming array header: *?\r\n
127    StreamedArrayHeader,
128    /// Streaming set header: ~?\r\n
129    StreamedSetHeader,
130    /// Streaming map header: %?\r\n
131    StreamedMapHeader,
132    /// Streaming attribute header: |?\r\n
133    StreamedAttributeHeader,
134    /// Streaming push header: >?\r\n
135    StreamedPushHeader,
136    /// Streaming string chunk: ;length\r\ndata\r\n
137    ///
138    /// Represents an individual chunk in a streaming string sequence.
139    /// These chunks are parsed from `;{length}\r\n{data}\r\n` format.
140    /// A zero-length chunk (`;0\r\n`) indicates the end of the stream.
141    ///
142    /// # Example
143    /// ```
144    /// use resp_rs::resp3::Frame;
145    /// use bytes::Bytes;
146    ///
147    /// // Chunk containing "Hello"
148    /// // Wire format: ;5\r\nHello\r\n
149    /// Frame::StreamedStringChunk(Bytes::from("Hello"));
150    /// ```
151    StreamedStringChunk(Bytes),
152
153    /// Accumulated streaming string data from multiple chunks
154    ///
155    /// Created by `parse_streaming_sequence()` when parsing a complete
156    /// streaming string sequence (`$?\r\n` + chunks + `;0\r\n`).
157    /// Contains all chunks in order, allowing reconstruction of the full string.
158    ///
159    /// # Example
160    /// ```
161    /// use resp_rs::resp3::Frame;
162    /// use bytes::Bytes;
163    ///
164    /// // Represents "Hello world" from chunks ["Hello ", "world"]
165    /// Frame::StreamedString(vec![
166    ///     Bytes::from("Hello "),
167    ///     Bytes::from("world")
168    /// ]);
169    /// ```
170    StreamedString(Vec<Bytes>),
171
172    /// Accumulated streaming array data from multiple chunks
173    ///
174    /// Created when parsing a streaming array sequence (`*?\r\n` + frames + `.\r\n`).
175    /// Contains all frames that were streamed as part of the array.
176    ///
177    /// # Example
178    /// ```
179    /// use resp_rs::resp3::Frame;
180    /// use bytes::Bytes;
181    ///
182    /// // Array with mixed types
183    /// Frame::StreamedArray(vec![
184    ///     Frame::SimpleString(Bytes::from("hello")),
185    ///     Frame::Integer(42),
186    ///     Frame::Boolean(true)
187    /// ]);
188    /// ```
189    StreamedArray(Vec<Frame>),
190
191    /// Accumulated streaming set data from multiple chunks
192    ///
193    /// Created when parsing a streaming set sequence (`~?\r\n` + frames + `.\r\n`).
194    /// Contains all unique elements that were streamed as part of the set.
195    StreamedSet(Vec<Frame>),
196
197    /// Accumulated streaming map data from multiple chunks
198    ///
199    /// Created when parsing a streaming map sequence (`%?\r\n` + key-value pairs + `.\r\n`).
200    /// Contains all key-value pairs that were streamed as part of the map.
201    ///
202    /// # Example
203    /// ```
204    /// use resp_rs::resp3::Frame;
205    /// use bytes::Bytes;
206    ///
207    /// Frame::StreamedMap(vec![
208    ///     (Frame::SimpleString(Bytes::from("name")), Frame::SimpleString(Bytes::from("Alice"))),
209    ///     (Frame::SimpleString(Bytes::from("age")), Frame::Integer(25))
210    /// ]);
211    /// ```
212    StreamedMap(Vec<(Frame, Frame)>),
213
214    /// Accumulated streaming attribute data from multiple chunks
215    ///
216    /// Created when parsing a streaming attribute sequence (`|?\r\n` + key-value pairs + `.\r\n`).
217    /// Attributes provide out-of-band metadata that doesn't affect the main data structure.
218    StreamedAttribute(Vec<(Frame, Frame)>),
219
220    /// Accumulated streaming push data from multiple chunks
221    ///
222    /// Created when parsing a streaming push sequence (`>?\r\n` + frames + `.\r\n`).
223    /// Push messages are server-initiated communications (e.g., pub/sub messages).
224    ///
225    /// # Example
226    /// ```
227    /// use resp_rs::resp3::Frame;
228    /// use bytes::Bytes;
229    ///
230    /// // Pub/sub message
231    /// Frame::StreamedPush(vec![
232    ///     Frame::SimpleString(Bytes::from("pubsub")),
233    ///     Frame::SimpleString(Bytes::from("channel1")),
234    ///     Frame::SimpleString(Bytes::from("message content"))
235    /// ]);
236    /// ```
237    StreamedPush(Vec<Frame>),
238    /// End-of-stream terminator for all chunked sequences: .\r\n
239    StreamTerminator,
240    /// Null: _\r\n
241    Null,
242    /// Double: ,&lt;float&gt;\r\n
243    Double(f64),
244    /// Special Float: ,inf\r\n, -inf\r\n, nan\r\n
245    SpecialFloat(Bytes),
246    /// Boolean: #t\r\n or #f\r\n
247    Boolean(bool),
248    /// Big number: (&lt;number&gt;\r\n
249    BigNumber(Bytes),
250    /// Verbatim string: =format:content\r\n
251    // VerbatimString { format: String, content: String },
252    VerbatimString(Bytes, Bytes),
253    /// Array: *&lt;count&gt;\r\n... (or streaming header *?\r\n)
254    Array(Option<Vec<Frame>>),
255    /// Set: ~&lt;count&gt;\r\n... (or streaming header ~?\r\n)
256    Set(Vec<Frame>),
257    /// Map: %&lt;count&gt;\r\n... (or streaming header %?\r\n)
258    Map(Vec<(Frame, Frame)>),
259    /// Attribute: |&lt;count&gt;\r\n... (or streaming header |?\r\n)
260    Attribute(Vec<(Frame, Frame)>),
261    /// Push: > &lt;count&gt;\r\n... (or streaming header >?\r\n)
262    Push(Vec<Frame>),
263}
264
265pub use crate::ParseError;
266
267/// Parse a single RESP3 frame from the provided `Bytes`.
268///
269/// Returns the parsed `Frame` and the remaining unconsumed `Bytes`, or a `ParseError` on failure.
270pub fn parse_frame(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
271    let (frame, consumed) = parse_frame_inner(&input, 0)?;
272    Ok((frame, input.slice(consumed..)))
273}
274
275/// Parse a length-prefixed blob: shared logic for bulk string, blob error, and
276/// streamed string chunks. Returns `(data_start, data_end, after_crlf)` on success.
277#[inline(never)]
278fn parse_blob_bounds(
279    buf: &[u8],
280    after_crlf: usize,
281    len: usize,
282) -> Result<(usize, usize), ParseError> {
283    if len == 0 {
284        if after_crlf + 1 >= buf.len() {
285            return Err(ParseError::Incomplete);
286        }
287        if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
288            return Ok((after_crlf, after_crlf));
289        } else {
290            return Err(ParseError::InvalidFormat);
291        }
292    }
293    let data_start = after_crlf;
294    let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
295    if data_end + 1 >= buf.len() {
296        return Err(ParseError::Incomplete);
297    }
298    if buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
299        return Err(ParseError::InvalidFormat);
300    }
301    Ok((data_start, data_end))
302}
303
304/// Parse a bulk string frame (`$`).
305fn parse_bulk_string(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
306    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
307    let len_bytes = &buf[pos + 1..line_end];
308    if len_bytes == b"?" {
309        return Ok((Frame::StreamedStringHeader, after_crlf));
310    }
311    if len_bytes == b"-1" {
312        return Ok((Frame::BulkString(None), after_crlf));
313    }
314    let len = parse_usize(len_bytes)?;
315    if len > MAX_BULK_STRING_SIZE {
316        return Err(ParseError::BadLength);
317    }
318    let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
319    if data_start == data_end {
320        Ok((Frame::BulkString(Some(Bytes::new())), after_crlf + 2))
321    } else {
322        Ok((
323            Frame::BulkString(Some(input.slice(data_start..data_end))),
324            data_end + 2,
325        ))
326    }
327}
328
329/// Parse a double frame (`,`).
330#[inline(never)]
331fn parse_double_frame(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
332    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
333    let line_bytes = &buf[pos + 1..line_end];
334    if line_bytes == b"inf" || line_bytes == b"-inf" || line_bytes == b"nan" {
335        return Ok((
336            Frame::SpecialFloat(input.slice(pos + 1..line_end)),
337            after_crlf,
338        ));
339    }
340    let s = std::str::from_utf8(line_bytes).map_err(|_| ParseError::Utf8Error)?;
341    let v = s.parse::<f64>().map_err(|_| ParseError::InvalidFormat)?;
342    if v.is_infinite() || v.is_nan() {
343        let canonical = if v.is_nan() {
344            "nan"
345        } else if v.is_sign_negative() {
346            "-inf"
347        } else {
348            "inf"
349        };
350        return Ok((Frame::SpecialFloat(Bytes::from(canonical)), after_crlf));
351    }
352    Ok((Frame::Double(v), after_crlf))
353}
354
355/// Parse a verbatim string frame (`=`).
356#[inline(never)]
357fn parse_verbatim(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
358    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
359    let len_bytes = &buf[pos + 1..line_end];
360    if len_bytes == b"?" {
361        return Ok((Frame::StreamedVerbatimStringHeader, after_crlf));
362    }
363    if len_bytes == b"-1" {
364        return Err(ParseError::BadLength);
365    }
366    let len = parse_usize(len_bytes)?;
367    if len > MAX_BULK_STRING_SIZE {
368        return Err(ParseError::BadLength);
369    }
370    let data_start = after_crlf;
371    let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
372    if data_end + 1 >= buf.len() {
373        return Err(ParseError::Incomplete);
374    }
375    if buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
376        return Err(ParseError::InvalidFormat);
377    }
378    let sep = buf[data_start..data_end]
379        .iter()
380        .position(|&b| b == b':')
381        .ok_or(ParseError::InvalidFormat)?;
382    if sep != 3 {
383        return Err(ParseError::InvalidFormat);
384    }
385    let format = input.slice(data_start..data_start + sep);
386    let content = input.slice(data_start + sep + 1..data_end);
387    Ok((Frame::VerbatimString(format, content), data_end + 2))
388}
389
390/// Parse a blob error frame (`!`).
391#[inline(never)]
392fn parse_blob_error(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
393    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
394    let len_bytes = &buf[pos + 1..line_end];
395    if len_bytes == b"?" {
396        return Ok((Frame::StreamedBlobErrorHeader, after_crlf));
397    }
398    if len_bytes == b"-1" {
399        return Err(ParseError::BadLength);
400    }
401    let len = parse_usize(len_bytes)?;
402    if len > MAX_BULK_STRING_SIZE {
403        return Err(ParseError::BadLength);
404    }
405    let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
406    if data_start == data_end {
407        Ok((Frame::BlobError(Bytes::new()), after_crlf + 2))
408    } else {
409        Ok((
410            Frame::BlobError(input.slice(data_start..data_end)),
411            data_end + 2,
412        ))
413    }
414}
415
416/// Parse a collection (array, set, push) with element count.
417#[inline(never)]
418fn parse_collection(
419    input: &Bytes,
420    buf: &[u8],
421    pos: usize,
422    tag: u8,
423) -> Result<(Frame, usize), ParseError> {
424    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
425    let len_bytes = &buf[pos + 1..line_end];
426
427    // Streaming headers
428    if len_bytes == b"?" {
429        return match tag {
430            b'*' => Ok((Frame::StreamedArrayHeader, after_crlf)),
431            b'~' => Ok((Frame::StreamedSetHeader, after_crlf)),
432            b'>' => Ok((Frame::StreamedPushHeader, after_crlf)),
433            _ => unreachable!(),
434        };
435    }
436    // Null array
437    if tag == b'*' && len_bytes == b"-1" {
438        return Ok((Frame::Array(None), after_crlf));
439    }
440    let count = parse_count(len_bytes)?;
441    if count == 0 {
442        return match tag {
443            b'*' => Ok((Frame::Array(Some(Vec::new())), after_crlf)),
444            b'~' => Ok((Frame::Set(Vec::new()), after_crlf)),
445            b'>' => Ok((Frame::Push(Vec::new()), after_crlf)),
446            _ => unreachable!(),
447        };
448    }
449    let mut cursor = after_crlf;
450    let mut items = Vec::with_capacity(count);
451    for _ in 0..count {
452        let (item, next) = parse_frame_inner(input, cursor)?;
453        items.push(item);
454        cursor = next;
455    }
456    match tag {
457        b'*' => Ok((Frame::Array(Some(items)), cursor)),
458        b'~' => Ok((Frame::Set(items), cursor)),
459        b'>' => Ok((Frame::Push(items), cursor)),
460        _ => unreachable!(),
461    }
462}
463
464/// Parse a map or attribute (`%` / `|`).
465#[inline(never)]
466fn parse_pairs(
467    input: &Bytes,
468    buf: &[u8],
469    pos: usize,
470    tag: u8,
471) -> Result<(Frame, usize), ParseError> {
472    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
473    let len_bytes = &buf[pos + 1..line_end];
474    if len_bytes == b"?" {
475        return if tag == b'%' {
476            Ok((Frame::StreamedMapHeader, after_crlf))
477        } else {
478            Ok((Frame::StreamedAttributeHeader, after_crlf))
479        };
480    }
481    let count = parse_count(len_bytes)?;
482    let mut cursor = after_crlf;
483    let mut pairs = Vec::with_capacity(count);
484    for _ in 0..count {
485        let (key, next1) = parse_frame_inner(input, cursor)?;
486        let (val, next2) = parse_frame_inner(input, next1)?;
487        pairs.push((key, val));
488        cursor = next2;
489    }
490    if tag == b'%' {
491        Ok((Frame::Map(pairs), cursor))
492    } else {
493        Ok((Frame::Attribute(pairs), cursor))
494    }
495}
496
497/// Parse a streamed string chunk (`;`).
498#[inline(never)]
499fn parse_streamed_chunk(
500    input: &Bytes,
501    buf: &[u8],
502    pos: usize,
503) -> Result<(Frame, usize), ParseError> {
504    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
505    let len = parse_usize(&buf[pos + 1..line_end])?;
506    if len > MAX_BULK_STRING_SIZE {
507        return Err(ParseError::BadLength);
508    }
509    let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
510    if data_start == data_end {
511        Ok((Frame::StreamedStringChunk(Bytes::new()), after_crlf + 2))
512    } else {
513        Ok((
514            Frame::StreamedStringChunk(input.slice(data_start..data_end)),
515            data_end + 2,
516        ))
517    }
518}
519
520/// Offset-based internal parser. The match body is kept minimal to reduce
521/// instruction-cache pressure; heavy arms are extracted into `#[inline(never)]`
522/// helpers so the hot dispatch stays small.
523fn parse_frame_inner(input: &Bytes, pos: usize) -> Result<(Frame, usize), ParseError> {
524    let buf = input.as_ref();
525    if pos >= buf.len() {
526        return Err(ParseError::Incomplete);
527    }
528
529    let tag = buf[pos];
530
531    match tag {
532        // Line-based types (small, stay inline)
533        b'+' => {
534            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
535            Ok((
536                Frame::SimpleString(input.slice(pos + 1..line_end)),
537                after_crlf,
538            ))
539        }
540        b'-' => {
541            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
542            Ok((Frame::Error(input.slice(pos + 1..line_end)), after_crlf))
543        }
544        b':' => {
545            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
546            let v = parse_i64(&buf[pos + 1..line_end])?;
547            Ok((Frame::Integer(v), after_crlf))
548        }
549        b'#' => {
550            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
551            match &buf[pos + 1..line_end] {
552                b"t" => Ok((Frame::Boolean(true), after_crlf)),
553                b"f" => Ok((Frame::Boolean(false), after_crlf)),
554                _ => Err(ParseError::InvalidBoolean),
555            }
556        }
557        b'(' => {
558            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
559            Ok((Frame::BigNumber(input.slice(pos + 1..line_end)), after_crlf))
560        }
561        b'_' => {
562            if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
563                Ok((Frame::Null, pos + 3))
564            } else {
565                Err(ParseError::Incomplete)
566            }
567        }
568        b'.' => {
569            if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
570                Ok((Frame::StreamTerminator, pos + 3))
571            } else {
572                Err(ParseError::Incomplete)
573            }
574        }
575
576        // Length-prefixed types (extracted to reduce icache pressure)
577        b'$' => parse_bulk_string(input, buf, pos),
578        b',' => parse_double_frame(input, buf, pos),
579        b'=' => parse_verbatim(input, buf, pos),
580        b'!' => parse_blob_error(input, buf, pos),
581        b';' => parse_streamed_chunk(input, buf, pos),
582
583        // Collections (extracted)
584        b'*' | b'~' | b'>' => parse_collection(input, buf, pos, tag),
585        b'%' | b'|' => parse_pairs(input, buf, pos, tag),
586
587        _ => Err(ParseError::InvalidTag(tag)),
588    }
589}
590
591#[cfg(feature = "unsafe-internals")]
592#[path = "resp3_unchecked.rs"]
593mod unchecked;
594#[cfg(feature = "unsafe-internals")]
595pub use unchecked::parse_frame_unchecked;
596
597/// Parse a complete RESP3 streaming sequence, accumulating chunks until termination.
598///
599/// This function handles RESP3 streaming sequences that begin with streaming headers
600/// (`$?`, `*?`, `~?`, `%?`, `|?`, `>?`) and accumulates the following data until
601/// the appropriate terminator is encountered.
602///
603/// # Streaming Types Supported
604///
605/// - **Streaming Strings**: `$?\r\n` followed by chunks terminated with `;0\r\n`
606/// - **Streaming Arrays**: `*?\r\n` followed by frames terminated with `.\r\n`
607/// - **Streaming Sets**: `~?\r\n` followed by frames terminated with `.\r\n`
608/// - **Streaming Maps**: `%?\r\n` followed by key-value pairs terminated with `.\r\n`
609/// - **Streaming Attributes**: `|?\r\n` followed by key-value pairs terminated with `.\r\n`
610/// - **Streaming Push**: `>?\r\n` followed by frames terminated with `.\r\n`
611///
612/// # Examples
613///
614/// ## Streaming String
615/// ```rust
616/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
617/// use bytes::Bytes;
618///
619/// let data = Bytes::from("$?\r\n;4\r\nHell\r\n;6\r\no worl\r\n;1\r\nd\r\n;0\r\n\r\n");
620/// let (frame, rest) = parse_streaming_sequence(data).unwrap();
621///
622/// if let Frame::StreamedString(chunks) = frame {
623///     assert_eq!(chunks.len(), 3);
624///     let full_string: String = chunks.iter()
625///         .map(|chunk| std::str::from_utf8(chunk).unwrap())
626///         .collect::<Vec<_>>()
627///         .join("");
628///     assert_eq!(full_string, "Hello world");
629/// }
630/// assert!(rest.is_empty());
631/// ```
632///
633/// ## Streaming Array
634/// ```rust
635/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
636/// use bytes::Bytes;
637///
638/// let data = Bytes::from("*?\r\n+hello\r\n:42\r\n#t\r\n.\r\n");
639/// let (frame, _) = parse_streaming_sequence(data).unwrap();
640///
641/// if let Frame::StreamedArray(items) = frame {
642///     assert_eq!(items.len(), 3);
643///     // items[0] = SimpleString("hello")
644///     // items[1] = Integer(42)
645///     // items[2] = Boolean(true)
646/// }
647/// ```
648///
649/// ## Streaming Map
650/// ```rust
651/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
652/// use bytes::Bytes;
653///
654/// let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+key2\r\n:123\r\n.\r\n");
655/// let (frame, _) = parse_streaming_sequence(data).unwrap();
656///
657/// if let Frame::StreamedMap(pairs) = frame {
658///     assert_eq!(pairs.len(), 2);
659///     // pairs[0] = (SimpleString("key1"), SimpleString("val1"))
660///     // pairs[1] = (SimpleString("key2"), Integer(123))
661/// }
662/// ```
663///
664/// # Errors
665///
666/// Returns `ParseError::Incomplete` if the stream is not complete or if required
667/// terminators are missing. Returns `ParseError::InvalidFormat` for malformed
668/// chunk data or unexpected frame types within streaming sequences.
669///
670/// # Notes
671///
672/// - For non-streaming frames, this function simply returns the parsed frame
673/// - Streaming string chunks are accumulated in order
674/// - All other streaming types accumulate complete frames until termination
675/// - Zero-copy parsing is used where possible to minimize allocations
676pub fn parse_streaming_sequence(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
677    if input.is_empty() {
678        return Err(ParseError::Incomplete);
679    }
680
681    let (header, mut rest) = parse_frame(input)?;
682
683    match header {
684        Frame::StreamedStringHeader => {
685            // Parse streaming string chunks until zero-length chunk
686            let mut chunks = Vec::new();
687
688            loop {
689                let (frame, new_rest) = parse_frame(rest)?;
690                rest = new_rest;
691
692                match frame {
693                    Frame::StreamedStringChunk(chunk) => {
694                        if chunk.is_empty() {
695                            // Zero-length chunk indicates end of stream
696                            break;
697                        }
698                        chunks.push(chunk);
699                    }
700                    _ => {
701                        return Err(ParseError::InvalidFormat);
702                    }
703                }
704            }
705
706            Ok((Frame::StreamedString(chunks), rest))
707        }
708        Frame::StreamedBlobErrorHeader | Frame::StreamedVerbatimStringHeader => {
709            // RESP3 does not define a streaming chunk format for blob errors
710            // or verbatim strings. Return the header as-is for low-level consumers.
711            Ok((header, rest))
712        }
713        Frame::StreamedArrayHeader => {
714            // Parse streaming array items until terminator
715            let mut items = Vec::new();
716
717            loop {
718                let (frame, new_rest) = parse_frame(rest)?;
719                rest = new_rest;
720
721                match frame {
722                    Frame::StreamTerminator => {
723                        break;
724                    }
725                    item => {
726                        items.push(item);
727                    }
728                }
729            }
730
731            Ok((Frame::StreamedArray(items), rest))
732        }
733        Frame::StreamedSetHeader => {
734            // Parse streaming set items until terminator
735            let mut items = Vec::new();
736
737            loop {
738                let (frame, new_rest) = parse_frame(rest)?;
739                rest = new_rest;
740
741                match frame {
742                    Frame::StreamTerminator => {
743                        break;
744                    }
745                    item => {
746                        items.push(item);
747                    }
748                }
749            }
750
751            Ok((Frame::StreamedSet(items), rest))
752        }
753        Frame::StreamedMapHeader => {
754            // Parse streaming map pairs until terminator
755            let mut pairs = Vec::new();
756
757            loop {
758                let (frame, new_rest) = parse_frame(rest)?;
759                rest = new_rest;
760
761                match frame {
762                    Frame::StreamTerminator => {
763                        break;
764                    }
765                    key => {
766                        let (value, newer_rest) = parse_frame(rest)?;
767                        if matches!(value, Frame::StreamTerminator) {
768                            return Err(ParseError::InvalidFormat);
769                        }
770                        rest = newer_rest;
771                        pairs.push((key, value));
772                    }
773                }
774            }
775
776            Ok((Frame::StreamedMap(pairs), rest))
777        }
778        Frame::StreamedAttributeHeader => {
779            // Parse streaming attribute pairs until terminator
780            let mut pairs = Vec::new();
781
782            loop {
783                let (frame, new_rest) = parse_frame(rest)?;
784                rest = new_rest;
785
786                match frame {
787                    Frame::StreamTerminator => {
788                        break;
789                    }
790                    key => {
791                        let (value, newer_rest) = parse_frame(rest)?;
792                        if matches!(value, Frame::StreamTerminator) {
793                            return Err(ParseError::InvalidFormat);
794                        }
795                        rest = newer_rest;
796                        pairs.push((key, value));
797                    }
798                }
799            }
800
801            Ok((Frame::StreamedAttribute(pairs), rest))
802        }
803        Frame::StreamedPushHeader => {
804            // Parse streaming push items until terminator
805            let mut items = Vec::new();
806
807            loop {
808                let (frame, new_rest) = parse_frame(rest)?;
809                rest = new_rest;
810
811                match frame {
812                    Frame::StreamTerminator => {
813                        break;
814                    }
815                    item => {
816                        items.push(item);
817                    }
818                }
819            }
820
821            Ok((Frame::StreamedPush(items), rest))
822        }
823        _ => {
824            // Not a streaming sequence, just return the original frame
825            Ok((header, rest))
826        }
827    }
828}
829
830/// Find `\r\n` in `buf` starting at `from`. Returns `(line_end, after_crlf)` where
831/// `line_end` is the position of `\r` and `after_crlf` is the position after `\n`.
832#[inline]
833fn find_crlf(buf: &[u8], from: usize) -> Result<(usize, usize), ParseError> {
834    let mut i = from;
835    let len = buf.len();
836    while i + 1 < len {
837        if buf[i] == b'\r' && buf[i + 1] == b'\n' {
838            return Ok((i, i + 2));
839        }
840        i += 1;
841    }
842    Err(ParseError::Incomplete)
843}
844
845/// Parse a `usize` directly from ASCII digit bytes, no UTF-8 validation needed.
846#[inline]
847fn parse_usize(buf: &[u8]) -> Result<usize, ParseError> {
848    if buf.is_empty() {
849        return Err(ParseError::BadLength);
850    }
851    let mut v: usize = 0;
852    for &b in buf {
853        if !b.is_ascii_digit() {
854            return Err(ParseError::BadLength);
855        }
856        v = v.checked_mul(10).ok_or(ParseError::BadLength)?;
857        v = v
858            .checked_add((b - b'0') as usize)
859            .ok_or(ParseError::BadLength)?;
860    }
861    Ok(v)
862}
863
864/// Parse an `i64` directly from ASCII bytes (optional leading `-`), no UTF-8 validation.
865#[inline]
866fn parse_i64(buf: &[u8]) -> Result<i64, ParseError> {
867    if buf.is_empty() {
868        return Err(ParseError::InvalidFormat);
869    }
870    let (neg, digits) = if buf[0] == b'-' {
871        (true, &buf[1..])
872    } else {
873        (false, buf)
874    };
875    if digits.is_empty() {
876        return Err(ParseError::InvalidFormat);
877    }
878    let mut v: i64 = 0;
879    for (i, &d) in digits.iter().enumerate() {
880        if !d.is_ascii_digit() {
881            return Err(ParseError::InvalidFormat);
882        }
883        let digit = (d - b'0') as i64;
884        if neg && v == i64::MAX / 10 && digit == 8 && i == digits.len() - 1 {
885            return Ok(i64::MIN);
886        }
887        if v > i64::MAX / 10 || (v == i64::MAX / 10 && digit > i64::MAX % 10) {
888            return Err(ParseError::Overflow);
889        }
890        v = v * 10 + digit;
891    }
892    if neg { Ok(-v) } else { Ok(v) }
893}
894
895/// Parse a collection count (usize) with MAX_COLLECTION_SIZE check.
896#[inline]
897fn parse_count(buf: &[u8]) -> Result<usize, ParseError> {
898    let count = parse_usize(buf)?;
899    if count > MAX_COLLECTION_SIZE {
900        return Err(ParseError::BadLength);
901    }
902    Ok(count)
903}
904
905/// Converts a Frame to its RESP3 byte representation.
906///
907/// This function serializes a Frame into the corresponding RESP3 protocol bytes.
908pub fn frame_to_bytes(frame: &Frame) -> Bytes {
909    let mut buf = BytesMut::new();
910    serialize_frame(frame, &mut buf);
911    buf.freeze()
912}
913
914fn serialize_frame(frame: &Frame, buf: &mut BytesMut) {
915    match frame {
916        Frame::SimpleString(s) => {
917            buf.put_u8(b'+');
918            buf.extend_from_slice(s);
919            buf.extend_from_slice(b"\r\n");
920        }
921        Frame::Error(e) => {
922            buf.put_u8(b'-');
923            buf.extend_from_slice(e);
924            buf.extend_from_slice(b"\r\n");
925        }
926        Frame::Integer(i) => {
927            buf.put_u8(b':');
928            let s = i.to_string();
929            buf.extend_from_slice(s.as_bytes());
930            buf.extend_from_slice(b"\r\n");
931        }
932        Frame::BulkString(opt) => {
933            buf.put_u8(b'$');
934            match opt {
935                Some(data) => {
936                    let len = data.len().to_string();
937                    buf.extend_from_slice(len.as_bytes());
938                    buf.extend_from_slice(b"\r\n");
939                    buf.extend_from_slice(data);
940                    buf.extend_from_slice(b"\r\n");
941                }
942                None => {
943                    buf.extend_from_slice(b"-1\r\n");
944                }
945            }
946        }
947        Frame::BlobError(data) => {
948            buf.put_u8(b'!');
949            let len = data.len().to_string();
950            buf.extend_from_slice(len.as_bytes());
951            buf.extend_from_slice(b"\r\n");
952            buf.extend_from_slice(data);
953            buf.extend_from_slice(b"\r\n");
954        }
955        Frame::StreamedStringHeader => {
956            buf.extend_from_slice(b"$?\r\n");
957        }
958        Frame::StreamedBlobErrorHeader => {
959            buf.extend_from_slice(b"!?\r\n");
960        }
961        Frame::StreamedVerbatimStringHeader => {
962            buf.extend_from_slice(b"=?\r\n");
963        }
964        Frame::StreamedArrayHeader => {
965            buf.extend_from_slice(b"*?\r\n");
966        }
967        Frame::StreamedSetHeader => {
968            buf.extend_from_slice(b"~?\r\n");
969        }
970        Frame::StreamedMapHeader => {
971            buf.extend_from_slice(b"%?\r\n");
972        }
973        Frame::StreamedAttributeHeader => {
974            buf.extend_from_slice(b"|?\r\n");
975        }
976        Frame::StreamedPushHeader => {
977            buf.extend_from_slice(b">?\r\n");
978        }
979        Frame::StreamedStringChunk(data) => {
980            buf.put_u8(b';');
981            let len = data.len().to_string();
982            buf.extend_from_slice(len.as_bytes());
983            buf.extend_from_slice(b"\r\n");
984            buf.extend_from_slice(data);
985            buf.extend_from_slice(b"\r\n");
986        }
987        Frame::StreamedString(chunks) => {
988            // Serialize as streaming string sequence: $?\r\n + chunks + terminator
989            buf.extend_from_slice(b"$?\r\n");
990            for chunk in chunks {
991                buf.put_u8(b';');
992                let len = chunk.len().to_string();
993                buf.extend_from_slice(len.as_bytes());
994                buf.extend_from_slice(b"\r\n");
995                buf.extend_from_slice(chunk);
996                buf.extend_from_slice(b"\r\n");
997            }
998            buf.extend_from_slice(b";0\r\n\r\n");
999        }
1000        Frame::StreamedArray(items) => {
1001            buf.extend_from_slice(b"*?\r\n");
1002            for item in items {
1003                serialize_frame(item, buf);
1004            }
1005            buf.extend_from_slice(b".\r\n");
1006        }
1007        Frame::StreamedSet(items) => {
1008            buf.extend_from_slice(b"~?\r\n");
1009            for item in items {
1010                serialize_frame(item, buf);
1011            }
1012            buf.extend_from_slice(b".\r\n");
1013        }
1014        Frame::StreamedMap(pairs) => {
1015            buf.extend_from_slice(b"%?\r\n");
1016            for (key, value) in pairs {
1017                serialize_frame(key, buf);
1018                serialize_frame(value, buf);
1019            }
1020            buf.extend_from_slice(b".\r\n");
1021        }
1022        Frame::StreamedAttribute(pairs) => {
1023            buf.extend_from_slice(b"|?\r\n");
1024            for (key, value) in pairs {
1025                serialize_frame(key, buf);
1026                serialize_frame(value, buf);
1027            }
1028            buf.extend_from_slice(b".\r\n");
1029        }
1030        Frame::StreamedPush(items) => {
1031            buf.extend_from_slice(b">?\r\n");
1032            for item in items {
1033                serialize_frame(item, buf);
1034            }
1035            buf.extend_from_slice(b".\r\n");
1036        }
1037        Frame::StreamTerminator => {
1038            buf.extend_from_slice(b".\r\n");
1039        }
1040        Frame::Null => {
1041            buf.extend_from_slice(b"_\r\n");
1042        }
1043        Frame::Double(d) => {
1044            buf.put_u8(b',');
1045            let s = d.to_string();
1046            buf.extend_from_slice(s.as_bytes());
1047            buf.extend_from_slice(b"\r\n");
1048        }
1049        Frame::SpecialFloat(f) => {
1050            buf.put_u8(b',');
1051            buf.extend_from_slice(f);
1052            buf.extend_from_slice(b"\r\n");
1053        }
1054        Frame::Boolean(b) => {
1055            buf.extend_from_slice(if *b { b"#t\r\n" } else { b"#f\r\n" });
1056        }
1057        Frame::BigNumber(n) => {
1058            buf.put_u8(b'(');
1059            buf.extend_from_slice(n);
1060            buf.extend_from_slice(b"\r\n");
1061        }
1062        Frame::VerbatimString(format, content) => {
1063            buf.put_u8(b'=');
1064            let total_len = format.len() + 1 + content.len(); // +1 for the colon
1065            let len = total_len.to_string();
1066            buf.extend_from_slice(len.as_bytes());
1067            buf.extend_from_slice(b"\r\n");
1068            buf.extend_from_slice(format);
1069            buf.put_u8(b':');
1070            buf.extend_from_slice(content);
1071            buf.extend_from_slice(b"\r\n");
1072        }
1073        Frame::Array(opt) => {
1074            buf.put_u8(b'*');
1075            match opt {
1076                Some(items) => {
1077                    let len = items.len().to_string();
1078                    buf.extend_from_slice(len.as_bytes());
1079                    buf.extend_from_slice(b"\r\n");
1080                    for item in items {
1081                        serialize_frame(item, buf);
1082                    }
1083                }
1084                None => {
1085                    buf.extend_from_slice(b"-1\r\n");
1086                }
1087            }
1088        }
1089        Frame::Set(items) => {
1090            buf.put_u8(b'~');
1091            let len = items.len().to_string();
1092            buf.extend_from_slice(len.as_bytes());
1093            buf.extend_from_slice(b"\r\n");
1094            for item in items {
1095                serialize_frame(item, buf);
1096            }
1097        }
1098        Frame::Map(pairs) => {
1099            buf.put_u8(b'%');
1100            let len = pairs.len().to_string();
1101            buf.extend_from_slice(len.as_bytes());
1102            buf.extend_from_slice(b"\r\n");
1103            for (key, value) in pairs {
1104                serialize_frame(key, buf);
1105                serialize_frame(value, buf);
1106            }
1107        }
1108        Frame::Attribute(pairs) => {
1109            buf.put_u8(b'|');
1110            let len = pairs.len().to_string();
1111            buf.extend_from_slice(len.as_bytes());
1112            buf.extend_from_slice(b"\r\n");
1113            for (key, value) in pairs {
1114                serialize_frame(key, buf);
1115                serialize_frame(value, buf);
1116            }
1117        }
1118        Frame::Push(items) => {
1119            buf.put_u8(b'>');
1120            let len = items.len().to_string();
1121            buf.extend_from_slice(len.as_bytes());
1122            buf.extend_from_slice(b"\r\n");
1123            for item in items {
1124                serialize_frame(item, buf);
1125            }
1126        }
1127    }
1128}
1129
1130#[cfg(test)]
1131mod tests {
1132    use super::{Frame, ParseError, Parser, frame_to_bytes, parse_frame, parse_streaming_sequence};
1133    use bytes::Bytes;
1134
1135    #[test]
1136    fn test_parse_frame_simple_string() {
1137        let input = Bytes::from("+HELLO\r\nWORLD");
1138        let (frame, rest) = parse_frame(input.clone()).unwrap();
1139        assert_eq!(frame, Frame::SimpleString(Bytes::from("HELLO")));
1140        assert_eq!(rest, Bytes::from("WORLD"));
1141    }
1142
1143    #[test]
1144    fn test_parse_frame_blob_error() {
1145        let input = Bytes::from("!5\r\nERROR\r\nREST");
1146        let (frame, rest) = parse_frame(input.clone()).unwrap();
1147        assert_eq!(frame, Frame::BlobError(Bytes::from("ERROR")));
1148        assert_eq!(rest, Bytes::from("REST"));
1149    }
1150
1151    #[test]
1152    fn test_parse_frame_error() {
1153        let input = Bytes::from("-ERR fail\r\nLEFT");
1154        let (frame, rest) = parse_frame(input.clone()).unwrap();
1155        assert_eq!(frame, Frame::Error(Bytes::from("ERR fail")));
1156        assert_eq!(rest, Bytes::from("LEFT"));
1157    }
1158
1159    #[test]
1160    fn test_parse_frame_integer() {
1161        let input = Bytes::from(":42\r\nTAIL");
1162        let (frame, rest) = parse_frame(input.clone()).unwrap();
1163        assert_eq!(frame, Frame::Integer(42));
1164        assert_eq!(rest, Bytes::from("TAIL"));
1165    }
1166
1167    #[test]
1168    fn test_parse_frame_bulk_string() {
1169        let input = Bytes::from("$3\r\nfoo\r\nREST");
1170        let (frame, rest) = parse_frame(input.clone()).unwrap();
1171        assert_eq!(frame, Frame::BulkString(Some(Bytes::from("foo"))));
1172        assert_eq!(rest, Bytes::from("REST"));
1173        let null_input = Bytes::from("$-1\r\nAFTER");
1174        let (frame, rest) = parse_frame(null_input.clone()).unwrap();
1175        assert_eq!(frame, Frame::BulkString(None));
1176        assert_eq!(rest, Bytes::from("AFTER"));
1177    }
1178
1179    #[test]
1180    fn test_parse_frame_null() {
1181        let input = Bytes::from("_\r\nLEFT");
1182        let (frame, rest) = parse_frame(input.clone()).unwrap();
1183        assert_eq!(frame, Frame::Null);
1184        assert_eq!(rest, Bytes::from("LEFT"));
1185    }
1186
1187    #[test]
1188    fn test_parse_frame_double_and_special_float() {
1189        let input = Bytes::from(",3.5\r\nNEXT");
1190        let (frame, rest) = parse_frame(input.clone()).unwrap();
1191        assert_eq!(frame, Frame::Double(3.5));
1192        assert_eq!(rest, Bytes::from("NEXT"));
1193        let input_inf = Bytes::from(",inf\r\nTAIL");
1194        let (frame, rest) = parse_frame(input_inf.clone()).unwrap();
1195        assert_eq!(frame, Frame::SpecialFloat(Bytes::from("inf")));
1196        assert_eq!(rest, Bytes::from("TAIL"));
1197    }
1198
1199    #[test]
1200    fn test_parse_frame_boolean() {
1201        let input_true = Bytes::from("#t\r\nXYZ");
1202        let (frame, rest) = parse_frame(input_true.clone()).unwrap();
1203        assert_eq!(frame, Frame::Boolean(true));
1204        assert_eq!(rest, Bytes::from("XYZ"));
1205        let input_false = Bytes::from("#f\r\nDONE");
1206        let (frame, rest) = parse_frame(input_false.clone()).unwrap();
1207        assert_eq!(frame, Frame::Boolean(false));
1208        assert_eq!(rest, Bytes::from("DONE"));
1209    }
1210
1211    #[test]
1212    fn test_parse_frame_big_number() {
1213        let input = Bytes::from("(123456789\r\nEND");
1214        let (frame, rest) = parse_frame(input.clone()).unwrap();
1215        assert_eq!(frame, Frame::BigNumber(Bytes::from("123456789")));
1216        assert_eq!(rest, Bytes::from("END"));
1217    }
1218
1219    #[test]
1220    fn test_parse_frame_verbatim_string() {
1221        let input = Bytes::from("=12\r\ntxt:hi there\r\nAFTER");
1222        let (frame, rest) = parse_frame(input.clone()).unwrap();
1223        assert_eq!(
1224            frame,
1225            Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hi there")) // Frame::VerbatimString {
1226                                                                               //     format: "txt".to_string(),
1227                                                                               //     content: "hi there".to_string()
1228                                                                               // }
1229        );
1230        assert_eq!(rest, Bytes::from("AFTER"));
1231    }
1232
1233    #[test]
1234    fn test_parse_frame_array_set_push_map_attribute() {
1235        // Array of two bulk strings
1236        let input = Bytes::from("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\nTAIL");
1237        let (frame, rest) = parse_frame(input.clone()).unwrap();
1238        assert_eq!(
1239            frame,
1240            Frame::Array(Some(vec![
1241                Frame::BulkString(Some(Bytes::from("foo"))),
1242                Frame::BulkString(Some(Bytes::from("bar")))
1243            ]))
1244        );
1245        assert_eq!(rest, Bytes::from("TAIL"));
1246        // Null array
1247        let input_null = Bytes::from("*-1\r\nEND");
1248        let (frame, rest) = parse_frame(input_null.clone()).unwrap();
1249        assert_eq!(frame, Frame::Array(None));
1250        assert_eq!(rest, Bytes::from("END"));
1251        // Set of two simple strings
1252        let input_set = Bytes::from("~2\r\n+foo\r\n+bar\r\nTAIL");
1253        let (frame, rest) = parse_frame(input_set.clone()).unwrap();
1254        assert_eq!(
1255            frame,
1256            Frame::Set(vec![
1257                Frame::SimpleString(Bytes::from("foo")),
1258                Frame::SimpleString(Bytes::from("bar")),
1259            ])
1260        );
1261        assert_eq!(rest, Bytes::from("TAIL"));
1262        // Map of two key-value pairs
1263        let input_map = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\nTRAIL");
1264        let (frame, rest) = parse_frame(input_map.clone()).unwrap();
1265        assert_eq!(
1266            frame,
1267            Frame::Map(vec![
1268                (
1269                    Frame::SimpleString(Bytes::from("key1")),
1270                    Frame::SimpleString(Bytes::from("val1"))
1271                ),
1272                (
1273                    Frame::SimpleString(Bytes::from("key2")),
1274                    Frame::SimpleString(Bytes::from("val2"))
1275                ),
1276            ])
1277        );
1278        assert_eq!(rest, Bytes::from("TRAIL"));
1279        // Attribute
1280        let input_attr = Bytes::from("|1\r\n+meta\r\n+data\r\nAFTER");
1281        let (frame, rest) = parse_frame(input_attr.clone()).unwrap();
1282        assert_eq!(
1283            frame,
1284            Frame::Attribute(vec![(
1285                Frame::SimpleString(Bytes::from("meta")),
1286                Frame::SimpleString(Bytes::from("data"))
1287            ),])
1288        );
1289        assert_eq!(rest, Bytes::from("AFTER"));
1290        // Push
1291        let input_push = Bytes::from(">2\r\n+type\r\n:1\r\nNEXT");
1292        let (frame, rest) = parse_frame(input_push.clone()).unwrap();
1293        assert_eq!(
1294            frame,
1295            Frame::Push(vec![
1296                Frame::SimpleString(Bytes::from("type")),
1297                Frame::Integer(1),
1298            ])
1299        );
1300        assert_eq!(rest, Bytes::from("NEXT"));
1301    }
1302
1303    #[test]
1304    fn test_parse_frame_empty_input() {
1305        assert!(parse_frame(Bytes::new()).is_err());
1306    }
1307
1308    #[test]
1309    fn test_parse_frame_invalid_tag() {
1310        let input = Bytes::from("X123\r\n");
1311        assert!(parse_frame(input).is_err());
1312    }
1313
1314    #[test]
1315    fn test_parse_frame_malformed_bulk_length() {
1316        let input = Bytes::from("$x\r\nfoo\r\n");
1317        assert!(parse_frame(input).is_err());
1318    }
1319
1320    #[test]
1321    fn test_parse_frame_zero_length_bulk() {
1322        let input = Bytes::from("$0\r\n\r\nTAIL");
1323        let (frame, rest) = parse_frame(input.clone()).unwrap();
1324        assert_eq!(frame, Frame::BulkString(Some(Bytes::from(""))));
1325        assert_eq!(rest, Bytes::from("TAIL"));
1326    }
1327
1328    #[test]
1329    fn test_parse_frame_zero_length_blob_error() {
1330        let input = Bytes::from("!0\r\n\r\nREST");
1331        let (frame, rest) = parse_frame(input.clone()).unwrap();
1332        assert_eq!(frame, Frame::BlobError(Bytes::new()));
1333        assert_eq!(rest, Bytes::from("REST"));
1334    }
1335
1336    #[test]
1337    fn test_parse_frame_missing_crlf() {
1338        let input = Bytes::from(":42\nTAIL");
1339        assert!(parse_frame(input).is_err());
1340    }
1341
1342    #[test]
1343    fn test_parse_frame_unicode_simple_string() {
1344        let input = Bytes::from("+こんにちは\r\nEND");
1345        let (frame, rest) = parse_frame(input.clone()).unwrap();
1346        assert_eq!(frame, Frame::SimpleString(Bytes::from("こんにちは")));
1347        assert_eq!(rest, Bytes::from("END"));
1348    }
1349
1350    #[test]
1351    fn test_parse_frame_chained_frames() {
1352        let combined = Bytes::from("+OK\r\n:1\r\nfoo");
1353        let (f1, rem) = parse_frame(combined.clone()).unwrap();
1354        assert_eq!(f1, Frame::SimpleString(Bytes::from("OK")));
1355        let (f2, rem2) = parse_frame(rem).unwrap();
1356        assert_eq!(f2, Frame::Integer(1));
1357        assert_eq!(rem2, Bytes::from("foo"));
1358    }
1359
1360    #[test]
1361    fn test_parse_frame_empty_array() {
1362        let input = Bytes::from("*0\r\nTAIL");
1363        let (frame, rest) = parse_frame(input.clone()).unwrap();
1364        assert_eq!(frame, Frame::Array(Some(vec![])));
1365        assert_eq!(rest, Bytes::from("TAIL"));
1366    }
1367
1368    #[test]
1369    fn test_parse_frame_partial_array_data() {
1370        let input = Bytes::from("*2\r\n+OK\r\n");
1371        assert!(parse_frame(input).is_err());
1372    }
1373
1374    #[test]
1375    fn test_parse_frame_streamed_string() {
1376        let input = Bytes::from("$?\r\n$5\r\nhello\r\n$0\r\n\r\nREST");
1377        let (frame, rem) = parse_frame(input.clone()).unwrap();
1378        assert_eq!(frame, Frame::StreamedStringHeader);
1379        let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1380        assert_eq!(chunk, Frame::BulkString(Some(Bytes::from("hello"))));
1381        let (terminator, rest) = parse_frame(rem2.clone()).unwrap();
1382        assert_eq!(terminator, Frame::BulkString(Some(Bytes::from(""))));
1383        assert_eq!(rest, Bytes::from("REST"));
1384    }
1385
1386    #[test]
1387    fn test_parse_frame_streamed_blob_error() {
1388        let input = Bytes::from("!?\r\n!5\r\nERROR\r\nREST");
1389        let (frame, rem) = parse_frame(input.clone()).unwrap();
1390        assert_eq!(frame, Frame::StreamedBlobErrorHeader);
1391        let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1392        assert_eq!(chunk, Frame::BlobError(Bytes::from("ERROR")));
1393        assert_eq!(rem2, Bytes::from("REST"));
1394    }
1395
1396    #[test]
1397    fn test_parse_frame_streamed_verbatim_string() {
1398        let input = Bytes::from("=?\r\n=9\r\ntxt:hello\r\nTAIL");
1399        let (frame, rem) = parse_frame(input.clone()).unwrap();
1400        assert_eq!(frame, Frame::StreamedVerbatimStringHeader);
1401        let (chunk, rest) = parse_frame(rem.clone()).unwrap();
1402        assert_eq!(
1403            chunk,
1404            Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hello")) // Frame::VerbatimString {
1405                                                                            //     format: "txt".to_string(),
1406                                                                            //     content: "hello".to_string()
1407                                                                            // }
1408        );
1409        assert_eq!(rest, Bytes::from("TAIL"));
1410    }
1411
1412    #[test]
1413    fn test_parse_frame_streamed_array() {
1414        let input = Bytes::from("*?\r\n+one\r\n+two\r\n*0\r\nEND");
1415        let (header, rem) = parse_frame(input.clone()).unwrap();
1416        assert_eq!(header, Frame::StreamedArrayHeader);
1417        let (item1, rem2) = parse_frame(rem.clone()).unwrap();
1418        assert_eq!(item1, Frame::SimpleString(Bytes::from("one")));
1419        let (item2, rem3) = parse_frame(rem2.clone()).unwrap();
1420        assert_eq!(item2, Frame::SimpleString(Bytes::from("two")));
1421        let (terminator, rest) = parse_frame(rem3.clone()).unwrap();
1422        assert_eq!(terminator, Frame::Array(Some(vec![])));
1423        assert_eq!(rest, Bytes::from("END"));
1424    }
1425
1426    #[test]
1427    fn test_parse_frame_streamed_set_map_attr_push() {
1428        // Set
1429        let input_set = Bytes::from("~?\r\n+foo\r\n+bar\r\n~0\r\nTAIL");
1430        let (h_set, rem0) = parse_frame(input_set.clone()).unwrap();
1431        assert_eq!(h_set, Frame::StreamedSetHeader);
1432        let (s1, rem1) = parse_frame(rem0.clone()).unwrap();
1433        assert_eq!(s1, Frame::SimpleString(Bytes::from("foo")));
1434        let (s2, rem2) = parse_frame(rem1.clone()).unwrap();
1435        assert_eq!(s2, Frame::SimpleString(Bytes::from("bar")));
1436        let (term_set, rest_set) = parse_frame(rem2.clone()).unwrap();
1437        assert_eq!(term_set, Frame::Set(vec![]));
1438        assert_eq!(rest_set, Bytes::from("TAIL"));
1439        // Map
1440        let input_map = Bytes::from("%?\r\n+key\r\n+val\r\n%0\r\nNEXT");
1441        let (h_map, rem_map) = parse_frame(input_map.clone()).unwrap();
1442        assert_eq!(h_map, Frame::StreamedMapHeader);
1443        let (k, rem_map2) = parse_frame(rem_map.clone()).unwrap();
1444        assert_eq!(k, Frame::SimpleString(Bytes::from("key")));
1445        let (v, rem_map3) = parse_frame(rem_map2.clone()).unwrap();
1446        assert_eq!(v, Frame::SimpleString(Bytes::from("val")));
1447        let (term_map, rest_map4) = parse_frame(rem_map3.clone()).unwrap();
1448        assert_eq!(term_map, Frame::Map(vec![]));
1449        assert_eq!(rest_map4, Bytes::from("NEXT"));
1450        // Attribute
1451        let input_attr = Bytes::from("|?\r\n+meta\r\n+info\r\n|0\r\nMORE");
1452        let (h_attr, rem_attr) = parse_frame(input_attr.clone()).unwrap();
1453        assert_eq!(h_attr, Frame::StreamedAttributeHeader);
1454        let (a1, rem_attr2) = parse_frame(rem_attr.clone()).unwrap();
1455        assert_eq!(a1, Frame::SimpleString(Bytes::from("meta")));
1456        let (a2, rem_attr3) = parse_frame(rem_attr2.clone()).unwrap();
1457        assert_eq!(a2, Frame::SimpleString(Bytes::from("info")));
1458        let (term_attr, rest_attr) = parse_frame(rem_attr3.clone()).unwrap();
1459        assert_eq!(term_attr, Frame::Attribute(vec![]));
1460        assert_eq!(rest_attr, Bytes::from("MORE"));
1461        // Push
1462        let input_push = Bytes::from(">?\r\n:1\r\n:2\r\n>0\r\nEND");
1463        let (h_push, rem_push) = parse_frame(input_push.clone()).unwrap();
1464        assert_eq!(h_push, Frame::StreamedPushHeader);
1465        let (p1, rem_push2) = parse_frame(rem_push.clone()).unwrap();
1466        assert_eq!(p1, Frame::Integer(1));
1467        let (p2, rem_push3) = parse_frame(rem_push2.clone()).unwrap();
1468        assert_eq!(p2, Frame::Integer(2));
1469        let (term_push, rest_push) = parse_frame(rem_push3.clone()).unwrap();
1470        assert_eq!(term_push, Frame::Push(vec![]));
1471        assert_eq!(rest_push, Bytes::from("END"));
1472    }
1473
1474    #[test]
1475    fn test_parse_frame_stream_terminator() {
1476        let input = Bytes::from(".\r\nREST");
1477        let (frame, rest) = parse_frame(input.clone()).unwrap();
1478        assert_eq!(frame, Frame::StreamTerminator);
1479        assert_eq!(rest, Bytes::from("REST"));
1480    }
1481
1482    #[test]
1483    fn test_parse_frame_null_blob_error_rejected() {
1484        let input = Bytes::from("!-1\r\nTAIL");
1485        assert_eq!(parse_frame(input), Err(ParseError::BadLength));
1486    }
1487
1488    #[test]
1489    fn test_parse_frame_null_verbatim_rejected() {
1490        let input = Bytes::from("=-1\r\nTAIL");
1491        assert_eq!(parse_frame(input), Err(ParseError::BadLength));
1492    }
1493
1494    #[test]
1495    fn test_verbatim_string_format_must_be_3_bytes() {
1496        // Too short (1 byte format)
1497        let input = Bytes::from("=6\r\nx:data\r\n");
1498        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
1499
1500        // Too long (4 byte format)
1501        let input = Bytes::from("=9\r\ntxtx:data\r\n");
1502        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
1503
1504        // Empty format (colon at start)
1505        let input = Bytes::from("=5\r\n:data\r\n");
1506        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
1507
1508        // Valid 3-byte format should still work
1509        let input = Bytes::from("=8\r\ntxt:data\r\n");
1510        let (frame, _) = parse_frame(input).unwrap();
1511        assert_eq!(
1512            frame,
1513            Frame::VerbatimString(Bytes::from("txt"), Bytes::from("data"))
1514        );
1515    }
1516
1517    #[test]
1518    fn test_parse_frame_special_float_nan() {
1519        let input = Bytes::from(",nan\r\nTAIL");
1520        let (frame, rest) = parse_frame(input.clone()).unwrap();
1521        assert_eq!(frame, Frame::SpecialFloat(Bytes::from("nan")));
1522        assert_eq!(rest, Bytes::from("TAIL"));
1523    }
1524
1525    #[test]
1526    fn test_parse_frame_big_number_zero() {
1527        let input = Bytes::from("(0\r\nEND");
1528        let (frame, rest) = parse_frame(input.clone()).unwrap();
1529        assert_eq!(frame, Frame::BigNumber(Bytes::from("0")));
1530        assert_eq!(rest, Bytes::from("END"));
1531    }
1532
1533    #[test]
1534    fn test_parse_frame_collection_empty() {
1535        let input_push = Bytes::from(">0\r\nTAIL");
1536        let (f_push, r_push) = parse_frame(input_push.clone()).unwrap();
1537        assert_eq!(f_push, Frame::Push(vec![]));
1538        assert_eq!(r_push, Bytes::from("TAIL"));
1539        let input_attr = Bytes::from("|0\r\nAFTER");
1540        let (f_attr, r_attr) = parse_frame(input_attr.clone()).unwrap();
1541        assert_eq!(f_attr, Frame::Attribute(vec![]));
1542        assert_eq!(r_attr, Bytes::from("AFTER"));
1543        let input_map = Bytes::from("%0\r\nEND");
1544        let (f_map, r_map) = parse_frame(input_map.clone()).unwrap();
1545        assert_eq!(f_map, Frame::Map(vec![]));
1546        assert_eq!(r_map, Bytes::from("END"));
1547        let input_set = Bytes::from("~0\r\nDONE");
1548        let (f_set, r_set) = parse_frame(input_set.clone()).unwrap();
1549        assert_eq!(f_set, Frame::Set(vec![]));
1550        assert_eq!(r_set, Bytes::from("DONE"));
1551        let input_arr = Bytes::from("*-1\r\nFIN");
1552        let (f_arr, r_arr) = parse_frame(input_arr.clone()).unwrap();
1553        assert_eq!(f_arr, Frame::Array(None));
1554        assert_eq!(r_arr, Bytes::from("FIN"));
1555    }
1556
1557    // Round-trip tests for serialization and parsing
1558
1559    #[test]
1560    fn test_roundtrip_simple_string() {
1561        let original = Bytes::from("+hello\r\n");
1562        let (frame, _) = parse_frame(original.clone()).unwrap();
1563        let serialized = frame_to_bytes(&frame);
1564        assert_eq!(original, serialized);
1565
1566        let (reparsed, _) = parse_frame(serialized).unwrap();
1567        assert_eq!(frame, reparsed);
1568    }
1569
1570    #[test]
1571    fn test_roundtrip_error() {
1572        let original = Bytes::from("-ERR error message\r\n");
1573        let (frame, _) = parse_frame(original.clone()).unwrap();
1574        let serialized = frame_to_bytes(&frame);
1575        assert_eq!(original, serialized);
1576
1577        let (reparsed, _) = parse_frame(serialized).unwrap();
1578        assert_eq!(frame, reparsed);
1579    }
1580
1581    #[test]
1582    fn test_roundtrip_integer() {
1583        let original = Bytes::from(":12345\r\n");
1584        let (frame, _) = parse_frame(original.clone()).unwrap();
1585        let serialized = frame_to_bytes(&frame);
1586        assert_eq!(original, serialized);
1587
1588        let (reparsed, _) = parse_frame(serialized).unwrap();
1589        assert_eq!(frame, reparsed);
1590    }
1591
1592    #[test]
1593    fn test_roundtrip_bulk_string() {
1594        let original = Bytes::from("$5\r\nhello\r\n");
1595        let (frame, _) = parse_frame(original.clone()).unwrap();
1596        let serialized = frame_to_bytes(&frame);
1597        assert_eq!(original, serialized);
1598
1599        let (reparsed, _) = parse_frame(serialized).unwrap();
1600        assert_eq!(frame, reparsed);
1601
1602        // Test null bulk string
1603        let original_null = Bytes::from("$-1\r\n");
1604        let (frame_null, _) = parse_frame(original_null.clone()).unwrap();
1605        let serialized_null = frame_to_bytes(&frame_null);
1606        assert_eq!(original_null, serialized_null);
1607
1608        let (reparsed_null, _) = parse_frame(serialized_null).unwrap();
1609        assert_eq!(frame_null, reparsed_null);
1610    }
1611
1612    #[test]
1613    fn test_roundtrip_blob_error() {
1614        let original = Bytes::from("!5\r\nerror\r\n");
1615        let (frame, _) = parse_frame(original.clone()).unwrap();
1616        let serialized = frame_to_bytes(&frame);
1617        assert_eq!(original, serialized);
1618
1619        let (reparsed, _) = parse_frame(serialized).unwrap();
1620        assert_eq!(frame, reparsed);
1621    }
1622
1623    #[test]
1624    fn test_roundtrip_null() {
1625        let original = Bytes::from("_\r\n");
1626        let (frame, _) = parse_frame(original.clone()).unwrap();
1627        let serialized = frame_to_bytes(&frame);
1628        assert_eq!(original, serialized);
1629
1630        let (reparsed, _) = parse_frame(serialized).unwrap();
1631        assert_eq!(frame, reparsed);
1632    }
1633
1634    #[test]
1635    fn test_roundtrip_double() {
1636        let original = Bytes::from(",3.14159\r\n");
1637        let (frame, _) = parse_frame(original.clone()).unwrap();
1638        let serialized = frame_to_bytes(&frame);
1639
1640        // Note: The exact string representation of floating point numbers might differ
1641        // between parsing and serializing due to formatting differences
1642        let (reparsed, _) = parse_frame(serialized).unwrap();
1643        assert_eq!(frame, reparsed);
1644    }
1645
1646    #[test]
1647    fn test_roundtrip_special_float() {
1648        let original = Bytes::from(",inf\r\n");
1649        let (frame, _) = parse_frame(original.clone()).unwrap();
1650        let serialized = frame_to_bytes(&frame);
1651        assert_eq!(original, serialized);
1652
1653        let (reparsed, _) = parse_frame(serialized).unwrap();
1654        assert_eq!(frame, reparsed);
1655    }
1656
1657    #[test]
1658    fn test_roundtrip_boolean() {
1659        let original_true = Bytes::from("#t\r\n");
1660        let (frame_true, _) = parse_frame(original_true.clone()).unwrap();
1661        let serialized_true = frame_to_bytes(&frame_true);
1662        assert_eq!(original_true, serialized_true);
1663
1664        let (reparsed_true, _) = parse_frame(serialized_true).unwrap();
1665        assert_eq!(frame_true, reparsed_true);
1666
1667        let original_false = Bytes::from("#f\r\n");
1668        let (frame_false, _) = parse_frame(original_false.clone()).unwrap();
1669        let serialized_false = frame_to_bytes(&frame_false);
1670        assert_eq!(original_false, serialized_false);
1671
1672        let (reparsed_false, _) = parse_frame(serialized_false).unwrap();
1673        assert_eq!(frame_false, reparsed_false);
1674    }
1675
1676    #[test]
1677    fn test_roundtrip_big_number() {
1678        let original = Bytes::from("(12345678901234567890\r\n");
1679        let (frame, _) = parse_frame(original.clone()).unwrap();
1680        let serialized = frame_to_bytes(&frame);
1681        assert_eq!(original, serialized);
1682
1683        let (reparsed, _) = parse_frame(serialized).unwrap();
1684        assert_eq!(frame, reparsed);
1685    }
1686
1687    #[test]
1688    fn test_roundtrip_verbatim_string() {
1689        let original = Bytes::from("=10\r\ntxt:hello!\r\n");
1690        let (frame, _) = parse_frame(original.clone()).unwrap();
1691        let serialized = frame_to_bytes(&frame);
1692        assert_eq!(original, serialized);
1693
1694        let (reparsed, _) = parse_frame(serialized).unwrap();
1695        assert_eq!(frame, reparsed);
1696    }
1697
1698    #[test]
1699    fn test_roundtrip_array() {
1700        let original = Bytes::from("*2\r\n+hello\r\n:123\r\n");
1701        let (frame, _) = parse_frame(original.clone()).unwrap();
1702        let serialized = frame_to_bytes(&frame);
1703        assert_eq!(original, serialized);
1704
1705        let (reparsed, _) = parse_frame(serialized).unwrap();
1706        assert_eq!(frame, reparsed);
1707
1708        // Test null array
1709        let original_null = Bytes::from("*-1\r\n");
1710        let (frame_null, _) = parse_frame(original_null.clone()).unwrap();
1711        let serialized_null = frame_to_bytes(&frame_null);
1712        assert_eq!(original_null, serialized_null);
1713
1714        let (reparsed_null, _) = parse_frame(serialized_null).unwrap();
1715        assert_eq!(frame_null, reparsed_null);
1716    }
1717
1718    #[test]
1719    fn test_roundtrip_set() {
1720        let original = Bytes::from("~2\r\n+one\r\n+two\r\n");
1721        let (frame, _) = parse_frame(original.clone()).unwrap();
1722        let serialized = frame_to_bytes(&frame);
1723        assert_eq!(original, serialized);
1724
1725        let (reparsed, _) = parse_frame(serialized).unwrap();
1726        assert_eq!(frame, reparsed);
1727    }
1728
1729    #[test]
1730    fn test_roundtrip_map() {
1731        let original = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\n");
1732        let (frame, _) = parse_frame(original.clone()).unwrap();
1733        let serialized = frame_to_bytes(&frame);
1734        assert_eq!(original, serialized);
1735
1736        let (reparsed, _) = parse_frame(serialized).unwrap();
1737        assert_eq!(frame, reparsed);
1738    }
1739
1740    #[test]
1741    fn test_roundtrip_attribute() {
1742        let original = Bytes::from("|1\r\n+key\r\n+val\r\n");
1743        let (frame, _) = parse_frame(original.clone()).unwrap();
1744        let serialized = frame_to_bytes(&frame);
1745        assert_eq!(original, serialized);
1746
1747        let (reparsed, _) = parse_frame(serialized).unwrap();
1748        assert_eq!(frame, reparsed);
1749    }
1750
1751    #[test]
1752    fn test_roundtrip_push() {
1753        let original = Bytes::from(">2\r\n+msg\r\n+data\r\n");
1754        let (frame, _) = parse_frame(original.clone()).unwrap();
1755        let serialized = frame_to_bytes(&frame);
1756        assert_eq!(original, serialized);
1757
1758        let (reparsed, _) = parse_frame(serialized).unwrap();
1759        assert_eq!(frame, reparsed);
1760    }
1761
1762    #[test]
1763    fn test_roundtrip_streaming_headers() {
1764        let headers = [
1765            ("$?\r\n", Frame::StreamedStringHeader),
1766            ("!?\r\n", Frame::StreamedBlobErrorHeader),
1767            ("=?\r\n", Frame::StreamedVerbatimStringHeader),
1768            ("*?\r\n", Frame::StreamedArrayHeader),
1769            ("~?\r\n", Frame::StreamedSetHeader),
1770            ("%?\r\n", Frame::StreamedMapHeader),
1771            ("|?\r\n", Frame::StreamedAttributeHeader),
1772            (">?\r\n", Frame::StreamedPushHeader),
1773            (".\r\n", Frame::StreamTerminator),
1774        ];
1775
1776        for (original_str, expected_frame) in headers {
1777            let original = Bytes::from(original_str);
1778            let (frame, _) = parse_frame(original.clone()).unwrap();
1779            assert_eq!(frame, expected_frame);
1780
1781            let serialized = frame_to_bytes(&frame);
1782            assert_eq!(original, serialized);
1783
1784            let (reparsed, _) = parse_frame(serialized).unwrap();
1785            assert_eq!(frame, reparsed);
1786        }
1787    }
1788
1789    #[test]
1790    fn test_roundtrip_streaming_chunks() {
1791        let chunks = [
1792            (
1793                ";4\r\nHell\r\n",
1794                Frame::StreamedStringChunk(Bytes::from("Hell")),
1795            ),
1796            (
1797                ";5\r\no wor\r\n",
1798                Frame::StreamedStringChunk(Bytes::from("o wor")),
1799            ),
1800            (";1\r\nd\r\n", Frame::StreamedStringChunk(Bytes::from("d"))),
1801            (";0\r\n\r\n", Frame::StreamedStringChunk(Bytes::new())),
1802            (
1803                ";11\r\nHello World\r\n",
1804                Frame::StreamedStringChunk(Bytes::from("Hello World")),
1805            ),
1806        ];
1807
1808        for (original_str, expected_frame) in chunks {
1809            let original = Bytes::from(original_str);
1810            let (frame, rest) = parse_frame(original.clone()).unwrap();
1811            assert_eq!(frame, expected_frame);
1812            assert!(rest.is_empty());
1813
1814            let serialized = frame_to_bytes(&frame);
1815            assert_eq!(original, serialized);
1816
1817            let (reparsed, _) = parse_frame(serialized).unwrap();
1818            assert_eq!(frame, reparsed);
1819        }
1820    }
1821
1822    #[test]
1823    fn test_streaming_chunks_edge_cases() {
1824        // Test incomplete chunk (missing data)
1825        let data = Bytes::from(";4\r\nHel");
1826        let result = parse_frame(data);
1827        assert!(matches!(result, Err(ParseError::Incomplete)));
1828
1829        // Test incomplete chunk (missing CRLF)
1830        let data = Bytes::from(";4\r\nHell");
1831        let result = parse_frame(data);
1832        assert!(matches!(result, Err(ParseError::Incomplete)));
1833
1834        // Test invalid length format
1835        let data = Bytes::from(";abc\r\ndata\r\n");
1836        let result = parse_frame(data);
1837        assert!(matches!(result, Err(ParseError::BadLength)));
1838
1839        // Test negative length
1840        let data = Bytes::from(";-1\r\ndata\r\n");
1841        let result = parse_frame(data);
1842        assert!(matches!(result, Err(ParseError::BadLength)));
1843
1844        // Test length mismatch (length says 5 but only 4 bytes)
1845        let data = Bytes::from(";5\r\nHell\r\n");
1846        let result = parse_frame(data);
1847        assert!(matches!(result, Err(ParseError::Incomplete)));
1848
1849        // Test zero-length chunk without trailing CRLF returns Incomplete
1850        let data = Bytes::from(";0\r\n");
1851        let result = parse_frame(data);
1852        assert!(matches!(result, Err(ParseError::Incomplete)));
1853
1854        // Test binary data in chunk
1855        let binary_data = b"\x00\x01\x02\x03\xFF";
1856        let mut chunk_data = Vec::new();
1857        chunk_data.extend_from_slice(b";5\r\n");
1858        chunk_data.extend_from_slice(binary_data);
1859        chunk_data.extend_from_slice(b"\r\n");
1860        let data = Bytes::from(chunk_data);
1861        let result = parse_frame(data);
1862        assert!(result.is_ok());
1863        let (frame, _) = result.unwrap();
1864        if let Frame::StreamedStringChunk(chunk) = frame {
1865            assert_eq!(chunk.as_ref(), binary_data);
1866        }
1867    }
1868
1869    #[test]
1870    fn test_roundtrip_streaming_sequences() {
1871        // Test streaming string roundtrip
1872        let streaming_string = Frame::StreamedString(vec![
1873            Bytes::from("Hell"),
1874            Bytes::from("o wor"),
1875            Bytes::from("ld"),
1876        ]);
1877        let serialized = frame_to_bytes(&streaming_string);
1878        let expected = "$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;2\r\nld\r\n;0\r\n\r\n";
1879        assert_eq!(serialized, Bytes::from(expected));
1880
1881        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1882        assert_eq!(parsed, streaming_string);
1883
1884        // Test streaming array roundtrip
1885        let streaming_array = Frame::StreamedArray(vec![
1886            Frame::SimpleString(Bytes::from("hello")),
1887            Frame::Integer(42),
1888            Frame::Boolean(true),
1889        ]);
1890        let serialized = frame_to_bytes(&streaming_array);
1891        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1892        assert_eq!(parsed, streaming_array);
1893
1894        // Test streaming map roundtrip
1895        let streaming_map = Frame::StreamedMap(vec![
1896            (
1897                Frame::SimpleString(Bytes::from("key1")),
1898                Frame::SimpleString(Bytes::from("val1")),
1899            ),
1900            (
1901                Frame::SimpleString(Bytes::from("key2")),
1902                Frame::Integer(123),
1903            ),
1904        ]);
1905        let serialized = frame_to_bytes(&streaming_map);
1906        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1907        assert_eq!(parsed, streaming_map);
1908
1909        // Test empty streaming string
1910        let empty_streaming = Frame::StreamedString(vec![]);
1911        let serialized = frame_to_bytes(&empty_streaming);
1912        let expected = "$?\r\n;0\r\n\r\n";
1913        assert_eq!(serialized, Bytes::from(expected));
1914        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1915        assert_eq!(parsed, empty_streaming);
1916
1917        // Test streaming set roundtrip
1918        let streaming_set = Frame::StreamedSet(vec![
1919            Frame::SimpleString(Bytes::from("apple")),
1920            Frame::SimpleString(Bytes::from("banana")),
1921            Frame::Integer(42),
1922        ]);
1923        let serialized = frame_to_bytes(&streaming_set);
1924        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1925        assert_eq!(parsed, streaming_set);
1926
1927        // Test streaming attribute roundtrip
1928        let streaming_attribute = Frame::StreamedAttribute(vec![
1929            (
1930                Frame::SimpleString(Bytes::from("trace-id")),
1931                Frame::SimpleString(Bytes::from("abc123")),
1932            ),
1933            (
1934                Frame::SimpleString(Bytes::from("span-id")),
1935                Frame::SimpleString(Bytes::from("def456")),
1936            ),
1937        ]);
1938        let serialized = frame_to_bytes(&streaming_attribute);
1939        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1940        assert_eq!(parsed, streaming_attribute);
1941
1942        // Test streaming push roundtrip
1943        let streaming_push = Frame::StreamedPush(vec![
1944            Frame::SimpleString(Bytes::from("pubsub")),
1945            Frame::SimpleString(Bytes::from("channel1")),
1946            Frame::SimpleString(Bytes::from("message data")),
1947        ]);
1948        let serialized = frame_to_bytes(&streaming_push);
1949        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1950        assert_eq!(parsed, streaming_push);
1951
1952        // Test empty streaming containers
1953        let empty_array = Frame::StreamedArray(vec![]);
1954        let serialized = frame_to_bytes(&empty_array);
1955        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1956        assert_eq!(parsed, empty_array);
1957
1958        let empty_set = Frame::StreamedSet(vec![]);
1959        let serialized = frame_to_bytes(&empty_set);
1960        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1961        assert_eq!(parsed, empty_set);
1962    }
1963
1964    #[test]
1965    fn test_streaming_sequences_edge_cases() {
1966        // Test incomplete streaming string (missing zero-length terminator)
1967        let data = Bytes::from("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n");
1968        let result = parse_streaming_sequence(data);
1969        assert!(matches!(result, Err(ParseError::Incomplete)));
1970
1971        // Test malformed chunk in streaming sequence
1972        let data = Bytes::from("$?\r\n;abc\r\nHell\r\n;0\r\n");
1973        let result = parse_streaming_sequence(data);
1974        assert!(matches!(result, Err(ParseError::BadLength)));
1975
1976        // Test streaming array with incomplete terminator
1977        let data = Bytes::from("*?\r\n+hello\r\n:42\r\n");
1978        let result = parse_streaming_sequence(data);
1979        assert!(matches!(result, Err(ParseError::Incomplete)));
1980
1981        // Test mixed streaming and non-streaming content
1982        let data = Bytes::from("*?\r\n+hello\r\n*2\r\n:1\r\n:2\r\n.\r\n");
1983        let result = parse_streaming_sequence(data);
1984        assert!(result.is_ok());
1985        let (frame, _) = result.unwrap();
1986        if let Frame::StreamedArray(items) = frame {
1987            assert_eq!(items.len(), 2);
1988            assert!(matches!(items[0], Frame::SimpleString(_)));
1989            assert!(matches!(items[1], Frame::Array(_)));
1990        }
1991
1992        // Test empty streaming containers
1993        let data = Bytes::from("*?\r\n.\r\n");
1994        let result = parse_streaming_sequence(data);
1995        assert!(result.is_ok());
1996        let (frame, _) = result.unwrap();
1997        if let Frame::StreamedArray(items) = frame {
1998            assert!(items.is_empty());
1999        }
2000
2001        // Test streaming map with odd number of elements
2002        let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+orphan\r\n.\r\n");
2003        let result = parse_streaming_sequence(data);
2004        assert!(matches!(result, Err(ParseError::InvalidFormat)));
2005
2006        // Test non-streaming frame passed to parse_streaming_sequence
2007        let data = Bytes::from("+simple\r\n");
2008        let result = parse_streaming_sequence(data);
2009        assert!(result.is_ok());
2010        let (frame, _) = result.unwrap();
2011        assert!(matches!(frame, Frame::SimpleString(_)));
2012
2013        // Test extremely large chunk size (should fail gracefully)
2014        let data = Bytes::from(";999999999999999999\r\ndata\r\n");
2015        let result = parse_frame(data);
2016        // The parsing might succeed but fail later when trying to read the data
2017        // Let's check what actually happens and accept either BadLength or Incomplete
2018        match &result {
2019            Err(ParseError::BadLength) => {}  // Expected
2020            Err(ParseError::Incomplete) => {} // Also acceptable - might not have enough data for huge chunk
2021            Err(e) => panic!("Got unexpected error type: {e:?}"),
2022            Ok(_) => panic!("Large chunk size should fail"),
2023        }
2024
2025        // Test streaming string with non-chunk frame mixed in
2026        let data = Bytes::from("$?\r\n+invalid\r\n;0\r\n");
2027        let result = parse_streaming_sequence(data);
2028        assert!(matches!(result, Err(ParseError::InvalidFormat)));
2029
2030        // Test streaming sequence with corrupted terminator
2031        let data = Bytes::from("*?\r\n+hello\r\n.corrupted\r\n");
2032        let result = parse_streaming_sequence(data);
2033        assert!(matches!(result, Err(ParseError::Incomplete)));
2034
2035        // Test empty input to parse_streaming_sequence
2036        let data = Bytes::new();
2037        let result = parse_streaming_sequence(data);
2038        assert!(matches!(result, Err(ParseError::Incomplete)));
2039
2040        // Test streaming sequence with partial frame at end
2041        let data = Bytes::from("*?\r\n+hello\r\n$5\r\nwo");
2042        let result = parse_streaming_sequence(data);
2043        assert!(matches!(result, Err(ParseError::Incomplete)));
2044    }
2045
2046    #[test]
2047    fn test_roundtrip_nested_structures() {
2048        // Test a complex nested structure
2049        let original = Bytes::from(
2050            "*3\r\n+hello\r\n%2\r\n+key1\r\n:123\r\n+key2\r\n~1\r\n+item\r\n|1\r\n+meta\r\n+data\r\n",
2051        );
2052        let (frame, _) = parse_frame(original.clone()).unwrap();
2053        let serialized = frame_to_bytes(&frame);
2054
2055        let (reparsed, _) = parse_frame(serialized).unwrap();
2056        assert_eq!(frame, reparsed);
2057    }
2058
2059    #[test]
2060    fn test_zero_length_bulk_string_requires_trailing_crlf() {
2061        // Complete: $0\r\n\r\n
2062        let input = Bytes::from("$0\r\n\r\nTAIL");
2063        let (frame, rest) = parse_frame(input).unwrap();
2064        assert_eq!(frame, Frame::BulkString(Some(Bytes::new())));
2065        assert_eq!(rest, Bytes::from("TAIL"));
2066
2067        // Incomplete: $0\r\n with no trailing data
2068        let input = Bytes::from("$0\r\n");
2069        assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2070
2071        // Incomplete: $0\r\n with only one byte
2072        let input = Bytes::from("$0\r\n\r");
2073        assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2074
2075        // Invalid: $0\r\n followed by non-CRLF
2076        let input = Bytes::from("$0\r\nXY");
2077        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
2078    }
2079
2080    #[test]
2081    fn test_zero_length_streamed_chunk_requires_trailing_crlf() {
2082        // Complete: ;0\r\n\r\n
2083        let input = Bytes::from(";0\r\n\r\nTAIL");
2084        let (frame, rest) = parse_frame(input).unwrap();
2085        assert_eq!(frame, Frame::StreamedStringChunk(Bytes::new()));
2086        assert_eq!(rest, Bytes::from("TAIL"));
2087
2088        // Incomplete: ;0\r\n with no trailing data
2089        let input = Bytes::from(";0\r\n");
2090        assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2091
2092        // Invalid: ;0\r\n followed by non-CRLF
2093        let input = Bytes::from(";0\r\nXY");
2094        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
2095    }
2096
2097    #[test]
2098    fn test_integer_overflow_returns_overflow_error() {
2099        // One past i64::MAX
2100        let input = Bytes::from(":9223372036854775808\r\n");
2101        assert_eq!(parse_frame(input), Err(ParseError::Overflow));
2102
2103        // i64::MAX should succeed
2104        let input = Bytes::from(":9223372036854775807\r\n");
2105        let (frame, _) = parse_frame(input).unwrap();
2106        assert_eq!(frame, Frame::Integer(i64::MAX));
2107
2108        // i64::MIN should succeed
2109        let input = Bytes::from(":-9223372036854775808\r\n");
2110        let (frame, _) = parse_frame(input).unwrap();
2111        assert_eq!(frame, Frame::Integer(i64::MIN));
2112    }
2113
2114    #[test]
2115    fn test_parser_propagates_errors() {
2116        let mut parser = Parser::new();
2117        parser.feed(Bytes::from("XINVALID\r\n"));
2118        let result = parser.next_frame();
2119        assert!(result.is_err());
2120        assert_eq!(result.unwrap_err(), ParseError::InvalidTag(b'X'));
2121    }
2122
2123    #[test]
2124    fn test_parser_returns_ok_none_for_incomplete() {
2125        let mut parser = Parser::new();
2126        parser.feed(Bytes::from("+HELL"));
2127        assert_eq!(parser.next_frame().unwrap(), None);
2128    }
2129
2130    #[test]
2131    fn test_integer_negative_overflow() {
2132        // One past i64::MIN
2133        assert!(parse_frame(Bytes::from(":-9223372036854775809\r\n")).is_err());
2134    }
2135
2136    #[test]
2137    fn test_nonempty_bulk_malformed_terminator() {
2138        // Not enough data after payload
2139        assert_eq!(
2140            parse_frame(Bytes::from("$3\r\nfoo")),
2141            Err(ParseError::Incomplete)
2142        );
2143        // Only one byte after payload
2144        assert_eq!(
2145            parse_frame(Bytes::from("$3\r\nfooX")),
2146            Err(ParseError::Incomplete)
2147        );
2148        // Two bytes present but wrong
2149        assert_eq!(
2150            parse_frame(Bytes::from("$3\r\nfooXY")),
2151            Err(ParseError::InvalidFormat)
2152        );
2153    }
2154
2155    #[test]
2156    fn test_blob_error_malformed_terminator() {
2157        assert_eq!(
2158            parse_frame(Bytes::from("!3\r\nerr")),
2159            Err(ParseError::Incomplete)
2160        );
2161        assert_eq!(
2162            parse_frame(Bytes::from("!3\r\nerrXY")),
2163            Err(ParseError::InvalidFormat)
2164        );
2165    }
2166
2167    #[test]
2168    fn test_verbatim_string_malformed_terminator() {
2169        assert_eq!(
2170            parse_frame(Bytes::from("=8\r\ntxt:data")),
2171            Err(ParseError::Incomplete)
2172        );
2173        assert_eq!(
2174            parse_frame(Bytes::from("=8\r\ntxt:dataXY")),
2175            Err(ParseError::InvalidFormat)
2176        );
2177    }
2178
2179    #[test]
2180    fn test_streamed_chunk_malformed_terminator() {
2181        assert_eq!(
2182            parse_frame(Bytes::from(";3\r\nabc")),
2183            Err(ParseError::Incomplete)
2184        );
2185        assert_eq!(
2186            parse_frame(Bytes::from(";3\r\nabcXY")),
2187            Err(ParseError::InvalidFormat)
2188        );
2189    }
2190
2191    #[test]
2192    fn test_bulk_string_size_limit() {
2193        // Over MAX_BULK_STRING_SIZE (512 MB)
2194        assert_eq!(
2195            parse_frame(Bytes::from("$536870913\r\n")),
2196            Err(ParseError::BadLength)
2197        );
2198    }
2199
2200    #[test]
2201    fn test_blob_error_size_limit() {
2202        assert_eq!(
2203            parse_frame(Bytes::from("!536870913\r\n")),
2204            Err(ParseError::BadLength)
2205        );
2206    }
2207
2208    #[test]
2209    fn test_verbatim_string_size_limit() {
2210        assert_eq!(
2211            parse_frame(Bytes::from("=536870913\r\n")),
2212            Err(ParseError::BadLength)
2213        );
2214    }
2215
2216    #[test]
2217    fn test_streamed_chunk_size_limit() {
2218        assert_eq!(
2219            parse_frame(Bytes::from(";536870913\r\n")),
2220            Err(ParseError::BadLength)
2221        );
2222    }
2223
2224    #[test]
2225    fn test_invalid_double() {
2226        assert_eq!(
2227            parse_frame(Bytes::from(",foo\r\n")),
2228            Err(ParseError::InvalidFormat)
2229        );
2230    }
2231
2232    #[test]
2233    fn test_invalid_boolean() {
2234        assert_eq!(
2235            parse_frame(Bytes::from("#\r\n")),
2236            Err(ParseError::InvalidBoolean)
2237        );
2238        assert_eq!(
2239            parse_frame(Bytes::from("#true\r\n")),
2240            Err(ParseError::InvalidBoolean)
2241        );
2242    }
2243
2244    #[test]
2245    fn test_parser_clears_buffer_on_error() {
2246        let mut parser = Parser::new();
2247        parser.feed(Bytes::from("X\r\n"));
2248        assert_eq!(parser.next_frame(), Err(ParseError::InvalidTag(b'X')));
2249        assert_eq!(parser.buffered_bytes(), 0);
2250    }
2251
2252    #[test]
2253    fn test_parser_recovers_after_error() {
2254        let mut parser = Parser::new();
2255        parser.feed(Bytes::from("X\r\n"));
2256        assert!(parser.next_frame().is_err());
2257        assert_eq!(parser.buffered_bytes(), 0);
2258
2259        parser.feed(Bytes::from("+OK\r\n"));
2260        let frame = parser.next_frame().unwrap().unwrap();
2261        assert_eq!(frame, Frame::SimpleString(Bytes::from("OK")));
2262    }
2263
2264    #[test]
2265    fn test_streaming_set_roundtrip() {
2266        let data = Bytes::from("~?\r\n+a\r\n+b\r\n+c\r\n.\r\n");
2267        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2268        assert_eq!(
2269            frame,
2270            Frame::StreamedSet(vec![
2271                Frame::SimpleString(Bytes::from("a")),
2272                Frame::SimpleString(Bytes::from("b")),
2273                Frame::SimpleString(Bytes::from("c")),
2274            ])
2275        );
2276        assert!(rest.is_empty());
2277    }
2278
2279    #[test]
2280    fn test_streaming_attribute_roundtrip() {
2281        let data = Bytes::from("|?\r\n+key\r\n+val\r\n.\r\n");
2282        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2283        assert_eq!(
2284            frame,
2285            Frame::StreamedAttribute(vec![(
2286                Frame::SimpleString(Bytes::from("key")),
2287                Frame::SimpleString(Bytes::from("val")),
2288            )])
2289        );
2290        assert!(rest.is_empty());
2291    }
2292
2293    #[test]
2294    fn test_streaming_push_roundtrip() {
2295        let data = Bytes::from(">?\r\n+pubsub\r\n+channel\r\n+message\r\n.\r\n");
2296        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2297        assert_eq!(
2298            frame,
2299            Frame::StreamedPush(vec![
2300                Frame::SimpleString(Bytes::from("pubsub")),
2301                Frame::SimpleString(Bytes::from("channel")),
2302                Frame::SimpleString(Bytes::from("message")),
2303            ])
2304        );
2305        assert!(rest.is_empty());
2306    }
2307
2308    #[test]
2309    fn test_empty_streaming_containers() {
2310        // Empty streaming string
2311        let data = Bytes::from("$?\r\n;0\r\n\r\n");
2312        let (frame, _) = parse_streaming_sequence(data).unwrap();
2313        assert_eq!(frame, Frame::StreamedString(vec![]));
2314
2315        // Empty streaming array
2316        let data = Bytes::from("*?\r\n.\r\n");
2317        let (frame, _) = parse_streaming_sequence(data).unwrap();
2318        assert_eq!(frame, Frame::StreamedArray(vec![]));
2319
2320        // Empty streaming set
2321        let data = Bytes::from("~?\r\n.\r\n");
2322        let (frame, _) = parse_streaming_sequence(data).unwrap();
2323        assert_eq!(frame, Frame::StreamedSet(vec![]));
2324
2325        // Empty streaming map
2326        let data = Bytes::from("%?\r\n.\r\n");
2327        let (frame, _) = parse_streaming_sequence(data).unwrap();
2328        assert_eq!(frame, Frame::StreamedMap(vec![]));
2329    }
2330
2331    #[test]
2332    fn test_streaming_attribute_odd_elements_errors() {
2333        let data = Bytes::from("|?\r\n+key\r\n+val\r\n+orphan\r\n.\r\n");
2334        let result = parse_streaming_sequence(data);
2335        assert!(matches!(result, Err(ParseError::InvalidFormat)));
2336    }
2337
2338    #[test]
2339    fn test_streaming_blob_error_header_passthrough() {
2340        // Blob error streaming is not supported; header is passed through
2341        let data = Bytes::from("!?\r\n!5\r\nERROR\r\n");
2342        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2343        assert_eq!(frame, Frame::StreamedBlobErrorHeader);
2344        // Rest contains the subsequent data
2345        assert!(!rest.is_empty());
2346    }
2347
2348    #[test]
2349    fn test_streaming_verbatim_header_passthrough() {
2350        // Verbatim string streaming is not supported; header is passed through
2351        let data = Bytes::from("=?\r\n=9\r\ntxt:hello\r\n");
2352        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2353        assert_eq!(frame, Frame::StreamedVerbatimStringHeader);
2354        assert!(!rest.is_empty());
2355    }
2356}