Skip to main content

resp_rs/
resp3.rs

1//! Zero-copy RESP3 parser.
2//!
3//! Parses RESP3 frames using `bytes::Bytes` for efficient, zero-copy operation.
4//! Supports all RESP3 data types, including fixed-length and streaming variants.
5//!
6//! # Performance
7//!
8//! RESP3 parsing is roughly 3x slower than RESP2 for simple types due to the
9//! larger type tag match. The gap narrows for collection-heavy workloads. For
10//! complete buffers, call [`parse_frame`] directly rather than using [`Parser`]
11//! (see [crate-level performance docs](crate#performance)).
12//!
13//! # Protocol permissiveness
14//!
15//! - **Simple strings and errors** are treated as raw bytes, not validated UTF-8.
16//!   The parser accepts any byte sequence that does not contain `\r` or `\n`.
17//! - **Double parsing** accepts case-insensitive and non-canonical float spellings
//!   (e.g., `INF`, `Infinity`, `NAN`) via Rust's `str::parse::<f64>()`, then normalizes
19//!   them to canonical [`Frame::SpecialFloat`] values (`inf`, `-inf`, `nan`).
20//!   This means roundtrip is semantic (value-preserving) but not lexical
21//!   (byte-preserving) for non-canonical inputs.
22//! - **Streaming support** for blob errors and verbatim strings is limited:
23//!   `parse_streaming_sequence` passes through their streaming headers
24//!   (`!?\r\n`, `=?\r\n`) without accumulation, since RESP3 does not define
25//!   a chunk format for these types. Use low-level `parse_frame` to handle
26//!   these headers manually if needed.
27
28use bytes::{BufMut, Bytes, BytesMut};
29
/// Upper bound on the element count accepted in a collection header; larger
/// counts are rejected so a hostile peer cannot request absurd collections.
const MAX_COLLECTION_SIZE: usize = 10 * 1_000_000;

/// Upper bound, in bytes, on a bulk string / blob / chunk payload (512 MiB).
const MAX_BULK_STRING_SIZE: usize = 512 << 20;
35
36/// A streaming parser for RESP3 frames.
37///
38/// This parser allows feeding data in chunks and extracting frames as they become available.
39/// It maintains an internal buffer of accumulated data and attempts to parse frames from it.
40#[derive(Default, Debug)]
41pub struct Parser {
42    buffer: BytesMut,
43}
44
45impl Parser {
46    /// Creates a new parser with an empty buffer.
47    pub fn new() -> Self {
48        Self {
49            buffer: BytesMut::new(),
50        }
51    }
52
53    /// Feeds a chunk of data into the parser.
54    ///
55    /// The data is appended to the internal buffer.
56    pub fn feed(&mut self, data: Bytes) {
57        self.buffer.extend_from_slice(&data);
58    }
59
60    /// Attempts to extract the next complete frame from the buffer.
61    ///
62    /// Returns `Ok(None)` if there is not enough data to parse a complete frame.
63    /// Returns `Ok(Some(frame))` on success, consuming the parsed bytes.
64    /// Returns `Err` on protocol errors, clearing the buffer.
65    pub fn next_frame(&mut self) -> Result<Option<Frame>, ParseError> {
66        if self.buffer.is_empty() {
67            return Ok(None);
68        }
69
70        let bytes = self.buffer.split().freeze();
71
72        match parse_frame_inner(&bytes, 0) {
73            Ok((frame, consumed)) => {
74                if consumed < bytes.len() {
75                    self.buffer.unsplit(BytesMut::from(&bytes[consumed..]));
76                }
77                Ok(Some(frame))
78            }
79            Err(ParseError::Incomplete) => {
80                self.buffer.unsplit(bytes.into());
81                Ok(None)
82            }
83            Err(e) => {
84                // Buffer was emptied by split() above; intentionally not restored
85                // on hard errors so the parser doesn't re-parse corrupt data.
86                Err(e)
87            }
88        }
89    }
90
91    /// Returns the number of bytes currently in the buffer.
92    pub fn buffered_bytes(&self) -> usize {
93        self.buffer.len()
94    }
95
96    /// Clears the internal buffer.
97    pub fn clear(&mut self) {
98        self.buffer.clear();
99    }
100}
101
102// --- Zero-copy Frame enum and parser using bytes::Bytes ---
/// A parsed RESP3 frame.
///
/// Each variant corresponds to one of the RESP3 types. Streaming headers, the
/// individual string chunks, the stream terminator, and the accumulated results
/// produced by `parse_streaming_sequence` all have their own variants.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    /// Simple string: `+<string>\r\n`
    SimpleString(Bytes),
    /// Simple error: `-<error>\r\n`
    Error(Bytes),
    /// Integer: `:<number>\r\n`
    Integer(i64),
    /// Blob string: `$<length>\r\n<bytes>\r\n`
    ///
    /// `None` is the null bulk string, written `$-1\r\n`.
    BulkString(Option<Bytes>),
    /// Blob error: `!<length>\r\n<bytes>\r\n`
    BlobError(Bytes),
    /// Streaming blob string header: `$?\r\n`
    StreamedStringHeader,
    /// Streaming blob error header: `!?\r\n`
    StreamedBlobErrorHeader,
    /// Streaming verbatim string header: `=?\r\n`
    StreamedVerbatimStringHeader,
    /// Streaming array header: `*?\r\n`
    StreamedArrayHeader,
    /// Streaming set header: `~?\r\n`
    StreamedSetHeader,
    /// Streaming map header: `%?\r\n`
    StreamedMapHeader,
    /// Streaming attribute header: `|?\r\n`
    StreamedAttributeHeader,
    /// Streaming push header: `>?\r\n`
    StreamedPushHeader,
    /// Streaming string chunk: `;<length>\r\n<data>\r\n`
    ///
    /// Represents an individual chunk in a streaming string sequence.
    /// A zero-length chunk marks the end of the stream; this parser reads it
    /// as `;0\r\n\r\n` (length line followed by an empty payload and its CRLF).
    ///
    /// # Example
    /// ```
    /// use resp_rs::resp3::Frame;
    /// use bytes::Bytes;
    ///
    /// // Chunk containing "Hello"
    /// // Wire format: ;5\r\nHello\r\n
    /// Frame::StreamedStringChunk(Bytes::from("Hello"));
    /// ```
    StreamedStringChunk(Bytes),

    /// Accumulated streaming string data from multiple chunks.
    ///
    /// Created by `parse_streaming_sequence()` when parsing a complete
    /// streaming string sequence (`$?\r\n` + chunks + zero-length chunk).
    /// Contains all non-empty chunks in order, allowing reconstruction of the
    /// full string.
    ///
    /// # Example
    /// ```
    /// use resp_rs::resp3::Frame;
    /// use bytes::Bytes;
    ///
    /// // Represents "Hello world" from chunks ["Hello ", "world"]
    /// Frame::StreamedString(vec![
    ///     Bytes::from("Hello "),
    ///     Bytes::from("world")
    /// ]);
    /// ```
    StreamedString(Vec<Bytes>),

    /// Accumulated streaming array.
    ///
    /// Created when parsing a streaming array sequence (`*?\r\n` + frames + `.\r\n`).
    /// Contains all frames that were streamed as part of the array.
    ///
    /// # Example
    /// ```
    /// use resp_rs::resp3::Frame;
    /// use bytes::Bytes;
    ///
    /// // Array with mixed types
    /// Frame::StreamedArray(vec![
    ///     Frame::SimpleString(Bytes::from("hello")),
    ///     Frame::Integer(42),
    ///     Frame::Boolean(true)
    /// ]);
    /// ```
    StreamedArray(Vec<Frame>),

    /// Accumulated streaming set.
    ///
    /// Created when parsing a streaming set sequence (`~?\r\n` + frames + `.\r\n`).
    /// Elements are stored in wire order; `parse_streaming_sequence` does not
    /// de-duplicate them.
    StreamedSet(Vec<Frame>),

    /// Accumulated streaming map.
    ///
    /// Created when parsing a streaming map sequence (`%?\r\n` + key-value pairs + `.\r\n`).
    /// Contains all key-value pairs that were streamed as part of the map.
    ///
    /// # Example
    /// ```
    /// use resp_rs::resp3::Frame;
    /// use bytes::Bytes;
    ///
    /// Frame::StreamedMap(vec![
    ///     (Frame::SimpleString(Bytes::from("name")), Frame::SimpleString(Bytes::from("Alice"))),
    ///     (Frame::SimpleString(Bytes::from("age")), Frame::Integer(25))
    /// ]);
    /// ```
    StreamedMap(Vec<(Frame, Frame)>),

    /// Accumulated streaming attribute.
    ///
    /// Created when parsing a streaming attribute sequence (`|?\r\n` + key-value pairs + `.\r\n`).
    /// Attributes provide out-of-band metadata that doesn't affect the main data structure.
    StreamedAttribute(Vec<(Frame, Frame)>),

    /// Accumulated streaming push.
    ///
    /// Created when parsing a streaming push sequence (`>?\r\n` + frames + `.\r\n`).
    /// Push messages are server-initiated communications (e.g., pub/sub messages).
    ///
    /// # Example
    /// ```
    /// use resp_rs::resp3::Frame;
    /// use bytes::Bytes;
    ///
    /// // Pub/sub message
    /// Frame::StreamedPush(vec![
    ///     Frame::SimpleString(Bytes::from("pubsub")),
    ///     Frame::SimpleString(Bytes::from("channel1")),
    ///     Frame::SimpleString(Bytes::from("message content"))
    /// ]);
    /// ```
    StreamedPush(Vec<Frame>),
    /// End-of-stream terminator for streamed aggregates: `.\r\n`
    StreamTerminator,
    /// Null: `_\r\n`
    Null,
    /// Double: `,<float>\r\n` (finite values only; see [`Frame::SpecialFloat`])
    Double(f64),
    /// Non-finite double: `,inf\r\n`, `,-inf\r\n` or `,nan\r\n`.
    ///
    /// The payload is one of the canonical spellings `inf`, `-inf`, `nan`.
    SpecialFloat(Bytes),
    /// Boolean: `#t\r\n` or `#f\r\n`
    Boolean(bool),
    /// Big number: `(<number>\r\n` (digits are stored as received, not validated)
    BigNumber(Bytes),
    /// Verbatim string: `=<length>\r\n<fmt>:<content>\r\n`
    ///
    /// The fields are `(format, content)`; the parser requires `format` to be
    /// exactly three bytes followed by `:`.
    VerbatimString(Bytes, Bytes),
    /// Array: `*<count>\r\n...`
    ///
    /// `None` is the null array, written `*-1\r\n`. The streaming form `*?\r\n`
    /// is reported as [`Frame::StreamedArrayHeader`].
    Array(Option<Vec<Frame>>),
    /// Set: `~<count>\r\n...` (streaming form `~?\r\n` is [`Frame::StreamedSetHeader`])
    Set(Vec<Frame>),
    /// Map: `%<count>\r\n...` (streaming form `%?\r\n` is [`Frame::StreamedMapHeader`])
    Map(Vec<(Frame, Frame)>),
    /// Attribute: `|<count>\r\n...` (streaming form `|?\r\n` is [`Frame::StreamedAttributeHeader`])
    Attribute(Vec<(Frame, Frame)>),
    /// Push: `><count>\r\n...` (streaming form `>?\r\n` is [`Frame::StreamedPushHeader`])
    Push(Vec<Frame>),
}
264
265pub use crate::ParseError;
266
267/// Parse a single RESP3 frame from the provided `Bytes`.
268///
269/// Returns the parsed `Frame` and the remaining unconsumed `Bytes`, or a `ParseError` on failure.
270pub fn parse_frame(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
271    let (frame, consumed) = parse_frame_inner(&input, 0)?;
272    Ok((frame, input.slice(consumed..)))
273}
274
275/// Parse a length-prefixed blob: shared logic for bulk string, blob error, and
276/// streamed string chunks. Returns `(data_start, data_end, after_crlf)` on success.
277#[inline(never)]
278fn parse_blob_bounds(
279    buf: &[u8],
280    after_crlf: usize,
281    len: usize,
282) -> Result<(usize, usize), ParseError> {
283    if len == 0 {
284        if after_crlf + 1 >= buf.len() {
285            return Err(ParseError::Incomplete);
286        }
287        if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
288            return Ok((after_crlf, after_crlf));
289        } else {
290            return Err(ParseError::InvalidFormat);
291        }
292    }
293    let data_start = after_crlf;
294    let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
295    if data_end + 1 >= buf.len() {
296        return Err(ParseError::Incomplete);
297    }
298    if buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
299        return Err(ParseError::InvalidFormat);
300    }
301    Ok((data_start, data_end))
302}
303
304/// Parse a bulk string frame (`$`).
305fn parse_bulk_string(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
306    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
307    let len_bytes = &buf[pos + 1..line_end];
308    if len_bytes == b"?" {
309        return Ok((Frame::StreamedStringHeader, after_crlf));
310    }
311    if len_bytes == b"-1" {
312        return Ok((Frame::BulkString(None), after_crlf));
313    }
314    let len = parse_usize(len_bytes)?;
315    if len > MAX_BULK_STRING_SIZE {
316        return Err(ParseError::BadLength);
317    }
318    let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
319    if data_start == data_end {
320        Ok((Frame::BulkString(Some(Bytes::new())), after_crlf + 2))
321    } else {
322        Ok((
323            Frame::BulkString(Some(input.slice(data_start..data_end))),
324            data_end + 2,
325        ))
326    }
327}
328
329/// Parse a double frame (`,`).
330#[inline(never)]
331fn parse_double_frame(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
332    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
333    let line_bytes = &buf[pos + 1..line_end];
334    if line_bytes == b"inf" || line_bytes == b"-inf" || line_bytes == b"nan" {
335        return Ok((
336            Frame::SpecialFloat(input.slice(pos + 1..line_end)),
337            after_crlf,
338        ));
339    }
340    let s = std::str::from_utf8(line_bytes).map_err(|_| ParseError::Utf8Error)?;
341    let v = s.parse::<f64>().map_err(|_| ParseError::InvalidFormat)?;
342    if v.is_infinite() || v.is_nan() {
343        let canonical = if v.is_nan() {
344            "nan"
345        } else if v.is_sign_negative() {
346            "-inf"
347        } else {
348            "inf"
349        };
350        return Ok((Frame::SpecialFloat(Bytes::from(canonical)), after_crlf));
351    }
352    Ok((Frame::Double(v), after_crlf))
353}
354
355/// Parse a verbatim string frame (`=`).
356#[inline(never)]
357fn parse_verbatim(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
358    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
359    let len_bytes = &buf[pos + 1..line_end];
360    if len_bytes == b"?" {
361        return Ok((Frame::StreamedVerbatimStringHeader, after_crlf));
362    }
363    if len_bytes == b"-1" {
364        return Err(ParseError::BadLength);
365    }
366    let len = parse_usize(len_bytes)?;
367    if len > MAX_BULK_STRING_SIZE {
368        return Err(ParseError::BadLength);
369    }
370    let data_start = after_crlf;
371    let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
372    if data_end + 1 >= buf.len() {
373        return Err(ParseError::Incomplete);
374    }
375    if buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
376        return Err(ParseError::InvalidFormat);
377    }
378    let sep = buf[data_start..data_end]
379        .iter()
380        .position(|&b| b == b':')
381        .ok_or(ParseError::InvalidFormat)?;
382    if sep != 3 {
383        return Err(ParseError::InvalidFormat);
384    }
385    let format = input.slice(data_start..data_start + sep);
386    let content = input.slice(data_start + sep + 1..data_end);
387    Ok((Frame::VerbatimString(format, content), data_end + 2))
388}
389
390/// Parse a blob error frame (`!`).
391#[inline(never)]
392fn parse_blob_error(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
393    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
394    let len_bytes = &buf[pos + 1..line_end];
395    if len_bytes == b"?" {
396        return Ok((Frame::StreamedBlobErrorHeader, after_crlf));
397    }
398    if len_bytes == b"-1" {
399        return Err(ParseError::BadLength);
400    }
401    let len = parse_usize(len_bytes)?;
402    if len > MAX_BULK_STRING_SIZE {
403        return Err(ParseError::BadLength);
404    }
405    let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
406    if data_start == data_end {
407        Ok((Frame::BlobError(Bytes::new()), after_crlf + 2))
408    } else {
409        Ok((
410            Frame::BlobError(input.slice(data_start..data_end)),
411            data_end + 2,
412        ))
413    }
414}
415
416/// Parse a collection (array, set, push) with element count.
417#[inline(never)]
418fn parse_collection(
419    input: &Bytes,
420    buf: &[u8],
421    pos: usize,
422    tag: u8,
423) -> Result<(Frame, usize), ParseError> {
424    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
425    let len_bytes = &buf[pos + 1..line_end];
426
427    // Streaming headers
428    if len_bytes == b"?" {
429        return match tag {
430            b'*' => Ok((Frame::StreamedArrayHeader, after_crlf)),
431            b'~' => Ok((Frame::StreamedSetHeader, after_crlf)),
432            b'>' => Ok((Frame::StreamedPushHeader, after_crlf)),
433            _ => unreachable!(),
434        };
435    }
436    // Null array
437    if tag == b'*' && len_bytes == b"-1" {
438        return Ok((Frame::Array(None), after_crlf));
439    }
440    let count = parse_count(len_bytes)?;
441    if count == 0 {
442        return match tag {
443            b'*' => Ok((Frame::Array(Some(Vec::new())), after_crlf)),
444            b'~' => Ok((Frame::Set(Vec::new()), after_crlf)),
445            b'>' => Ok((Frame::Push(Vec::new()), after_crlf)),
446            _ => unreachable!(),
447        };
448    }
449    let mut cursor = after_crlf;
450    let mut items = Vec::with_capacity(count);
451    for _ in 0..count {
452        let (item, next) = parse_frame_inner(input, cursor)?;
453        items.push(item);
454        cursor = next;
455    }
456    match tag {
457        b'*' => Ok((Frame::Array(Some(items)), cursor)),
458        b'~' => Ok((Frame::Set(items), cursor)),
459        b'>' => Ok((Frame::Push(items), cursor)),
460        _ => unreachable!(),
461    }
462}
463
464/// Parse a map or attribute (`%` / `|`).
465#[inline(never)]
466fn parse_pairs(
467    input: &Bytes,
468    buf: &[u8],
469    pos: usize,
470    tag: u8,
471) -> Result<(Frame, usize), ParseError> {
472    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
473    let len_bytes = &buf[pos + 1..line_end];
474    if len_bytes == b"?" {
475        return if tag == b'%' {
476            Ok((Frame::StreamedMapHeader, after_crlf))
477        } else {
478            Ok((Frame::StreamedAttributeHeader, after_crlf))
479        };
480    }
481    let count = parse_count(len_bytes)?;
482    let mut cursor = after_crlf;
483    let mut pairs = Vec::with_capacity(count);
484    for _ in 0..count {
485        let (key, next1) = parse_frame_inner(input, cursor)?;
486        let (val, next2) = parse_frame_inner(input, next1)?;
487        pairs.push((key, val));
488        cursor = next2;
489    }
490    if tag == b'%' {
491        Ok((Frame::Map(pairs), cursor))
492    } else {
493        Ok((Frame::Attribute(pairs), cursor))
494    }
495}
496
497/// Parse a streamed string chunk (`;`).
498#[inline(never)]
499fn parse_streamed_chunk(
500    input: &Bytes,
501    buf: &[u8],
502    pos: usize,
503) -> Result<(Frame, usize), ParseError> {
504    let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
505    let len = parse_usize(&buf[pos + 1..line_end])?;
506    if len > MAX_BULK_STRING_SIZE {
507        return Err(ParseError::BadLength);
508    }
509    let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
510    if data_start == data_end {
511        Ok((Frame::StreamedStringChunk(Bytes::new()), after_crlf + 2))
512    } else {
513        Ok((
514            Frame::StreamedStringChunk(input.slice(data_start..data_end)),
515            data_end + 2,
516        ))
517    }
518}
519
520/// Offset-based internal parser. The match body is kept minimal to reduce
521/// instruction-cache pressure; heavy arms are extracted into `#[inline(never)]`
522/// helpers so the hot dispatch stays small.
523fn parse_frame_inner(input: &Bytes, pos: usize) -> Result<(Frame, usize), ParseError> {
524    let buf = input.as_ref();
525    if pos >= buf.len() {
526        return Err(ParseError::Incomplete);
527    }
528
529    let tag = buf[pos];
530
531    match tag {
532        // Line-based types (small, stay inline)
533        b'+' => {
534            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
535            Ok((
536                Frame::SimpleString(input.slice(pos + 1..line_end)),
537                after_crlf,
538            ))
539        }
540        b'-' => {
541            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
542            Ok((Frame::Error(input.slice(pos + 1..line_end)), after_crlf))
543        }
544        b':' => {
545            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
546            let v = parse_i64(&buf[pos + 1..line_end])?;
547            Ok((Frame::Integer(v), after_crlf))
548        }
549        b'#' => {
550            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
551            match &buf[pos + 1..line_end] {
552                b"t" => Ok((Frame::Boolean(true), after_crlf)),
553                b"f" => Ok((Frame::Boolean(false), after_crlf)),
554                _ => Err(ParseError::InvalidBoolean),
555            }
556        }
557        b'(' => {
558            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
559            Ok((Frame::BigNumber(input.slice(pos + 1..line_end)), after_crlf))
560        }
561        b'_' => {
562            if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
563                Ok((Frame::Null, pos + 3))
564            } else {
565                Err(ParseError::Incomplete)
566            }
567        }
568        b'.' => {
569            if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
570                Ok((Frame::StreamTerminator, pos + 3))
571            } else {
572                Err(ParseError::Incomplete)
573            }
574        }
575
576        // Length-prefixed types (extracted to reduce icache pressure)
577        b'$' => parse_bulk_string(input, buf, pos),
578        b',' => parse_double_frame(input, buf, pos),
579        b'=' => parse_verbatim(input, buf, pos),
580        b'!' => parse_blob_error(input, buf, pos),
581        b';' => parse_streamed_chunk(input, buf, pos),
582
583        // Collections (extracted)
584        b'*' | b'~' | b'>' => parse_collection(input, buf, pos, tag),
585        b'%' | b'|' => parse_pairs(input, buf, pos, tag),
586
587        _ => Err(ParseError::InvalidTag(tag)),
588    }
589}
590
591/// Parse a complete RESP3 streaming sequence, accumulating chunks until termination.
592///
593/// This function handles RESP3 streaming sequences that begin with streaming headers
594/// (`$?`, `*?`, `~?`, `%?`, `|?`, `>?`) and accumulates the following data until
595/// the appropriate terminator is encountered.
596///
597/// # Streaming Types Supported
598///
599/// - **Streaming Strings**: `$?\r\n` followed by chunks terminated with `;0\r\n`
600/// - **Streaming Arrays**: `*?\r\n` followed by frames terminated with `.\r\n`
601/// - **Streaming Sets**: `~?\r\n` followed by frames terminated with `.\r\n`
602/// - **Streaming Maps**: `%?\r\n` followed by key-value pairs terminated with `.\r\n`
603/// - **Streaming Attributes**: `|?\r\n` followed by key-value pairs terminated with `.\r\n`
604/// - **Streaming Push**: `>?\r\n` followed by frames terminated with `.\r\n`
605///
606/// # Examples
607///
608/// ## Streaming String
609/// ```rust
610/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
611/// use bytes::Bytes;
612///
613/// let data = Bytes::from("$?\r\n;4\r\nHell\r\n;6\r\no worl\r\n;1\r\nd\r\n;0\r\n\r\n");
614/// let (frame, rest) = parse_streaming_sequence(data).unwrap();
615///
616/// if let Frame::StreamedString(chunks) = frame {
617///     assert_eq!(chunks.len(), 3);
618///     let full_string: String = chunks.iter()
619///         .map(|chunk| std::str::from_utf8(chunk).unwrap())
620///         .collect::<Vec<_>>()
621///         .join("");
622///     assert_eq!(full_string, "Hello world");
623/// }
624/// assert!(rest.is_empty());
625/// ```
626///
627/// ## Streaming Array
628/// ```rust
629/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
630/// use bytes::Bytes;
631///
632/// let data = Bytes::from("*?\r\n+hello\r\n:42\r\n#t\r\n.\r\n");
633/// let (frame, _) = parse_streaming_sequence(data).unwrap();
634///
635/// if let Frame::StreamedArray(items) = frame {
636///     assert_eq!(items.len(), 3);
637///     // items[0] = SimpleString("hello")
638///     // items[1] = Integer(42)
639///     // items[2] = Boolean(true)
640/// }
641/// ```
642///
643/// ## Streaming Map
644/// ```rust
645/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
646/// use bytes::Bytes;
647///
648/// let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+key2\r\n:123\r\n.\r\n");
649/// let (frame, _) = parse_streaming_sequence(data).unwrap();
650///
651/// if let Frame::StreamedMap(pairs) = frame {
652///     assert_eq!(pairs.len(), 2);
653///     // pairs[0] = (SimpleString("key1"), SimpleString("val1"))
654///     // pairs[1] = (SimpleString("key2"), Integer(123))
655/// }
656/// ```
657///
658/// # Errors
659///
660/// Returns `ParseError::Incomplete` if the stream is not complete or if required
661/// terminators are missing. Returns `ParseError::InvalidFormat` for malformed
662/// chunk data or unexpected frame types within streaming sequences.
663///
664/// # Notes
665///
666/// - For non-streaming frames, this function simply returns the parsed frame
667/// - Streaming string chunks are accumulated in order
668/// - All other streaming types accumulate complete frames until termination
669/// - Zero-copy parsing is used where possible to minimize allocations
670pub fn parse_streaming_sequence(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
671    if input.is_empty() {
672        return Err(ParseError::Incomplete);
673    }
674
675    let (header, mut rest) = parse_frame(input)?;
676
677    match header {
678        Frame::StreamedStringHeader => {
679            // Parse streaming string chunks until zero-length chunk
680            let mut chunks = Vec::new();
681
682            loop {
683                let (frame, new_rest) = parse_frame(rest)?;
684                rest = new_rest;
685
686                match frame {
687                    Frame::StreamedStringChunk(chunk) => {
688                        if chunk.is_empty() {
689                            // Zero-length chunk indicates end of stream
690                            break;
691                        }
692                        chunks.push(chunk);
693                    }
694                    _ => {
695                        return Err(ParseError::InvalidFormat);
696                    }
697                }
698            }
699
700            Ok((Frame::StreamedString(chunks), rest))
701        }
702        Frame::StreamedBlobErrorHeader | Frame::StreamedVerbatimStringHeader => {
703            // RESP3 does not define a streaming chunk format for blob errors
704            // or verbatim strings. Return the header as-is for low-level consumers.
705            Ok((header, rest))
706        }
707        Frame::StreamedArrayHeader => {
708            // Parse streaming array items until terminator
709            let mut items = Vec::new();
710
711            loop {
712                let (frame, new_rest) = parse_frame(rest)?;
713                rest = new_rest;
714
715                match frame {
716                    Frame::StreamTerminator => {
717                        break;
718                    }
719                    item => {
720                        items.push(item);
721                    }
722                }
723            }
724
725            Ok((Frame::StreamedArray(items), rest))
726        }
727        Frame::StreamedSetHeader => {
728            // Parse streaming set items until terminator
729            let mut items = Vec::new();
730
731            loop {
732                let (frame, new_rest) = parse_frame(rest)?;
733                rest = new_rest;
734
735                match frame {
736                    Frame::StreamTerminator => {
737                        break;
738                    }
739                    item => {
740                        items.push(item);
741                    }
742                }
743            }
744
745            Ok((Frame::StreamedSet(items), rest))
746        }
747        Frame::StreamedMapHeader => {
748            // Parse streaming map pairs until terminator
749            let mut pairs = Vec::new();
750
751            loop {
752                let (frame, new_rest) = parse_frame(rest)?;
753                rest = new_rest;
754
755                match frame {
756                    Frame::StreamTerminator => {
757                        break;
758                    }
759                    key => {
760                        let (value, newer_rest) = parse_frame(rest)?;
761                        if matches!(value, Frame::StreamTerminator) {
762                            return Err(ParseError::InvalidFormat);
763                        }
764                        rest = newer_rest;
765                        pairs.push((key, value));
766                    }
767                }
768            }
769
770            Ok((Frame::StreamedMap(pairs), rest))
771        }
772        Frame::StreamedAttributeHeader => {
773            // Parse streaming attribute pairs until terminator
774            let mut pairs = Vec::new();
775
776            loop {
777                let (frame, new_rest) = parse_frame(rest)?;
778                rest = new_rest;
779
780                match frame {
781                    Frame::StreamTerminator => {
782                        break;
783                    }
784                    key => {
785                        let (value, newer_rest) = parse_frame(rest)?;
786                        if matches!(value, Frame::StreamTerminator) {
787                            return Err(ParseError::InvalidFormat);
788                        }
789                        rest = newer_rest;
790                        pairs.push((key, value));
791                    }
792                }
793            }
794
795            Ok((Frame::StreamedAttribute(pairs), rest))
796        }
797        Frame::StreamedPushHeader => {
798            // Parse streaming push items until terminator
799            let mut items = Vec::new();
800
801            loop {
802                let (frame, new_rest) = parse_frame(rest)?;
803                rest = new_rest;
804
805                match frame {
806                    Frame::StreamTerminator => {
807                        break;
808                    }
809                    item => {
810                        items.push(item);
811                    }
812                }
813            }
814
815            Ok((Frame::StreamedPush(items), rest))
816        }
817        _ => {
818            // Not a streaming sequence, just return the original frame
819            Ok((header, rest))
820        }
821    }
822}
823
824/// Find `\r\n` in `buf` starting at `from`. Returns `(line_end, after_crlf)` where
825/// `line_end` is the position of `\r` and `after_crlf` is the position after `\n`.
826#[inline]
827fn find_crlf(buf: &[u8], from: usize) -> Result<(usize, usize), ParseError> {
828    let mut i = from;
829    let len = buf.len();
830    while i + 1 < len {
831        if buf[i] == b'\r' && buf[i + 1] == b'\n' {
832            return Ok((i, i + 2));
833        }
834        i += 1;
835    }
836    Err(ParseError::Incomplete)
837}
838
839/// Parse a `usize` directly from ASCII digit bytes, no UTF-8 validation needed.
840#[inline]
841fn parse_usize(buf: &[u8]) -> Result<usize, ParseError> {
842    if buf.is_empty() {
843        return Err(ParseError::BadLength);
844    }
845    let mut v: usize = 0;
846    for &b in buf {
847        if !b.is_ascii_digit() {
848            return Err(ParseError::BadLength);
849        }
850        v = v.checked_mul(10).ok_or(ParseError::BadLength)?;
851        v = v
852            .checked_add((b - b'0') as usize)
853            .ok_or(ParseError::BadLength)?;
854    }
855    Ok(v)
856}
857
858/// Parse an `i64` directly from ASCII bytes (optional leading `-`), no UTF-8 validation.
859#[inline]
860fn parse_i64(buf: &[u8]) -> Result<i64, ParseError> {
861    if buf.is_empty() {
862        return Err(ParseError::InvalidFormat);
863    }
864    let (neg, digits) = if buf[0] == b'-' {
865        (true, &buf[1..])
866    } else {
867        (false, buf)
868    };
869    if digits.is_empty() {
870        return Err(ParseError::InvalidFormat);
871    }
872    let mut v: i64 = 0;
873    for (i, &d) in digits.iter().enumerate() {
874        if !d.is_ascii_digit() {
875            return Err(ParseError::InvalidFormat);
876        }
877        let digit = (d - b'0') as i64;
878        if neg && v == i64::MAX / 10 && digit == 8 && i == digits.len() - 1 {
879            return Ok(i64::MIN);
880        }
881        if v > i64::MAX / 10 || (v == i64::MAX / 10 && digit > i64::MAX % 10) {
882            return Err(ParseError::Overflow);
883        }
884        v = v * 10 + digit;
885    }
886    if neg { Ok(-v) } else { Ok(v) }
887}
888
889/// Parse a collection count (usize) with MAX_COLLECTION_SIZE check.
890#[inline]
891fn parse_count(buf: &[u8]) -> Result<usize, ParseError> {
892    let count = parse_usize(buf)?;
893    if count > MAX_COLLECTION_SIZE {
894        return Err(ParseError::BadLength);
895    }
896    Ok(count)
897}
898
899/// Converts a Frame to its RESP3 byte representation.
900///
901/// This function serializes a Frame into the corresponding RESP3 protocol bytes.
902pub fn frame_to_bytes(frame: &Frame) -> Bytes {
903    let mut buf = BytesMut::new();
904    serialize_frame(frame, &mut buf);
905    buf.freeze()
906}
907
908fn serialize_frame(frame: &Frame, buf: &mut BytesMut) {
909    match frame {
910        Frame::SimpleString(s) => {
911            buf.put_u8(b'+');
912            buf.extend_from_slice(s);
913            buf.extend_from_slice(b"\r\n");
914        }
915        Frame::Error(e) => {
916            buf.put_u8(b'-');
917            buf.extend_from_slice(e);
918            buf.extend_from_slice(b"\r\n");
919        }
920        Frame::Integer(i) => {
921            buf.put_u8(b':');
922            let s = i.to_string();
923            buf.extend_from_slice(s.as_bytes());
924            buf.extend_from_slice(b"\r\n");
925        }
926        Frame::BulkString(opt) => {
927            buf.put_u8(b'$');
928            match opt {
929                Some(data) => {
930                    let len = data.len().to_string();
931                    buf.extend_from_slice(len.as_bytes());
932                    buf.extend_from_slice(b"\r\n");
933                    buf.extend_from_slice(data);
934                    buf.extend_from_slice(b"\r\n");
935                }
936                None => {
937                    buf.extend_from_slice(b"-1\r\n");
938                }
939            }
940        }
941        Frame::BlobError(data) => {
942            buf.put_u8(b'!');
943            let len = data.len().to_string();
944            buf.extend_from_slice(len.as_bytes());
945            buf.extend_from_slice(b"\r\n");
946            buf.extend_from_slice(data);
947            buf.extend_from_slice(b"\r\n");
948        }
949        Frame::StreamedStringHeader => {
950            buf.extend_from_slice(b"$?\r\n");
951        }
952        Frame::StreamedBlobErrorHeader => {
953            buf.extend_from_slice(b"!?\r\n");
954        }
955        Frame::StreamedVerbatimStringHeader => {
956            buf.extend_from_slice(b"=?\r\n");
957        }
958        Frame::StreamedArrayHeader => {
959            buf.extend_from_slice(b"*?\r\n");
960        }
961        Frame::StreamedSetHeader => {
962            buf.extend_from_slice(b"~?\r\n");
963        }
964        Frame::StreamedMapHeader => {
965            buf.extend_from_slice(b"%?\r\n");
966        }
967        Frame::StreamedAttributeHeader => {
968            buf.extend_from_slice(b"|?\r\n");
969        }
970        Frame::StreamedPushHeader => {
971            buf.extend_from_slice(b">?\r\n");
972        }
973        Frame::StreamedStringChunk(data) => {
974            buf.put_u8(b';');
975            let len = data.len().to_string();
976            buf.extend_from_slice(len.as_bytes());
977            buf.extend_from_slice(b"\r\n");
978            buf.extend_from_slice(data);
979            buf.extend_from_slice(b"\r\n");
980        }
981        Frame::StreamedString(chunks) => {
982            // Serialize as streaming string sequence: $?\r\n + chunks + terminator
983            buf.extend_from_slice(b"$?\r\n");
984            for chunk in chunks {
985                buf.put_u8(b';');
986                let len = chunk.len().to_string();
987                buf.extend_from_slice(len.as_bytes());
988                buf.extend_from_slice(b"\r\n");
989                buf.extend_from_slice(chunk);
990                buf.extend_from_slice(b"\r\n");
991            }
992            buf.extend_from_slice(b";0\r\n\r\n");
993        }
994        Frame::StreamedArray(items) => {
995            buf.extend_from_slice(b"*?\r\n");
996            for item in items {
997                serialize_frame(item, buf);
998            }
999            buf.extend_from_slice(b".\r\n");
1000        }
1001        Frame::StreamedSet(items) => {
1002            buf.extend_from_slice(b"~?\r\n");
1003            for item in items {
1004                serialize_frame(item, buf);
1005            }
1006            buf.extend_from_slice(b".\r\n");
1007        }
1008        Frame::StreamedMap(pairs) => {
1009            buf.extend_from_slice(b"%?\r\n");
1010            for (key, value) in pairs {
1011                serialize_frame(key, buf);
1012                serialize_frame(value, buf);
1013            }
1014            buf.extend_from_slice(b".\r\n");
1015        }
1016        Frame::StreamedAttribute(pairs) => {
1017            buf.extend_from_slice(b"|?\r\n");
1018            for (key, value) in pairs {
1019                serialize_frame(key, buf);
1020                serialize_frame(value, buf);
1021            }
1022            buf.extend_from_slice(b".\r\n");
1023        }
1024        Frame::StreamedPush(items) => {
1025            buf.extend_from_slice(b">?\r\n");
1026            for item in items {
1027                serialize_frame(item, buf);
1028            }
1029            buf.extend_from_slice(b".\r\n");
1030        }
1031        Frame::StreamTerminator => {
1032            buf.extend_from_slice(b".\r\n");
1033        }
1034        Frame::Null => {
1035            buf.extend_from_slice(b"_\r\n");
1036        }
1037        Frame::Double(d) => {
1038            buf.put_u8(b',');
1039            let s = d.to_string();
1040            buf.extend_from_slice(s.as_bytes());
1041            buf.extend_from_slice(b"\r\n");
1042        }
1043        Frame::SpecialFloat(f) => {
1044            buf.put_u8(b',');
1045            buf.extend_from_slice(f);
1046            buf.extend_from_slice(b"\r\n");
1047        }
1048        Frame::Boolean(b) => {
1049            buf.extend_from_slice(if *b { b"#t\r\n" } else { b"#f\r\n" });
1050        }
1051        Frame::BigNumber(n) => {
1052            buf.put_u8(b'(');
1053            buf.extend_from_slice(n);
1054            buf.extend_from_slice(b"\r\n");
1055        }
1056        Frame::VerbatimString(format, content) => {
1057            buf.put_u8(b'=');
1058            let total_len = format.len() + 1 + content.len(); // +1 for the colon
1059            let len = total_len.to_string();
1060            buf.extend_from_slice(len.as_bytes());
1061            buf.extend_from_slice(b"\r\n");
1062            buf.extend_from_slice(format);
1063            buf.put_u8(b':');
1064            buf.extend_from_slice(content);
1065            buf.extend_from_slice(b"\r\n");
1066        }
1067        Frame::Array(opt) => {
1068            buf.put_u8(b'*');
1069            match opt {
1070                Some(items) => {
1071                    let len = items.len().to_string();
1072                    buf.extend_from_slice(len.as_bytes());
1073                    buf.extend_from_slice(b"\r\n");
1074                    for item in items {
1075                        serialize_frame(item, buf);
1076                    }
1077                }
1078                None => {
1079                    buf.extend_from_slice(b"-1\r\n");
1080                }
1081            }
1082        }
1083        Frame::Set(items) => {
1084            buf.put_u8(b'~');
1085            let len = items.len().to_string();
1086            buf.extend_from_slice(len.as_bytes());
1087            buf.extend_from_slice(b"\r\n");
1088            for item in items {
1089                serialize_frame(item, buf);
1090            }
1091        }
1092        Frame::Map(pairs) => {
1093            buf.put_u8(b'%');
1094            let len = pairs.len().to_string();
1095            buf.extend_from_slice(len.as_bytes());
1096            buf.extend_from_slice(b"\r\n");
1097            for (key, value) in pairs {
1098                serialize_frame(key, buf);
1099                serialize_frame(value, buf);
1100            }
1101        }
1102        Frame::Attribute(pairs) => {
1103            buf.put_u8(b'|');
1104            let len = pairs.len().to_string();
1105            buf.extend_from_slice(len.as_bytes());
1106            buf.extend_from_slice(b"\r\n");
1107            for (key, value) in pairs {
1108                serialize_frame(key, buf);
1109                serialize_frame(value, buf);
1110            }
1111        }
1112        Frame::Push(items) => {
1113            buf.put_u8(b'>');
1114            let len = items.len().to_string();
1115            buf.extend_from_slice(len.as_bytes());
1116            buf.extend_from_slice(b"\r\n");
1117            for item in items {
1118                serialize_frame(item, buf);
1119            }
1120        }
1121    }
1122}
1123
1124#[cfg(test)]
1125mod tests {
1126    use super::{Frame, ParseError, Parser, frame_to_bytes, parse_frame, parse_streaming_sequence};
1127    use bytes::Bytes;
1128
1129    #[test]
1130    fn test_parse_frame_simple_string() {
1131        let input = Bytes::from("+HELLO\r\nWORLD");
1132        let (frame, rest) = parse_frame(input.clone()).unwrap();
1133        assert_eq!(frame, Frame::SimpleString(Bytes::from("HELLO")));
1134        assert_eq!(rest, Bytes::from("WORLD"));
1135    }
1136
1137    #[test]
1138    fn test_parse_frame_blob_error() {
1139        let input = Bytes::from("!5\r\nERROR\r\nREST");
1140        let (frame, rest) = parse_frame(input.clone()).unwrap();
1141        assert_eq!(frame, Frame::BlobError(Bytes::from("ERROR")));
1142        assert_eq!(rest, Bytes::from("REST"));
1143    }
1144
1145    #[test]
1146    fn test_parse_frame_error() {
1147        let input = Bytes::from("-ERR fail\r\nLEFT");
1148        let (frame, rest) = parse_frame(input.clone()).unwrap();
1149        assert_eq!(frame, Frame::Error(Bytes::from("ERR fail")));
1150        assert_eq!(rest, Bytes::from("LEFT"));
1151    }
1152
1153    #[test]
1154    fn test_parse_frame_integer() {
1155        let input = Bytes::from(":42\r\nTAIL");
1156        let (frame, rest) = parse_frame(input.clone()).unwrap();
1157        assert_eq!(frame, Frame::Integer(42));
1158        assert_eq!(rest, Bytes::from("TAIL"));
1159    }
1160
1161    #[test]
1162    fn test_parse_frame_bulk_string() {
1163        let input = Bytes::from("$3\r\nfoo\r\nREST");
1164        let (frame, rest) = parse_frame(input.clone()).unwrap();
1165        assert_eq!(frame, Frame::BulkString(Some(Bytes::from("foo"))));
1166        assert_eq!(rest, Bytes::from("REST"));
1167        let null_input = Bytes::from("$-1\r\nAFTER");
1168        let (frame, rest) = parse_frame(null_input.clone()).unwrap();
1169        assert_eq!(frame, Frame::BulkString(None));
1170        assert_eq!(rest, Bytes::from("AFTER"));
1171    }
1172
1173    #[test]
1174    fn test_parse_frame_null() {
1175        let input = Bytes::from("_\r\nLEFT");
1176        let (frame, rest) = parse_frame(input.clone()).unwrap();
1177        assert_eq!(frame, Frame::Null);
1178        assert_eq!(rest, Bytes::from("LEFT"));
1179    }
1180
1181    #[test]
1182    fn test_parse_frame_double_and_special_float() {
1183        let input = Bytes::from(",3.5\r\nNEXT");
1184        let (frame, rest) = parse_frame(input.clone()).unwrap();
1185        assert_eq!(frame, Frame::Double(3.5));
1186        assert_eq!(rest, Bytes::from("NEXT"));
1187        let input_inf = Bytes::from(",inf\r\nTAIL");
1188        let (frame, rest) = parse_frame(input_inf.clone()).unwrap();
1189        assert_eq!(frame, Frame::SpecialFloat(Bytes::from("inf")));
1190        assert_eq!(rest, Bytes::from("TAIL"));
1191    }
1192
1193    #[test]
1194    fn test_parse_frame_boolean() {
1195        let input_true = Bytes::from("#t\r\nXYZ");
1196        let (frame, rest) = parse_frame(input_true.clone()).unwrap();
1197        assert_eq!(frame, Frame::Boolean(true));
1198        assert_eq!(rest, Bytes::from("XYZ"));
1199        let input_false = Bytes::from("#f\r\nDONE");
1200        let (frame, rest) = parse_frame(input_false.clone()).unwrap();
1201        assert_eq!(frame, Frame::Boolean(false));
1202        assert_eq!(rest, Bytes::from("DONE"));
1203    }
1204
1205    #[test]
1206    fn test_parse_frame_big_number() {
1207        let input = Bytes::from("(123456789\r\nEND");
1208        let (frame, rest) = parse_frame(input.clone()).unwrap();
1209        assert_eq!(frame, Frame::BigNumber(Bytes::from("123456789")));
1210        assert_eq!(rest, Bytes::from("END"));
1211    }
1212
1213    #[test]
1214    fn test_parse_frame_verbatim_string() {
1215        let input = Bytes::from("=12\r\ntxt:hi there\r\nAFTER");
1216        let (frame, rest) = parse_frame(input.clone()).unwrap();
1217        assert_eq!(
1218            frame,
1219            Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hi there")) // Frame::VerbatimString {
1220                                                                               //     format: "txt".to_string(),
1221                                                                               //     content: "hi there".to_string()
1222                                                                               // }
1223        );
1224        assert_eq!(rest, Bytes::from("AFTER"));
1225    }
1226
1227    #[test]
1228    fn test_parse_frame_array_set_push_map_attribute() {
1229        // Array of two bulk strings
1230        let input = Bytes::from("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\nTAIL");
1231        let (frame, rest) = parse_frame(input.clone()).unwrap();
1232        assert_eq!(
1233            frame,
1234            Frame::Array(Some(vec![
1235                Frame::BulkString(Some(Bytes::from("foo"))),
1236                Frame::BulkString(Some(Bytes::from("bar")))
1237            ]))
1238        );
1239        assert_eq!(rest, Bytes::from("TAIL"));
1240        // Null array
1241        let input_null = Bytes::from("*-1\r\nEND");
1242        let (frame, rest) = parse_frame(input_null.clone()).unwrap();
1243        assert_eq!(frame, Frame::Array(None));
1244        assert_eq!(rest, Bytes::from("END"));
1245        // Set of two simple strings
1246        let input_set = Bytes::from("~2\r\n+foo\r\n+bar\r\nTAIL");
1247        let (frame, rest) = parse_frame(input_set.clone()).unwrap();
1248        assert_eq!(
1249            frame,
1250            Frame::Set(vec![
1251                Frame::SimpleString(Bytes::from("foo")),
1252                Frame::SimpleString(Bytes::from("bar")),
1253            ])
1254        );
1255        assert_eq!(rest, Bytes::from("TAIL"));
1256        // Map of two key-value pairs
1257        let input_map = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\nTRAIL");
1258        let (frame, rest) = parse_frame(input_map.clone()).unwrap();
1259        assert_eq!(
1260            frame,
1261            Frame::Map(vec![
1262                (
1263                    Frame::SimpleString(Bytes::from("key1")),
1264                    Frame::SimpleString(Bytes::from("val1"))
1265                ),
1266                (
1267                    Frame::SimpleString(Bytes::from("key2")),
1268                    Frame::SimpleString(Bytes::from("val2"))
1269                ),
1270            ])
1271        );
1272        assert_eq!(rest, Bytes::from("TRAIL"));
1273        // Attribute
1274        let input_attr = Bytes::from("|1\r\n+meta\r\n+data\r\nAFTER");
1275        let (frame, rest) = parse_frame(input_attr.clone()).unwrap();
1276        assert_eq!(
1277            frame,
1278            Frame::Attribute(vec![(
1279                Frame::SimpleString(Bytes::from("meta")),
1280                Frame::SimpleString(Bytes::from("data"))
1281            ),])
1282        );
1283        assert_eq!(rest, Bytes::from("AFTER"));
1284        // Push
1285        let input_push = Bytes::from(">2\r\n+type\r\n:1\r\nNEXT");
1286        let (frame, rest) = parse_frame(input_push.clone()).unwrap();
1287        assert_eq!(
1288            frame,
1289            Frame::Push(vec![
1290                Frame::SimpleString(Bytes::from("type")),
1291                Frame::Integer(1),
1292            ])
1293        );
1294        assert_eq!(rest, Bytes::from("NEXT"));
1295    }
1296
1297    #[test]
1298    fn test_parse_frame_empty_input() {
1299        assert!(parse_frame(Bytes::new()).is_err());
1300    }
1301
1302    #[test]
1303    fn test_parse_frame_invalid_tag() {
1304        let input = Bytes::from("X123\r\n");
1305        assert!(parse_frame(input).is_err());
1306    }
1307
1308    #[test]
1309    fn test_parse_frame_malformed_bulk_length() {
1310        let input = Bytes::from("$x\r\nfoo\r\n");
1311        assert!(parse_frame(input).is_err());
1312    }
1313
1314    #[test]
1315    fn test_parse_frame_zero_length_bulk() {
1316        let input = Bytes::from("$0\r\n\r\nTAIL");
1317        let (frame, rest) = parse_frame(input.clone()).unwrap();
1318        assert_eq!(frame, Frame::BulkString(Some(Bytes::from(""))));
1319        assert_eq!(rest, Bytes::from("TAIL"));
1320    }
1321
1322    #[test]
1323    fn test_parse_frame_zero_length_blob_error() {
1324        let input = Bytes::from("!0\r\n\r\nREST");
1325        let (frame, rest) = parse_frame(input.clone()).unwrap();
1326        assert_eq!(frame, Frame::BlobError(Bytes::new()));
1327        assert_eq!(rest, Bytes::from("REST"));
1328    }
1329
1330    #[test]
1331    fn test_parse_frame_missing_crlf() {
1332        let input = Bytes::from(":42\nTAIL");
1333        assert!(parse_frame(input).is_err());
1334    }
1335
1336    #[test]
1337    fn test_parse_frame_unicode_simple_string() {
1338        let input = Bytes::from("+こんにちは\r\nEND");
1339        let (frame, rest) = parse_frame(input.clone()).unwrap();
1340        assert_eq!(frame, Frame::SimpleString(Bytes::from("こんにちは")));
1341        assert_eq!(rest, Bytes::from("END"));
1342    }
1343
1344    #[test]
1345    fn test_parse_frame_chained_frames() {
1346        let combined = Bytes::from("+OK\r\n:1\r\nfoo");
1347        let (f1, rem) = parse_frame(combined.clone()).unwrap();
1348        assert_eq!(f1, Frame::SimpleString(Bytes::from("OK")));
1349        let (f2, rem2) = parse_frame(rem).unwrap();
1350        assert_eq!(f2, Frame::Integer(1));
1351        assert_eq!(rem2, Bytes::from("foo"));
1352    }
1353
1354    #[test]
1355    fn test_parse_frame_empty_array() {
1356        let input = Bytes::from("*0\r\nTAIL");
1357        let (frame, rest) = parse_frame(input.clone()).unwrap();
1358        assert_eq!(frame, Frame::Array(Some(vec![])));
1359        assert_eq!(rest, Bytes::from("TAIL"));
1360    }
1361
1362    #[test]
1363    fn test_parse_frame_partial_array_data() {
1364        let input = Bytes::from("*2\r\n+OK\r\n");
1365        assert!(parse_frame(input).is_err());
1366    }
1367
1368    #[test]
1369    fn test_parse_frame_streamed_string() {
1370        let input = Bytes::from("$?\r\n$5\r\nhello\r\n$0\r\n\r\nREST");
1371        let (frame, rem) = parse_frame(input.clone()).unwrap();
1372        assert_eq!(frame, Frame::StreamedStringHeader);
1373        let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1374        assert_eq!(chunk, Frame::BulkString(Some(Bytes::from("hello"))));
1375        let (terminator, rest) = parse_frame(rem2.clone()).unwrap();
1376        assert_eq!(terminator, Frame::BulkString(Some(Bytes::from(""))));
1377        assert_eq!(rest, Bytes::from("REST"));
1378    }
1379
1380    #[test]
1381    fn test_parse_frame_streamed_blob_error() {
1382        let input = Bytes::from("!?\r\n!5\r\nERROR\r\nREST");
1383        let (frame, rem) = parse_frame(input.clone()).unwrap();
1384        assert_eq!(frame, Frame::StreamedBlobErrorHeader);
1385        let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1386        assert_eq!(chunk, Frame::BlobError(Bytes::from("ERROR")));
1387        assert_eq!(rem2, Bytes::from("REST"));
1388    }
1389
1390    #[test]
1391    fn test_parse_frame_streamed_verbatim_string() {
1392        let input = Bytes::from("=?\r\n=9\r\ntxt:hello\r\nTAIL");
1393        let (frame, rem) = parse_frame(input.clone()).unwrap();
1394        assert_eq!(frame, Frame::StreamedVerbatimStringHeader);
1395        let (chunk, rest) = parse_frame(rem.clone()).unwrap();
1396        assert_eq!(
1397            chunk,
1398            Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hello")) // Frame::VerbatimString {
1399                                                                            //     format: "txt".to_string(),
1400                                                                            //     content: "hello".to_string()
1401                                                                            // }
1402        );
1403        assert_eq!(rest, Bytes::from("TAIL"));
1404    }
1405
1406    #[test]
1407    fn test_parse_frame_streamed_array() {
1408        let input = Bytes::from("*?\r\n+one\r\n+two\r\n*0\r\nEND");
1409        let (header, rem) = parse_frame(input.clone()).unwrap();
1410        assert_eq!(header, Frame::StreamedArrayHeader);
1411        let (item1, rem2) = parse_frame(rem.clone()).unwrap();
1412        assert_eq!(item1, Frame::SimpleString(Bytes::from("one")));
1413        let (item2, rem3) = parse_frame(rem2.clone()).unwrap();
1414        assert_eq!(item2, Frame::SimpleString(Bytes::from("two")));
1415        let (terminator, rest) = parse_frame(rem3.clone()).unwrap();
1416        assert_eq!(terminator, Frame::Array(Some(vec![])));
1417        assert_eq!(rest, Bytes::from("END"));
1418    }
1419
1420    #[test]
1421    fn test_parse_frame_streamed_set_map_attr_push() {
1422        // Set
1423        let input_set = Bytes::from("~?\r\n+foo\r\n+bar\r\n~0\r\nTAIL");
1424        let (h_set, rem0) = parse_frame(input_set.clone()).unwrap();
1425        assert_eq!(h_set, Frame::StreamedSetHeader);
1426        let (s1, rem1) = parse_frame(rem0.clone()).unwrap();
1427        assert_eq!(s1, Frame::SimpleString(Bytes::from("foo")));
1428        let (s2, rem2) = parse_frame(rem1.clone()).unwrap();
1429        assert_eq!(s2, Frame::SimpleString(Bytes::from("bar")));
1430        let (term_set, rest_set) = parse_frame(rem2.clone()).unwrap();
1431        assert_eq!(term_set, Frame::Set(vec![]));
1432        assert_eq!(rest_set, Bytes::from("TAIL"));
1433        // Map
1434        let input_map = Bytes::from("%?\r\n+key\r\n+val\r\n%0\r\nNEXT");
1435        let (h_map, rem_map) = parse_frame(input_map.clone()).unwrap();
1436        assert_eq!(h_map, Frame::StreamedMapHeader);
1437        let (k, rem_map2) = parse_frame(rem_map.clone()).unwrap();
1438        assert_eq!(k, Frame::SimpleString(Bytes::from("key")));
1439        let (v, rem_map3) = parse_frame(rem_map2.clone()).unwrap();
1440        assert_eq!(v, Frame::SimpleString(Bytes::from("val")));
1441        let (term_map, rest_map4) = parse_frame(rem_map3.clone()).unwrap();
1442        assert_eq!(term_map, Frame::Map(vec![]));
1443        assert_eq!(rest_map4, Bytes::from("NEXT"));
1444        // Attribute
1445        let input_attr = Bytes::from("|?\r\n+meta\r\n+info\r\n|0\r\nMORE");
1446        let (h_attr, rem_attr) = parse_frame(input_attr.clone()).unwrap();
1447        assert_eq!(h_attr, Frame::StreamedAttributeHeader);
1448        let (a1, rem_attr2) = parse_frame(rem_attr.clone()).unwrap();
1449        assert_eq!(a1, Frame::SimpleString(Bytes::from("meta")));
1450        let (a2, rem_attr3) = parse_frame(rem_attr2.clone()).unwrap();
1451        assert_eq!(a2, Frame::SimpleString(Bytes::from("info")));
1452        let (term_attr, rest_attr) = parse_frame(rem_attr3.clone()).unwrap();
1453        assert_eq!(term_attr, Frame::Attribute(vec![]));
1454        assert_eq!(rest_attr, Bytes::from("MORE"));
1455        // Push
1456        let input_push = Bytes::from(">?\r\n:1\r\n:2\r\n>0\r\nEND");
1457        let (h_push, rem_push) = parse_frame(input_push.clone()).unwrap();
1458        assert_eq!(h_push, Frame::StreamedPushHeader);
1459        let (p1, rem_push2) = parse_frame(rem_push.clone()).unwrap();
1460        assert_eq!(p1, Frame::Integer(1));
1461        let (p2, rem_push3) = parse_frame(rem_push2.clone()).unwrap();
1462        assert_eq!(p2, Frame::Integer(2));
1463        let (term_push, rest_push) = parse_frame(rem_push3.clone()).unwrap();
1464        assert_eq!(term_push, Frame::Push(vec![]));
1465        assert_eq!(rest_push, Bytes::from("END"));
1466    }
1467
1468    #[test]
1469    fn test_parse_frame_stream_terminator() {
1470        let input = Bytes::from(".\r\nREST");
1471        let (frame, rest) = parse_frame(input.clone()).unwrap();
1472        assert_eq!(frame, Frame::StreamTerminator);
1473        assert_eq!(rest, Bytes::from("REST"));
1474    }
1475
1476    #[test]
1477    fn test_parse_frame_null_blob_error_rejected() {
1478        let input = Bytes::from("!-1\r\nTAIL");
1479        assert_eq!(parse_frame(input), Err(ParseError::BadLength));
1480    }
1481
1482    #[test]
1483    fn test_parse_frame_null_verbatim_rejected() {
1484        let input = Bytes::from("=-1\r\nTAIL");
1485        assert_eq!(parse_frame(input), Err(ParseError::BadLength));
1486    }
1487
1488    #[test]
1489    fn test_verbatim_string_format_must_be_3_bytes() {
1490        // Too short (1 byte format)
1491        let input = Bytes::from("=6\r\nx:data\r\n");
1492        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
1493
1494        // Too long (4 byte format)
1495        let input = Bytes::from("=9\r\ntxtx:data\r\n");
1496        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
1497
1498        // Empty format (colon at start)
1499        let input = Bytes::from("=5\r\n:data\r\n");
1500        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
1501
1502        // Valid 3-byte format should still work
1503        let input = Bytes::from("=8\r\ntxt:data\r\n");
1504        let (frame, _) = parse_frame(input).unwrap();
1505        assert_eq!(
1506            frame,
1507            Frame::VerbatimString(Bytes::from("txt"), Bytes::from("data"))
1508        );
1509    }
1510
1511    #[test]
1512    fn test_parse_frame_special_float_nan() {
1513        let input = Bytes::from(",nan\r\nTAIL");
1514        let (frame, rest) = parse_frame(input.clone()).unwrap();
1515        assert_eq!(frame, Frame::SpecialFloat(Bytes::from("nan")));
1516        assert_eq!(rest, Bytes::from("TAIL"));
1517    }
1518
1519    #[test]
1520    fn test_parse_frame_big_number_zero() {
1521        let input = Bytes::from("(0\r\nEND");
1522        let (frame, rest) = parse_frame(input.clone()).unwrap();
1523        assert_eq!(frame, Frame::BigNumber(Bytes::from("0")));
1524        assert_eq!(rest, Bytes::from("END"));
1525    }
1526
1527    #[test]
1528    fn test_parse_frame_collection_empty() {
1529        let input_push = Bytes::from(">0\r\nTAIL");
1530        let (f_push, r_push) = parse_frame(input_push.clone()).unwrap();
1531        assert_eq!(f_push, Frame::Push(vec![]));
1532        assert_eq!(r_push, Bytes::from("TAIL"));
1533        let input_attr = Bytes::from("|0\r\nAFTER");
1534        let (f_attr, r_attr) = parse_frame(input_attr.clone()).unwrap();
1535        assert_eq!(f_attr, Frame::Attribute(vec![]));
1536        assert_eq!(r_attr, Bytes::from("AFTER"));
1537        let input_map = Bytes::from("%0\r\nEND");
1538        let (f_map, r_map) = parse_frame(input_map.clone()).unwrap();
1539        assert_eq!(f_map, Frame::Map(vec![]));
1540        assert_eq!(r_map, Bytes::from("END"));
1541        let input_set = Bytes::from("~0\r\nDONE");
1542        let (f_set, r_set) = parse_frame(input_set.clone()).unwrap();
1543        assert_eq!(f_set, Frame::Set(vec![]));
1544        assert_eq!(r_set, Bytes::from("DONE"));
1545        let input_arr = Bytes::from("*-1\r\nFIN");
1546        let (f_arr, r_arr) = parse_frame(input_arr.clone()).unwrap();
1547        assert_eq!(f_arr, Frame::Array(None));
1548        assert_eq!(r_arr, Bytes::from("FIN"));
1549    }
1550
1551    // Round-trip tests for serialization and parsing
1552
1553    #[test]
1554    fn test_roundtrip_simple_string() {
1555        let original = Bytes::from("+hello\r\n");
1556        let (frame, _) = parse_frame(original.clone()).unwrap();
1557        let serialized = frame_to_bytes(&frame);
1558        assert_eq!(original, serialized);
1559
1560        let (reparsed, _) = parse_frame(serialized).unwrap();
1561        assert_eq!(frame, reparsed);
1562    }
1563
1564    #[test]
1565    fn test_roundtrip_error() {
1566        let original = Bytes::from("-ERR error message\r\n");
1567        let (frame, _) = parse_frame(original.clone()).unwrap();
1568        let serialized = frame_to_bytes(&frame);
1569        assert_eq!(original, serialized);
1570
1571        let (reparsed, _) = parse_frame(serialized).unwrap();
1572        assert_eq!(frame, reparsed);
1573    }
1574
1575    #[test]
1576    fn test_roundtrip_integer() {
1577        let original = Bytes::from(":12345\r\n");
1578        let (frame, _) = parse_frame(original.clone()).unwrap();
1579        let serialized = frame_to_bytes(&frame);
1580        assert_eq!(original, serialized);
1581
1582        let (reparsed, _) = parse_frame(serialized).unwrap();
1583        assert_eq!(frame, reparsed);
1584    }
1585
1586    #[test]
1587    fn test_roundtrip_bulk_string() {
1588        let original = Bytes::from("$5\r\nhello\r\n");
1589        let (frame, _) = parse_frame(original.clone()).unwrap();
1590        let serialized = frame_to_bytes(&frame);
1591        assert_eq!(original, serialized);
1592
1593        let (reparsed, _) = parse_frame(serialized).unwrap();
1594        assert_eq!(frame, reparsed);
1595
1596        // Test null bulk string
1597        let original_null = Bytes::from("$-1\r\n");
1598        let (frame_null, _) = parse_frame(original_null.clone()).unwrap();
1599        let serialized_null = frame_to_bytes(&frame_null);
1600        assert_eq!(original_null, serialized_null);
1601
1602        let (reparsed_null, _) = parse_frame(serialized_null).unwrap();
1603        assert_eq!(frame_null, reparsed_null);
1604    }
1605
1606    #[test]
1607    fn test_roundtrip_blob_error() {
1608        let original = Bytes::from("!5\r\nerror\r\n");
1609        let (frame, _) = parse_frame(original.clone()).unwrap();
1610        let serialized = frame_to_bytes(&frame);
1611        assert_eq!(original, serialized);
1612
1613        let (reparsed, _) = parse_frame(serialized).unwrap();
1614        assert_eq!(frame, reparsed);
1615    }
1616
1617    #[test]
1618    fn test_roundtrip_null() {
1619        let original = Bytes::from("_\r\n");
1620        let (frame, _) = parse_frame(original.clone()).unwrap();
1621        let serialized = frame_to_bytes(&frame);
1622        assert_eq!(original, serialized);
1623
1624        let (reparsed, _) = parse_frame(serialized).unwrap();
1625        assert_eq!(frame, reparsed);
1626    }
1627
1628    #[test]
1629    fn test_roundtrip_double() {
1630        let original = Bytes::from(",3.14159\r\n");
1631        let (frame, _) = parse_frame(original.clone()).unwrap();
1632        let serialized = frame_to_bytes(&frame);
1633
1634        // Note: The exact string representation of floating point numbers might differ
1635        // between parsing and serializing due to formatting differences
1636        let (reparsed, _) = parse_frame(serialized).unwrap();
1637        assert_eq!(frame, reparsed);
1638    }
1639
1640    #[test]
1641    fn test_roundtrip_special_float() {
1642        let original = Bytes::from(",inf\r\n");
1643        let (frame, _) = parse_frame(original.clone()).unwrap();
1644        let serialized = frame_to_bytes(&frame);
1645        assert_eq!(original, serialized);
1646
1647        let (reparsed, _) = parse_frame(serialized).unwrap();
1648        assert_eq!(frame, reparsed);
1649    }
1650
1651    #[test]
1652    fn test_roundtrip_boolean() {
1653        let original_true = Bytes::from("#t\r\n");
1654        let (frame_true, _) = parse_frame(original_true.clone()).unwrap();
1655        let serialized_true = frame_to_bytes(&frame_true);
1656        assert_eq!(original_true, serialized_true);
1657
1658        let (reparsed_true, _) = parse_frame(serialized_true).unwrap();
1659        assert_eq!(frame_true, reparsed_true);
1660
1661        let original_false = Bytes::from("#f\r\n");
1662        let (frame_false, _) = parse_frame(original_false.clone()).unwrap();
1663        let serialized_false = frame_to_bytes(&frame_false);
1664        assert_eq!(original_false, serialized_false);
1665
1666        let (reparsed_false, _) = parse_frame(serialized_false).unwrap();
1667        assert_eq!(frame_false, reparsed_false);
1668    }
1669
1670    #[test]
1671    fn test_roundtrip_big_number() {
1672        let original = Bytes::from("(12345678901234567890\r\n");
1673        let (frame, _) = parse_frame(original.clone()).unwrap();
1674        let serialized = frame_to_bytes(&frame);
1675        assert_eq!(original, serialized);
1676
1677        let (reparsed, _) = parse_frame(serialized).unwrap();
1678        assert_eq!(frame, reparsed);
1679    }
1680
1681    #[test]
1682    fn test_roundtrip_verbatim_string() {
1683        let original = Bytes::from("=10\r\ntxt:hello!\r\n");
1684        let (frame, _) = parse_frame(original.clone()).unwrap();
1685        let serialized = frame_to_bytes(&frame);
1686        assert_eq!(original, serialized);
1687
1688        let (reparsed, _) = parse_frame(serialized).unwrap();
1689        assert_eq!(frame, reparsed);
1690    }
1691
1692    #[test]
1693    fn test_roundtrip_array() {
1694        let original = Bytes::from("*2\r\n+hello\r\n:123\r\n");
1695        let (frame, _) = parse_frame(original.clone()).unwrap();
1696        let serialized = frame_to_bytes(&frame);
1697        assert_eq!(original, serialized);
1698
1699        let (reparsed, _) = parse_frame(serialized).unwrap();
1700        assert_eq!(frame, reparsed);
1701
1702        // Test null array
1703        let original_null = Bytes::from("*-1\r\n");
1704        let (frame_null, _) = parse_frame(original_null.clone()).unwrap();
1705        let serialized_null = frame_to_bytes(&frame_null);
1706        assert_eq!(original_null, serialized_null);
1707
1708        let (reparsed_null, _) = parse_frame(serialized_null).unwrap();
1709        assert_eq!(frame_null, reparsed_null);
1710    }
1711
1712    #[test]
1713    fn test_roundtrip_set() {
1714        let original = Bytes::from("~2\r\n+one\r\n+two\r\n");
1715        let (frame, _) = parse_frame(original.clone()).unwrap();
1716        let serialized = frame_to_bytes(&frame);
1717        assert_eq!(original, serialized);
1718
1719        let (reparsed, _) = parse_frame(serialized).unwrap();
1720        assert_eq!(frame, reparsed);
1721    }
1722
1723    #[test]
1724    fn test_roundtrip_map() {
1725        let original = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\n");
1726        let (frame, _) = parse_frame(original.clone()).unwrap();
1727        let serialized = frame_to_bytes(&frame);
1728        assert_eq!(original, serialized);
1729
1730        let (reparsed, _) = parse_frame(serialized).unwrap();
1731        assert_eq!(frame, reparsed);
1732    }
1733
1734    #[test]
1735    fn test_roundtrip_attribute() {
1736        let original = Bytes::from("|1\r\n+key\r\n+val\r\n");
1737        let (frame, _) = parse_frame(original.clone()).unwrap();
1738        let serialized = frame_to_bytes(&frame);
1739        assert_eq!(original, serialized);
1740
1741        let (reparsed, _) = parse_frame(serialized).unwrap();
1742        assert_eq!(frame, reparsed);
1743    }
1744
1745    #[test]
1746    fn test_roundtrip_push() {
1747        let original = Bytes::from(">2\r\n+msg\r\n+data\r\n");
1748        let (frame, _) = parse_frame(original.clone()).unwrap();
1749        let serialized = frame_to_bytes(&frame);
1750        assert_eq!(original, serialized);
1751
1752        let (reparsed, _) = parse_frame(serialized).unwrap();
1753        assert_eq!(frame, reparsed);
1754    }
1755
1756    #[test]
1757    fn test_roundtrip_streaming_headers() {
1758        let headers = [
1759            ("$?\r\n", Frame::StreamedStringHeader),
1760            ("!?\r\n", Frame::StreamedBlobErrorHeader),
1761            ("=?\r\n", Frame::StreamedVerbatimStringHeader),
1762            ("*?\r\n", Frame::StreamedArrayHeader),
1763            ("~?\r\n", Frame::StreamedSetHeader),
1764            ("%?\r\n", Frame::StreamedMapHeader),
1765            ("|?\r\n", Frame::StreamedAttributeHeader),
1766            (">?\r\n", Frame::StreamedPushHeader),
1767            (".\r\n", Frame::StreamTerminator),
1768        ];
1769
1770        for (original_str, expected_frame) in headers {
1771            let original = Bytes::from(original_str);
1772            let (frame, _) = parse_frame(original.clone()).unwrap();
1773            assert_eq!(frame, expected_frame);
1774
1775            let serialized = frame_to_bytes(&frame);
1776            assert_eq!(original, serialized);
1777
1778            let (reparsed, _) = parse_frame(serialized).unwrap();
1779            assert_eq!(frame, reparsed);
1780        }
1781    }
1782
1783    #[test]
1784    fn test_roundtrip_streaming_chunks() {
1785        let chunks = [
1786            (
1787                ";4\r\nHell\r\n",
1788                Frame::StreamedStringChunk(Bytes::from("Hell")),
1789            ),
1790            (
1791                ";5\r\no wor\r\n",
1792                Frame::StreamedStringChunk(Bytes::from("o wor")),
1793            ),
1794            (";1\r\nd\r\n", Frame::StreamedStringChunk(Bytes::from("d"))),
1795            (";0\r\n\r\n", Frame::StreamedStringChunk(Bytes::new())),
1796            (
1797                ";11\r\nHello World\r\n",
1798                Frame::StreamedStringChunk(Bytes::from("Hello World")),
1799            ),
1800        ];
1801
1802        for (original_str, expected_frame) in chunks {
1803            let original = Bytes::from(original_str);
1804            let (frame, rest) = parse_frame(original.clone()).unwrap();
1805            assert_eq!(frame, expected_frame);
1806            assert!(rest.is_empty());
1807
1808            let serialized = frame_to_bytes(&frame);
1809            assert_eq!(original, serialized);
1810
1811            let (reparsed, _) = parse_frame(serialized).unwrap();
1812            assert_eq!(frame, reparsed);
1813        }
1814    }
1815
1816    #[test]
1817    fn test_streaming_chunks_edge_cases() {
1818        // Test incomplete chunk (missing data)
1819        let data = Bytes::from(";4\r\nHel");
1820        let result = parse_frame(data);
1821        assert!(matches!(result, Err(ParseError::Incomplete)));
1822
1823        // Test incomplete chunk (missing CRLF)
1824        let data = Bytes::from(";4\r\nHell");
1825        let result = parse_frame(data);
1826        assert!(matches!(result, Err(ParseError::Incomplete)));
1827
1828        // Test invalid length format
1829        let data = Bytes::from(";abc\r\ndata\r\n");
1830        let result = parse_frame(data);
1831        assert!(matches!(result, Err(ParseError::BadLength)));
1832
1833        // Test negative length
1834        let data = Bytes::from(";-1\r\ndata\r\n");
1835        let result = parse_frame(data);
1836        assert!(matches!(result, Err(ParseError::BadLength)));
1837
1838        // Test length mismatch (length says 5 but only 4 bytes)
1839        let data = Bytes::from(";5\r\nHell\r\n");
1840        let result = parse_frame(data);
1841        assert!(matches!(result, Err(ParseError::Incomplete)));
1842
1843        // Test zero-length chunk without trailing CRLF returns Incomplete
1844        let data = Bytes::from(";0\r\n");
1845        let result = parse_frame(data);
1846        assert!(matches!(result, Err(ParseError::Incomplete)));
1847
1848        // Test binary data in chunk
1849        let binary_data = b"\x00\x01\x02\x03\xFF";
1850        let mut chunk_data = Vec::new();
1851        chunk_data.extend_from_slice(b";5\r\n");
1852        chunk_data.extend_from_slice(binary_data);
1853        chunk_data.extend_from_slice(b"\r\n");
1854        let data = Bytes::from(chunk_data);
1855        let result = parse_frame(data);
1856        assert!(result.is_ok());
1857        let (frame, _) = result.unwrap();
1858        if let Frame::StreamedStringChunk(chunk) = frame {
1859            assert_eq!(chunk.as_ref(), binary_data);
1860        }
1861    }
1862
1863    #[test]
1864    fn test_roundtrip_streaming_sequences() {
1865        // Test streaming string roundtrip
1866        let streaming_string = Frame::StreamedString(vec![
1867            Bytes::from("Hell"),
1868            Bytes::from("o wor"),
1869            Bytes::from("ld"),
1870        ]);
1871        let serialized = frame_to_bytes(&streaming_string);
1872        let expected = "$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;2\r\nld\r\n;0\r\n\r\n";
1873        assert_eq!(serialized, Bytes::from(expected));
1874
1875        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1876        assert_eq!(parsed, streaming_string);
1877
1878        // Test streaming array roundtrip
1879        let streaming_array = Frame::StreamedArray(vec![
1880            Frame::SimpleString(Bytes::from("hello")),
1881            Frame::Integer(42),
1882            Frame::Boolean(true),
1883        ]);
1884        let serialized = frame_to_bytes(&streaming_array);
1885        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1886        assert_eq!(parsed, streaming_array);
1887
1888        // Test streaming map roundtrip
1889        let streaming_map = Frame::StreamedMap(vec![
1890            (
1891                Frame::SimpleString(Bytes::from("key1")),
1892                Frame::SimpleString(Bytes::from("val1")),
1893            ),
1894            (
1895                Frame::SimpleString(Bytes::from("key2")),
1896                Frame::Integer(123),
1897            ),
1898        ]);
1899        let serialized = frame_to_bytes(&streaming_map);
1900        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1901        assert_eq!(parsed, streaming_map);
1902
1903        // Test empty streaming string
1904        let empty_streaming = Frame::StreamedString(vec![]);
1905        let serialized = frame_to_bytes(&empty_streaming);
1906        let expected = "$?\r\n;0\r\n\r\n";
1907        assert_eq!(serialized, Bytes::from(expected));
1908        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1909        assert_eq!(parsed, empty_streaming);
1910
1911        // Test streaming set roundtrip
1912        let streaming_set = Frame::StreamedSet(vec![
1913            Frame::SimpleString(Bytes::from("apple")),
1914            Frame::SimpleString(Bytes::from("banana")),
1915            Frame::Integer(42),
1916        ]);
1917        let serialized = frame_to_bytes(&streaming_set);
1918        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1919        assert_eq!(parsed, streaming_set);
1920
1921        // Test streaming attribute roundtrip
1922        let streaming_attribute = Frame::StreamedAttribute(vec![
1923            (
1924                Frame::SimpleString(Bytes::from("trace-id")),
1925                Frame::SimpleString(Bytes::from("abc123")),
1926            ),
1927            (
1928                Frame::SimpleString(Bytes::from("span-id")),
1929                Frame::SimpleString(Bytes::from("def456")),
1930            ),
1931        ]);
1932        let serialized = frame_to_bytes(&streaming_attribute);
1933        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1934        assert_eq!(parsed, streaming_attribute);
1935
1936        // Test streaming push roundtrip
1937        let streaming_push = Frame::StreamedPush(vec![
1938            Frame::SimpleString(Bytes::from("pubsub")),
1939            Frame::SimpleString(Bytes::from("channel1")),
1940            Frame::SimpleString(Bytes::from("message data")),
1941        ]);
1942        let serialized = frame_to_bytes(&streaming_push);
1943        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1944        assert_eq!(parsed, streaming_push);
1945
1946        // Test empty streaming containers
1947        let empty_array = Frame::StreamedArray(vec![]);
1948        let serialized = frame_to_bytes(&empty_array);
1949        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1950        assert_eq!(parsed, empty_array);
1951
1952        let empty_set = Frame::StreamedSet(vec![]);
1953        let serialized = frame_to_bytes(&empty_set);
1954        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1955        assert_eq!(parsed, empty_set);
1956    }
1957
1958    #[test]
1959    fn test_streaming_sequences_edge_cases() {
1960        // Test incomplete streaming string (missing zero-length terminator)
1961        let data = Bytes::from("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n");
1962        let result = parse_streaming_sequence(data);
1963        assert!(matches!(result, Err(ParseError::Incomplete)));
1964
1965        // Test malformed chunk in streaming sequence
1966        let data = Bytes::from("$?\r\n;abc\r\nHell\r\n;0\r\n");
1967        let result = parse_streaming_sequence(data);
1968        assert!(matches!(result, Err(ParseError::BadLength)));
1969
1970        // Test streaming array with incomplete terminator
1971        let data = Bytes::from("*?\r\n+hello\r\n:42\r\n");
1972        let result = parse_streaming_sequence(data);
1973        assert!(matches!(result, Err(ParseError::Incomplete)));
1974
1975        // Test mixed streaming and non-streaming content
1976        let data = Bytes::from("*?\r\n+hello\r\n*2\r\n:1\r\n:2\r\n.\r\n");
1977        let result = parse_streaming_sequence(data);
1978        assert!(result.is_ok());
1979        let (frame, _) = result.unwrap();
1980        if let Frame::StreamedArray(items) = frame {
1981            assert_eq!(items.len(), 2);
1982            assert!(matches!(items[0], Frame::SimpleString(_)));
1983            assert!(matches!(items[1], Frame::Array(_)));
1984        }
1985
1986        // Test empty streaming containers
1987        let data = Bytes::from("*?\r\n.\r\n");
1988        let result = parse_streaming_sequence(data);
1989        assert!(result.is_ok());
1990        let (frame, _) = result.unwrap();
1991        if let Frame::StreamedArray(items) = frame {
1992            assert!(items.is_empty());
1993        }
1994
1995        // Test streaming map with odd number of elements
1996        let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+orphan\r\n.\r\n");
1997        let result = parse_streaming_sequence(data);
1998        assert!(matches!(result, Err(ParseError::InvalidFormat)));
1999
2000        // Test non-streaming frame passed to parse_streaming_sequence
2001        let data = Bytes::from("+simple\r\n");
2002        let result = parse_streaming_sequence(data);
2003        assert!(result.is_ok());
2004        let (frame, _) = result.unwrap();
2005        assert!(matches!(frame, Frame::SimpleString(_)));
2006
2007        // Test extremely large chunk size (should fail gracefully)
2008        let data = Bytes::from(";999999999999999999\r\ndata\r\n");
2009        let result = parse_frame(data);
2010        // The parsing might succeed but fail later when trying to read the data
2011        // Let's check what actually happens and accept either BadLength or Incomplete
2012        match &result {
2013            Err(ParseError::BadLength) => {}  // Expected
2014            Err(ParseError::Incomplete) => {} // Also acceptable - might not have enough data for huge chunk
2015            Err(e) => panic!("Got unexpected error type: {e:?}"),
2016            Ok(_) => panic!("Large chunk size should fail"),
2017        }
2018
2019        // Test streaming string with non-chunk frame mixed in
2020        let data = Bytes::from("$?\r\n+invalid\r\n;0\r\n");
2021        let result = parse_streaming_sequence(data);
2022        assert!(matches!(result, Err(ParseError::InvalidFormat)));
2023
2024        // Test streaming sequence with corrupted terminator
2025        let data = Bytes::from("*?\r\n+hello\r\n.corrupted\r\n");
2026        let result = parse_streaming_sequence(data);
2027        assert!(matches!(result, Err(ParseError::Incomplete)));
2028
2029        // Test empty input to parse_streaming_sequence
2030        let data = Bytes::new();
2031        let result = parse_streaming_sequence(data);
2032        assert!(matches!(result, Err(ParseError::Incomplete)));
2033
2034        // Test streaming sequence with partial frame at end
2035        let data = Bytes::from("*?\r\n+hello\r\n$5\r\nwo");
2036        let result = parse_streaming_sequence(data);
2037        assert!(matches!(result, Err(ParseError::Incomplete)));
2038    }
2039
2040    #[test]
2041    fn test_roundtrip_nested_structures() {
2042        // Test a complex nested structure
2043        let original = Bytes::from(
2044            "*3\r\n+hello\r\n%2\r\n+key1\r\n:123\r\n+key2\r\n~1\r\n+item\r\n|1\r\n+meta\r\n+data\r\n",
2045        );
2046        let (frame, _) = parse_frame(original.clone()).unwrap();
2047        let serialized = frame_to_bytes(&frame);
2048
2049        let (reparsed, _) = parse_frame(serialized).unwrap();
2050        assert_eq!(frame, reparsed);
2051    }
2052
2053    #[test]
2054    fn test_zero_length_bulk_string_requires_trailing_crlf() {
2055        // Complete: $0\r\n\r\n
2056        let input = Bytes::from("$0\r\n\r\nTAIL");
2057        let (frame, rest) = parse_frame(input).unwrap();
2058        assert_eq!(frame, Frame::BulkString(Some(Bytes::new())));
2059        assert_eq!(rest, Bytes::from("TAIL"));
2060
2061        // Incomplete: $0\r\n with no trailing data
2062        let input = Bytes::from("$0\r\n");
2063        assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2064
2065        // Incomplete: $0\r\n with only one byte
2066        let input = Bytes::from("$0\r\n\r");
2067        assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2068
2069        // Invalid: $0\r\n followed by non-CRLF
2070        let input = Bytes::from("$0\r\nXY");
2071        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
2072    }
2073
2074    #[test]
2075    fn test_zero_length_streamed_chunk_requires_trailing_crlf() {
2076        // Complete: ;0\r\n\r\n
2077        let input = Bytes::from(";0\r\n\r\nTAIL");
2078        let (frame, rest) = parse_frame(input).unwrap();
2079        assert_eq!(frame, Frame::StreamedStringChunk(Bytes::new()));
2080        assert_eq!(rest, Bytes::from("TAIL"));
2081
2082        // Incomplete: ;0\r\n with no trailing data
2083        let input = Bytes::from(";0\r\n");
2084        assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2085
2086        // Invalid: ;0\r\n followed by non-CRLF
2087        let input = Bytes::from(";0\r\nXY");
2088        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
2089    }
2090
2091    #[test]
2092    fn test_integer_overflow_returns_overflow_error() {
2093        // One past i64::MAX
2094        let input = Bytes::from(":9223372036854775808\r\n");
2095        assert_eq!(parse_frame(input), Err(ParseError::Overflow));
2096
2097        // i64::MAX should succeed
2098        let input = Bytes::from(":9223372036854775807\r\n");
2099        let (frame, _) = parse_frame(input).unwrap();
2100        assert_eq!(frame, Frame::Integer(i64::MAX));
2101
2102        // i64::MIN should succeed
2103        let input = Bytes::from(":-9223372036854775808\r\n");
2104        let (frame, _) = parse_frame(input).unwrap();
2105        assert_eq!(frame, Frame::Integer(i64::MIN));
2106    }
2107
2108    #[test]
2109    fn test_parser_propagates_errors() {
2110        let mut parser = Parser::new();
2111        parser.feed(Bytes::from("XINVALID\r\n"));
2112        let result = parser.next_frame();
2113        assert!(result.is_err());
2114        assert_eq!(result.unwrap_err(), ParseError::InvalidTag(b'X'));
2115    }
2116
2117    #[test]
2118    fn test_parser_returns_ok_none_for_incomplete() {
2119        let mut parser = Parser::new();
2120        parser.feed(Bytes::from("+HELL"));
2121        assert_eq!(parser.next_frame().unwrap(), None);
2122    }
2123
2124    #[test]
2125    fn test_integer_negative_overflow() {
2126        // One past i64::MIN
2127        assert!(parse_frame(Bytes::from(":-9223372036854775809\r\n")).is_err());
2128    }
2129
2130    #[test]
2131    fn test_nonempty_bulk_malformed_terminator() {
2132        // Not enough data after payload
2133        assert_eq!(
2134            parse_frame(Bytes::from("$3\r\nfoo")),
2135            Err(ParseError::Incomplete)
2136        );
2137        // Only one byte after payload
2138        assert_eq!(
2139            parse_frame(Bytes::from("$3\r\nfooX")),
2140            Err(ParseError::Incomplete)
2141        );
2142        // Two bytes present but wrong
2143        assert_eq!(
2144            parse_frame(Bytes::from("$3\r\nfooXY")),
2145            Err(ParseError::InvalidFormat)
2146        );
2147    }
2148
2149    #[test]
2150    fn test_blob_error_malformed_terminator() {
2151        assert_eq!(
2152            parse_frame(Bytes::from("!3\r\nerr")),
2153            Err(ParseError::Incomplete)
2154        );
2155        assert_eq!(
2156            parse_frame(Bytes::from("!3\r\nerrXY")),
2157            Err(ParseError::InvalidFormat)
2158        );
2159    }
2160
2161    #[test]
2162    fn test_verbatim_string_malformed_terminator() {
2163        assert_eq!(
2164            parse_frame(Bytes::from("=8\r\ntxt:data")),
2165            Err(ParseError::Incomplete)
2166        );
2167        assert_eq!(
2168            parse_frame(Bytes::from("=8\r\ntxt:dataXY")),
2169            Err(ParseError::InvalidFormat)
2170        );
2171    }
2172
2173    #[test]
2174    fn test_streamed_chunk_malformed_terminator() {
2175        assert_eq!(
2176            parse_frame(Bytes::from(";3\r\nabc")),
2177            Err(ParseError::Incomplete)
2178        );
2179        assert_eq!(
2180            parse_frame(Bytes::from(";3\r\nabcXY")),
2181            Err(ParseError::InvalidFormat)
2182        );
2183    }
2184
2185    #[test]
2186    fn test_bulk_string_size_limit() {
2187        // Over MAX_BULK_STRING_SIZE (512 MB)
2188        assert_eq!(
2189            parse_frame(Bytes::from("$536870913\r\n")),
2190            Err(ParseError::BadLength)
2191        );
2192    }
2193
2194    #[test]
2195    fn test_blob_error_size_limit() {
2196        assert_eq!(
2197            parse_frame(Bytes::from("!536870913\r\n")),
2198            Err(ParseError::BadLength)
2199        );
2200    }
2201
2202    #[test]
2203    fn test_verbatim_string_size_limit() {
2204        assert_eq!(
2205            parse_frame(Bytes::from("=536870913\r\n")),
2206            Err(ParseError::BadLength)
2207        );
2208    }
2209
2210    #[test]
2211    fn test_streamed_chunk_size_limit() {
2212        assert_eq!(
2213            parse_frame(Bytes::from(";536870913\r\n")),
2214            Err(ParseError::BadLength)
2215        );
2216    }
2217
2218    #[test]
2219    fn test_invalid_double() {
2220        assert_eq!(
2221            parse_frame(Bytes::from(",foo\r\n")),
2222            Err(ParseError::InvalidFormat)
2223        );
2224    }
2225
2226    #[test]
2227    fn test_invalid_boolean() {
2228        assert_eq!(
2229            parse_frame(Bytes::from("#\r\n")),
2230            Err(ParseError::InvalidBoolean)
2231        );
2232        assert_eq!(
2233            parse_frame(Bytes::from("#true\r\n")),
2234            Err(ParseError::InvalidBoolean)
2235        );
2236    }
2237
2238    #[test]
2239    fn test_parser_clears_buffer_on_error() {
2240        let mut parser = Parser::new();
2241        parser.feed(Bytes::from("X\r\n"));
2242        assert_eq!(parser.next_frame(), Err(ParseError::InvalidTag(b'X')));
2243        assert_eq!(parser.buffered_bytes(), 0);
2244    }
2245
2246    #[test]
2247    fn test_parser_recovers_after_error() {
2248        let mut parser = Parser::new();
2249        parser.feed(Bytes::from("X\r\n"));
2250        assert!(parser.next_frame().is_err());
2251        assert_eq!(parser.buffered_bytes(), 0);
2252
2253        parser.feed(Bytes::from("+OK\r\n"));
2254        let frame = parser.next_frame().unwrap().unwrap();
2255        assert_eq!(frame, Frame::SimpleString(Bytes::from("OK")));
2256    }
2257
2258    #[test]
2259    fn test_streaming_set_roundtrip() {
2260        let data = Bytes::from("~?\r\n+a\r\n+b\r\n+c\r\n.\r\n");
2261        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2262        assert_eq!(
2263            frame,
2264            Frame::StreamedSet(vec![
2265                Frame::SimpleString(Bytes::from("a")),
2266                Frame::SimpleString(Bytes::from("b")),
2267                Frame::SimpleString(Bytes::from("c")),
2268            ])
2269        );
2270        assert!(rest.is_empty());
2271    }
2272
2273    #[test]
2274    fn test_streaming_attribute_roundtrip() {
2275        let data = Bytes::from("|?\r\n+key\r\n+val\r\n.\r\n");
2276        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2277        assert_eq!(
2278            frame,
2279            Frame::StreamedAttribute(vec![(
2280                Frame::SimpleString(Bytes::from("key")),
2281                Frame::SimpleString(Bytes::from("val")),
2282            )])
2283        );
2284        assert!(rest.is_empty());
2285    }
2286
2287    #[test]
2288    fn test_streaming_push_roundtrip() {
2289        let data = Bytes::from(">?\r\n+pubsub\r\n+channel\r\n+message\r\n.\r\n");
2290        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2291        assert_eq!(
2292            frame,
2293            Frame::StreamedPush(vec![
2294                Frame::SimpleString(Bytes::from("pubsub")),
2295                Frame::SimpleString(Bytes::from("channel")),
2296                Frame::SimpleString(Bytes::from("message")),
2297            ])
2298        );
2299        assert!(rest.is_empty());
2300    }
2301
2302    #[test]
2303    fn test_empty_streaming_containers() {
2304        // Empty streaming string
2305        let data = Bytes::from("$?\r\n;0\r\n\r\n");
2306        let (frame, _) = parse_streaming_sequence(data).unwrap();
2307        assert_eq!(frame, Frame::StreamedString(vec![]));
2308
2309        // Empty streaming array
2310        let data = Bytes::from("*?\r\n.\r\n");
2311        let (frame, _) = parse_streaming_sequence(data).unwrap();
2312        assert_eq!(frame, Frame::StreamedArray(vec![]));
2313
2314        // Empty streaming set
2315        let data = Bytes::from("~?\r\n.\r\n");
2316        let (frame, _) = parse_streaming_sequence(data).unwrap();
2317        assert_eq!(frame, Frame::StreamedSet(vec![]));
2318
2319        // Empty streaming map
2320        let data = Bytes::from("%?\r\n.\r\n");
2321        let (frame, _) = parse_streaming_sequence(data).unwrap();
2322        assert_eq!(frame, Frame::StreamedMap(vec![]));
2323    }
2324
2325    #[test]
2326    fn test_streaming_attribute_odd_elements_errors() {
2327        let data = Bytes::from("|?\r\n+key\r\n+val\r\n+orphan\r\n.\r\n");
2328        let result = parse_streaming_sequence(data);
2329        assert!(matches!(result, Err(ParseError::InvalidFormat)));
2330    }
2331
2332    #[test]
2333    fn test_streaming_blob_error_header_passthrough() {
2334        // Blob error streaming is not supported; header is passed through
2335        let data = Bytes::from("!?\r\n!5\r\nERROR\r\n");
2336        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2337        assert_eq!(frame, Frame::StreamedBlobErrorHeader);
2338        // Rest contains the subsequent data
2339        assert!(!rest.is_empty());
2340    }
2341
2342    #[test]
2343    fn test_streaming_verbatim_header_passthrough() {
2344        // Verbatim string streaming is not supported; header is passed through
2345        let data = Bytes::from("=?\r\n=9\r\ntxt:hello\r\n");
2346        let (frame, rest) = parse_streaming_sequence(data).unwrap();
2347        assert_eq!(frame, Frame::StreamedVerbatimStringHeader);
2348        assert!(!rest.is_empty());
2349    }
2350}