Skip to main content

resp_rs/
resp3.rs

1//! Zero-copy RESP3 parser.
2//!
3//! Parses RESP3 frames using `bytes::Bytes` for efficient, zero-copy operation.
4//! Supports all RESP3 data types, including fixed-length and streaming variants.
5
6use bytes::{BufMut, Bytes, BytesMut};
7
/// Upper bound on the element count accepted for a single collection header,
/// intended to limit the damage a hostile peer can do with an absurd count.
///
/// Set to 10 million elements (reasonable for real-world Redis usage).
// NOTE(review): presumably enforced by `parse_count`; only its doc comment is
// visible from this part of the file — confirm against its body.
const MAX_COLLECTION_SIZE: usize = 10_000_000;
11
/// A streaming parser for RESP3 frames.
///
/// This parser allows feeding data in chunks and extracting frames as they become available.
/// It maintains an internal buffer of accumulated data and attempts to parse frames from it.
#[derive(Default, Debug)]
pub struct Parser {
    /// Bytes that have been fed in but not yet consumed by a successfully parsed frame.
    buffer: BytesMut,
}
20
21impl Parser {
22    /// Creates a new parser with an empty buffer.
23    pub fn new() -> Self {
24        Self {
25            buffer: BytesMut::new(),
26        }
27    }
28
29    /// Feeds a chunk of data into the parser.
30    ///
31    /// The data is appended to the internal buffer.
32    pub fn feed(&mut self, data: Bytes) {
33        self.buffer.extend_from_slice(&data);
34    }
35
36    /// Attempts to extract the next complete frame from the buffer.
37    ///
38    /// Returns `Ok(None)` if there is not enough data to parse a complete frame.
39    /// Returns `Ok(Some(frame))` on success, consuming the parsed bytes.
40    /// Returns `Err` on protocol errors, clearing the buffer.
41    pub fn next_frame(&mut self) -> Result<Option<Frame>, ParseError> {
42        if self.buffer.is_empty() {
43            return Ok(None);
44        }
45
46        let bytes = self.buffer.split().freeze();
47
48        match parse_frame_inner(&bytes, 0) {
49            Ok((frame, consumed)) => {
50                if consumed < bytes.len() {
51                    self.buffer.unsplit(BytesMut::from(&bytes[consumed..]));
52                }
53                Ok(Some(frame))
54            }
55            Err(ParseError::Incomplete) => {
56                self.buffer.unsplit(bytes.into());
57                Ok(None)
58            }
59            Err(e) => Err(e),
60        }
61    }
62
63    /// Returns the number of bytes currently in the buffer.
64    pub fn buffered_bytes(&self) -> usize {
65        self.buffer.len()
66    }
67
68    /// Clears the internal buffer.
69    pub fn clear(&mut self) {
70        self.buffer.clear();
71    }
72}
73
74// --- Zero-copy Frame enum and parser using bytes::Bytes ---
/// A parsed RESP3 frame.
///
/// Each variant corresponds to one of the RESP3 types, including both fixed-length
/// and streaming headers for bulk strings, arrays, sets, maps, attributes, and pushes.
///
/// Variants that carry `Bytes` hold the payload only: the type tag, any length
/// prefix and the trailing `\r\n` are stripped by the parser.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    /// Simple string: `+<string>\r\n`
    SimpleString(Bytes),
    /// Simple error: `-<error>\r\n`
    Error(Bytes),
    /// Integer: `:<number>\r\n`
    Integer(i64),
    /// Blob string: `$<length>\r\n<bytes>\r\n`
    ///
    /// `None` represents the null bulk string `$-1\r\n`.
    BulkString(Option<Bytes>),
    /// Blob error: `!<length>\r\n<bytes>\r\n`
    ///
    /// The parser also accepts `!-1\r\n` and yields an empty payload for it.
    BlobError(Bytes),
    /// Streaming blob string header: `$?\r\n`
    StreamedStringHeader,
    /// Streaming blob error header: `!?\r\n`
    StreamedBlobErrorHeader,
    /// Streaming verbatim string header: `=?\r\n`
    StreamedVerbatimStringHeader,
    /// Streaming array header: `*?\r\n`
    StreamedArrayHeader,
    /// Streaming set header: `~?\r\n`
    StreamedSetHeader,
    /// Streaming map header: `%?\r\n`
    StreamedMapHeader,
    /// Streaming attribute header: `|?\r\n`
    StreamedAttributeHeader,
    /// Streaming push header: `>?\r\n`
    StreamedPushHeader,
    /// Streaming string chunk: `;<length>\r\n<data>\r\n`
    ///
    /// Represents an individual chunk in a streaming string sequence.
    /// These chunks are parsed from `;{length}\r\n{data}\r\n` format.
    /// A zero-length chunk marks the end of the stream; this parser reads it as
    /// `;0\r\n\r\n` (an empty payload still followed by its `\r\n`).
    ///
    /// # Example
    /// ```
    /// use resp_rs::resp3::Frame;
    /// use bytes::Bytes;
    ///
    /// // Chunk containing "Hello"
    /// // Wire format: ;5\r\nHello\r\n
    /// Frame::StreamedStringChunk(Bytes::from("Hello"));
    /// ```
    StreamedStringChunk(Bytes),

    /// Accumulated streaming string data from multiple chunks
    ///
    /// Created by `parse_streaming_sequence()` when parsing a complete
    /// streaming string sequence (`$?\r\n` + chunks + zero-length chunk).
    /// Contains all non-empty chunks in order, allowing reconstruction of the full string.
    ///
    /// # Example
    /// ```
    /// use resp_rs::resp3::Frame;
    /// use bytes::Bytes;
    ///
    /// // Represents "Hello world" from chunks ["Hello ", "world"]
    /// Frame::StreamedString(vec![
    ///     Bytes::from("Hello "),
    ///     Bytes::from("world")
    /// ]);
    /// ```
    StreamedString(Vec<Bytes>),

    /// Accumulated streaming array data from multiple chunks
    ///
    /// Created when parsing a streaming array sequence (`*?\r\n` + frames + `.\r\n`).
    /// Contains all frames that were streamed as part of the array.
    ///
    /// # Example
    /// ```
    /// use resp_rs::resp3::Frame;
    /// use bytes::Bytes;
    ///
    /// // Array with mixed types
    /// Frame::StreamedArray(vec![
    ///     Frame::SimpleString(Bytes::from("hello")),
    ///     Frame::Integer(42),
    ///     Frame::Boolean(true)
    /// ]);
    /// ```
    StreamedArray(Vec<Frame>),

    /// Accumulated streaming set data from multiple chunks
    ///
    /// Created when parsing a streaming set sequence (`~?\r\n` + frames + `.\r\n`).
    /// Contains the streamed elements in wire order; the parser does not
    /// de-duplicate them.
    StreamedSet(Vec<Frame>),

    /// Accumulated streaming map data from multiple chunks
    ///
    /// Created when parsing a streaming map sequence (`%?\r\n` + key-value pairs + `.\r\n`).
    /// Contains all key-value pairs that were streamed as part of the map.
    ///
    /// # Example
    /// ```
    /// use resp_rs::resp3::Frame;
    /// use bytes::Bytes;
    ///
    /// Frame::StreamedMap(vec![
    ///     (Frame::SimpleString(Bytes::from("name")), Frame::SimpleString(Bytes::from("Alice"))),
    ///     (Frame::SimpleString(Bytes::from("age")), Frame::Integer(25))
    /// ]);
    /// ```
    StreamedMap(Vec<(Frame, Frame)>),

    /// Accumulated streaming attribute data from multiple chunks
    ///
    /// Created when parsing a streaming attribute sequence (`|?\r\n` + key-value pairs + `.\r\n`).
    /// Attributes provide out-of-band metadata that doesn't affect the main data structure.
    StreamedAttribute(Vec<(Frame, Frame)>),

    /// Accumulated streaming push data from multiple chunks
    ///
    /// Created when parsing a streaming push sequence (`>?\r\n` + frames + `.\r\n`).
    /// Push messages are server-initiated communications (e.g., pub/sub messages).
    ///
    /// # Example
    /// ```
    /// use resp_rs::resp3::Frame;
    /// use bytes::Bytes;
    ///
    /// // Pub/sub message
    /// Frame::StreamedPush(vec![
    ///     Frame::SimpleString(Bytes::from("pubsub")),
    ///     Frame::SimpleString(Bytes::from("channel1")),
    ///     Frame::SimpleString(Bytes::from("message content"))
    /// ]);
    /// ```
    StreamedPush(Vec<Frame>),
    /// End-of-stream terminator for streamed aggregates (arrays, sets, maps,
    /// attributes, pushes): `.\r\n`
    StreamTerminator,
    /// Null: `_\r\n`
    Null,
    /// Double: `,<float>\r\n`
    Double(f64),
    /// Special float: `,inf\r\n`, `,-inf\r\n` or `,nan\r\n`
    ///
    /// Holds the spelling `inf`, `-inf` or `nan`. Other spellings that parse to an
    /// infinite or NaN `f64` are normalized to one of these three by the parser.
    SpecialFloat(Bytes),
    /// Boolean: `#t\r\n` or `#f\r\n`
    Boolean(bool),
    /// Big number: `(<number>\r\n`
    ///
    /// The digits are stored as received; the parser does not validate them.
    BigNumber(Bytes),
    /// Verbatim string: `=<length>\r\n<format>:<content>\r\n`
    ///
    /// The first field is the format (the payload before the first `:`), the second
    /// is the content after it. The parser also accepts `=-1\r\n` and yields two
    /// empty fields for it.
    VerbatimString(Bytes, Bytes),
    /// Array: `*<count>\r\n` followed by `count` frames
    ///
    /// `None` represents the null array `*-1\r\n`. The streaming form `*?\r\n`
    /// parses to [`Frame::StreamedArrayHeader`] instead.
    Array(Option<Vec<Frame>>),
    /// Set: `~<count>\r\n` followed by `count` frames
    ///
    /// The streaming form `~?\r\n` parses to [`Frame::StreamedSetHeader`] instead.
    Set(Vec<Frame>),
    /// Map: `%<count>\r\n` followed by `count` key/value frame pairs
    ///
    /// The streaming form `%?\r\n` parses to [`Frame::StreamedMapHeader`] instead.
    Map(Vec<(Frame, Frame)>),
    /// Attribute: `|<count>\r\n` followed by `count` key/value frame pairs
    ///
    /// The streaming form `|?\r\n` parses to [`Frame::StreamedAttributeHeader`] instead.
    Attribute(Vec<(Frame, Frame)>),
    /// Push: `><count>\r\n` followed by `count` frames
    ///
    /// The streaming form `>?\r\n` parses to [`Frame::StreamedPushHeader`] instead.
    Push(Vec<Frame>),
}
236
237pub use crate::ParseError;
238
239/// Parse a single RESP3 frame from the provided `Bytes`.
240///
241/// Returns the parsed `Frame` and the remaining unconsumed `Bytes`, or a `ParseError` on failure.
242pub fn parse_frame(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
243    let (frame, consumed) = parse_frame_inner(&input, 0)?;
244    Ok((frame, input.slice(consumed..)))
245}
246
247/// Offset-based internal parser. Works with byte positions to avoid creating
248/// intermediate `Bytes::slice()` objects. Only slices for actual frame data.
249fn parse_frame_inner(input: &Bytes, pos: usize) -> Result<(Frame, usize), ParseError> {
250    let buf = input.as_ref();
251    if pos >= buf.len() {
252        return Err(ParseError::Incomplete);
253    }
254
255    let tag = buf[pos];
256
257    match tag {
258        b'+' => {
259            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
260            Ok((
261                Frame::SimpleString(input.slice(pos + 1..line_end)),
262                after_crlf,
263            ))
264        }
265        b'-' => {
266            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
267            Ok((Frame::Error(input.slice(pos + 1..line_end)), after_crlf))
268        }
269        b':' => {
270            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
271            let v = parse_i64(&buf[pos + 1..line_end])?;
272            Ok((Frame::Integer(v), after_crlf))
273        }
274        b'$' => {
275            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
276            let len_bytes = &buf[pos + 1..line_end];
277            // streaming header?
278            if len_bytes == b"?" {
279                return Ok((Frame::StreamedStringHeader, after_crlf));
280            }
281            // null bulk
282            if len_bytes == b"-1" {
283                return Ok((Frame::BulkString(None), after_crlf));
284            }
285            let len = parse_usize(len_bytes)?;
286            if len == 0 {
287                if after_crlf + 1 >= buf.len() {
288                    return Err(ParseError::Incomplete);
289                }
290                if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
291                    return Ok((Frame::BulkString(Some(Bytes::new())), after_crlf + 2));
292                } else {
293                    return Err(ParseError::InvalidFormat);
294                }
295            }
296            let data_start = after_crlf;
297            let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
298            if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
299                return Err(ParseError::Incomplete);
300            }
301            Ok((
302                Frame::BulkString(Some(input.slice(data_start..data_end))),
303                data_end + 2,
304            ))
305        }
306        b'_' => {
307            if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
308                Ok((Frame::Null, pos + 3))
309            } else {
310                Err(ParseError::Incomplete)
311            }
312        }
313        b',' => {
314            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
315            let line_bytes = &buf[pos + 1..line_end];
316            // special float cases
317            if line_bytes == b"inf" || line_bytes == b"-inf" || line_bytes == b"nan" {
318                return Ok((
319                    Frame::SpecialFloat(input.slice(pos + 1..line_end)),
320                    after_crlf,
321                ));
322            }
323            // numeric double
324            let s = std::str::from_utf8(line_bytes).map_err(|_| ParseError::Utf8Error)?;
325            let v = s.parse::<f64>().map_err(|_| ParseError::InvalidFormat)?;
326            // Rust's f64::parse accepts case-insensitive "inf"/"infinity"/"nan".
327            // Normalize these to SpecialFloat for consistent roundtripping.
328            if v.is_infinite() || v.is_nan() {
329                let canonical = if v.is_nan() {
330                    "nan"
331                } else if v.is_sign_negative() {
332                    "-inf"
333                } else {
334                    "inf"
335                };
336                return Ok((Frame::SpecialFloat(Bytes::from(canonical)), after_crlf));
337            }
338            Ok((Frame::Double(v), after_crlf))
339        }
340        b'#' => {
341            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
342            match &buf[pos + 1..line_end] {
343                b"t" => Ok((Frame::Boolean(true), after_crlf)),
344                b"f" => Ok((Frame::Boolean(false), after_crlf)),
345                _ => Err(ParseError::InvalidBoolean),
346            }
347        }
348        b'(' => {
349            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
350            Ok((Frame::BigNumber(input.slice(pos + 1..line_end)), after_crlf))
351        }
352        b'=' => {
353            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
354            let len_bytes = &buf[pos + 1..line_end];
355            // streaming header
356            if len_bytes == b"?" {
357                return Ok((Frame::StreamedVerbatimStringHeader, after_crlf));
358            }
359            // null case
360            if len_bytes == b"-1" {
361                return Ok((
362                    Frame::VerbatimString(Bytes::new(), Bytes::new()),
363                    after_crlf,
364                ));
365            }
366            let len = parse_usize(len_bytes)?;
367            let data_start = after_crlf;
368            let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
369            if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
370                return Err(ParseError::Incomplete);
371            }
372            // find colon separator in payload
373            let sep = buf[data_start..data_end]
374                .iter()
375                .position(|&b| b == b':')
376                .ok_or(ParseError::InvalidFormat)?;
377            let format = input.slice(data_start..data_start + sep);
378            let content = input.slice(data_start + sep + 1..data_end);
379            Ok((Frame::VerbatimString(format, content), data_end + 2))
380        }
381        b'!' => {
382            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
383            let len_bytes = &buf[pos + 1..line_end];
384            // streaming blob error header
385            if len_bytes == b"?" {
386                return Ok((Frame::StreamedBlobErrorHeader, after_crlf));
387            }
388            if len_bytes == b"-1" {
389                return Ok((Frame::BlobError(Bytes::new()), after_crlf));
390            }
391            let len = parse_usize(len_bytes)?;
392            let data_start = after_crlf;
393            let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
394            if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
395                return Err(ParseError::Incomplete);
396            }
397            Ok((
398                Frame::BlobError(input.slice(data_start..data_end)),
399                data_end + 2,
400            ))
401        }
402        b'*' => {
403            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
404            let len_bytes = &buf[pos + 1..line_end];
405            if len_bytes == b"?" {
406                return Ok((Frame::StreamedArrayHeader, after_crlf));
407            }
408            if len_bytes == b"-1" {
409                return Ok((Frame::Array(None), after_crlf));
410            }
411            let count = parse_count(len_bytes)?;
412            if count == 0 {
413                return Ok((Frame::Array(Some(Vec::new())), after_crlf));
414            }
415            let mut cursor = after_crlf;
416            let mut items = Vec::with_capacity(count);
417            for _ in 0..count {
418                let (item, next) = parse_frame_inner(input, cursor)?;
419                items.push(item);
420                cursor = next;
421            }
422            Ok((Frame::Array(Some(items)), cursor))
423        }
424        b'~' => {
425            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
426            let len_bytes = &buf[pos + 1..line_end];
427            if len_bytes == b"?" {
428                return Ok((Frame::StreamedSetHeader, after_crlf));
429            }
430            let count = parse_count(len_bytes)?;
431            let mut cursor = after_crlf;
432            let mut items = Vec::with_capacity(count);
433            for _ in 0..count {
434                let (item, next) = parse_frame_inner(input, cursor)?;
435                items.push(item);
436                cursor = next;
437            }
438            Ok((Frame::Set(items), cursor))
439        }
440        b'%' | b'|' => {
441            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
442            let len_bytes = &buf[pos + 1..line_end];
443            if len_bytes == b"?" {
444                return if tag == b'%' {
445                    Ok((Frame::StreamedMapHeader, after_crlf))
446                } else {
447                    Ok((Frame::StreamedAttributeHeader, after_crlf))
448                };
449            }
450            let count = parse_count(len_bytes)?;
451            let mut cursor = after_crlf;
452            let mut pairs = Vec::with_capacity(count);
453            for _ in 0..count {
454                let (key, next1) = parse_frame_inner(input, cursor)?;
455                let (val, next2) = parse_frame_inner(input, next1)?;
456                pairs.push((key, val));
457                cursor = next2;
458            }
459            if tag == b'%' {
460                Ok((Frame::Map(pairs), cursor))
461            } else {
462                Ok((Frame::Attribute(pairs), cursor))
463            }
464        }
465        b'>' => {
466            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
467            let len_bytes = &buf[pos + 1..line_end];
468            if len_bytes == b"?" {
469                return Ok((Frame::StreamedPushHeader, after_crlf));
470            }
471            let count = parse_count(len_bytes)?;
472            let mut cursor = after_crlf;
473            let mut items = Vec::with_capacity(count);
474            for _ in 0..count {
475                let (item, next) = parse_frame_inner(input, cursor)?;
476                items.push(item);
477                cursor = next;
478            }
479            Ok((Frame::Push(items), cursor))
480        }
481        b';' => {
482            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
483            let len = parse_usize(&buf[pos + 1..line_end])?;
484            if len == 0 {
485                if after_crlf + 1 >= buf.len() {
486                    return Err(ParseError::Incomplete);
487                }
488                if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
489                    return Ok((Frame::StreamedStringChunk(Bytes::new()), after_crlf + 2));
490                } else {
491                    return Err(ParseError::InvalidFormat);
492                }
493            }
494            let data_start = after_crlf;
495            let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
496            if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
497                return Err(ParseError::Incomplete);
498            }
499            Ok((
500                Frame::StreamedStringChunk(input.slice(data_start..data_end)),
501                data_end + 2,
502            ))
503        }
504        b'.' => {
505            if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
506                Ok((Frame::StreamTerminator, pos + 3))
507            } else {
508                Err(ParseError::Incomplete)
509            }
510        }
511        _ => Err(ParseError::InvalidTag(tag)),
512    }
513}
514
515/// Parse a complete RESP3 streaming sequence, accumulating chunks until termination.
516///
517/// This function handles RESP3 streaming sequences that begin with streaming headers
518/// (`$?`, `*?`, `~?`, `%?`, `|?`, `>?`) and accumulates the following data until
519/// the appropriate terminator is encountered.
520///
521/// # Streaming Types Supported
522///
523/// - **Streaming Strings**: `$?\r\n` followed by chunks terminated with `;0\r\n`
524/// - **Streaming Arrays**: `*?\r\n` followed by frames terminated with `.\r\n`
525/// - **Streaming Sets**: `~?\r\n` followed by frames terminated with `.\r\n`
526/// - **Streaming Maps**: `%?\r\n` followed by key-value pairs terminated with `.\r\n`
527/// - **Streaming Attributes**: `|?\r\n` followed by key-value pairs terminated with `.\r\n`
528/// - **Streaming Push**: `>?\r\n` followed by frames terminated with `.\r\n`
529///
530/// # Examples
531///
532/// ## Streaming String
533/// ```rust
534/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
535/// use bytes::Bytes;
536///
537/// let data = Bytes::from("$?\r\n;4\r\nHell\r\n;6\r\no worl\r\n;1\r\nd\r\n;0\r\n\r\n");
538/// let (frame, rest) = parse_streaming_sequence(data).unwrap();
539///
540/// if let Frame::StreamedString(chunks) = frame {
541///     assert_eq!(chunks.len(), 3);
542///     let full_string: String = chunks.iter()
543///         .map(|chunk| std::str::from_utf8(chunk).unwrap())
544///         .collect::<Vec<_>>()
545///         .join("");
546///     assert_eq!(full_string, "Hello world");
547/// }
548/// assert!(rest.is_empty());
549/// ```
550///
551/// ## Streaming Array
552/// ```rust
553/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
554/// use bytes::Bytes;
555///
556/// let data = Bytes::from("*?\r\n+hello\r\n:42\r\n#t\r\n.\r\n");
557/// let (frame, _) = parse_streaming_sequence(data).unwrap();
558///
559/// if let Frame::StreamedArray(items) = frame {
560///     assert_eq!(items.len(), 3);
561///     // items[0] = SimpleString("hello")
562///     // items[1] = Integer(42)
563///     // items[2] = Boolean(true)
564/// }
565/// ```
566///
567/// ## Streaming Map
568/// ```rust
569/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
570/// use bytes::Bytes;
571///
572/// let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+key2\r\n:123\r\n.\r\n");
573/// let (frame, _) = parse_streaming_sequence(data).unwrap();
574///
575/// if let Frame::StreamedMap(pairs) = frame {
576///     assert_eq!(pairs.len(), 2);
577///     // pairs[0] = (SimpleString("key1"), SimpleString("val1"))
578///     // pairs[1] = (SimpleString("key2"), Integer(123))
579/// }
580/// ```
581///
582/// # Errors
583///
584/// Returns `ParseError::Incomplete` if the stream is not complete or if required
585/// terminators are missing. Returns `ParseError::InvalidFormat` for malformed
586/// chunk data or unexpected frame types within streaming sequences.
587///
588/// # Notes
589///
590/// - For non-streaming frames, this function simply returns the parsed frame
591/// - Streaming string chunks are accumulated in order
592/// - All other streaming types accumulate complete frames until termination
593/// - Zero-copy parsing is used where possible to minimize allocations
594pub fn parse_streaming_sequence(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
595    if input.is_empty() {
596        return Err(ParseError::Incomplete);
597    }
598
599    let (header, mut rest) = parse_frame(input)?;
600
601    match header {
602        Frame::StreamedStringHeader => {
603            // Parse streaming string chunks until zero-length chunk
604            let mut chunks = Vec::new();
605
606            loop {
607                let (frame, new_rest) = parse_frame(rest)?;
608                rest = new_rest;
609
610                match frame {
611                    Frame::StreamedStringChunk(chunk) => {
612                        if chunk.is_empty() {
613                            // Zero-length chunk indicates end of stream
614                            break;
615                        }
616                        chunks.push(chunk);
617                    }
618                    _ => {
619                        return Err(ParseError::InvalidFormat);
620                    }
621                }
622            }
623
624            Ok((Frame::StreamedString(chunks), rest))
625        }
626        Frame::StreamedBlobErrorHeader => {
627            // Parse streaming blob error chunks until zero-length chunk
628            // Similar to StreamedStringHeader but for blob errors
629            let mut chunks = Vec::new();
630
631            loop {
632                let (frame, new_rest) = parse_frame(rest)?;
633                rest = new_rest;
634
635                match frame {
636                    Frame::BlobError(chunk) => {
637                        // Check for terminator by parsing as bulk string
638                        // In streaming context, !0\r\n acts as terminator
639                        if chunk.is_empty() {
640                            break;
641                        }
642                        chunks.push(chunk);
643                    }
644                    _ => {
645                        return Err(ParseError::InvalidFormat);
646                    }
647                }
648            }
649
650            // Concatenate all error chunks
651            let total_len: usize = chunks.iter().map(|c| c.len()).sum();
652            let mut combined = Vec::with_capacity(total_len);
653            for chunk in chunks {
654                combined.extend_from_slice(&chunk);
655            }
656            Ok((Frame::BlobError(Bytes::from(combined)), rest))
657        }
658        Frame::StreamedVerbatimStringHeader => {
659            // Parse streaming verbatim string chunks until zero-length chunk
660            let mut chunks = Vec::new();
661
662            loop {
663                let (frame, new_rest) = parse_frame(rest)?;
664                rest = new_rest;
665
666                match frame {
667                    Frame::VerbatimString(format, content) => {
668                        // Zero-length content indicates end of stream
669                        if content.is_empty() && format.len() == 3 {
670                            break;
671                        }
672                        // For now, just collect the content parts
673                        // In a full implementation, we'd validate format consistency
674                        chunks.push((format, content));
675                    }
676                    _ => {
677                        return Err(ParseError::InvalidFormat);
678                    }
679                }
680            }
681
682            // Concatenate all chunks, using format from first chunk
683            if chunks.is_empty() {
684                return Ok((Frame::VerbatimString(Bytes::new(), Bytes::new()), rest));
685            }
686
687            let format = chunks[0].0.clone();
688            let total_len: usize = chunks.iter().map(|(_, c)| c.len()).sum();
689            let mut combined = Vec::with_capacity(total_len);
690            for (_, content) in chunks {
691                combined.extend_from_slice(&content);
692            }
693            Ok((Frame::VerbatimString(format, Bytes::from(combined)), rest))
694        }
695        Frame::StreamedArrayHeader => {
696            // Parse streaming array items until terminator
697            let mut items = Vec::new();
698
699            loop {
700                let (frame, new_rest) = parse_frame(rest)?;
701                rest = new_rest;
702
703                match frame {
704                    Frame::StreamTerminator => {
705                        break;
706                    }
707                    item => {
708                        items.push(item);
709                    }
710                }
711            }
712
713            Ok((Frame::StreamedArray(items), rest))
714        }
715        Frame::StreamedSetHeader => {
716            // Parse streaming set items until terminator
717            let mut items = Vec::new();
718
719            loop {
720                let (frame, new_rest) = parse_frame(rest)?;
721                rest = new_rest;
722
723                match frame {
724                    Frame::StreamTerminator => {
725                        break;
726                    }
727                    item => {
728                        items.push(item);
729                    }
730                }
731            }
732
733            Ok((Frame::StreamedSet(items), rest))
734        }
735        Frame::StreamedMapHeader => {
736            // Parse streaming map pairs until terminator
737            let mut pairs = Vec::new();
738
739            loop {
740                let (frame, new_rest) = parse_frame(rest)?;
741                rest = new_rest;
742
743                match frame {
744                    Frame::StreamTerminator => {
745                        break;
746                    }
747                    key => {
748                        let (value, newer_rest) = parse_frame(rest)?;
749                        rest = newer_rest;
750                        pairs.push((key, value));
751                    }
752                }
753            }
754
755            Ok((Frame::StreamedMap(pairs), rest))
756        }
757        Frame::StreamedAttributeHeader => {
758            // Parse streaming attribute pairs until terminator
759            let mut pairs = Vec::new();
760
761            loop {
762                let (frame, new_rest) = parse_frame(rest)?;
763                rest = new_rest;
764
765                match frame {
766                    Frame::StreamTerminator => {
767                        break;
768                    }
769                    key => {
770                        let (value, newer_rest) = parse_frame(rest)?;
771                        rest = newer_rest;
772                        pairs.push((key, value));
773                    }
774                }
775            }
776
777            Ok((Frame::StreamedAttribute(pairs), rest))
778        }
779        Frame::StreamedPushHeader => {
780            // Parse streaming push items until terminator
781            let mut items = Vec::new();
782
783            loop {
784                let (frame, new_rest) = parse_frame(rest)?;
785                rest = new_rest;
786
787                match frame {
788                    Frame::StreamTerminator => {
789                        break;
790                    }
791                    item => {
792                        items.push(item);
793                    }
794                }
795            }
796
797            Ok((Frame::StreamedPush(items), rest))
798        }
799        _ => {
800            // Not a streaming sequence, just return the original frame
801            Ok((header, rest))
802        }
803    }
804}
805
806/// Find `\r\n` in `buf` starting at `from`. Returns `(line_end, after_crlf)` where
807/// `line_end` is the position of `\r` and `after_crlf` is the position after `\n`.
808#[inline]
809fn find_crlf(buf: &[u8], from: usize) -> Result<(usize, usize), ParseError> {
810    let mut i = from;
811    let len = buf.len();
812    while i + 1 < len {
813        if buf[i] == b'\r' && buf[i + 1] == b'\n' {
814            return Ok((i, i + 2));
815        }
816        i += 1;
817    }
818    Err(ParseError::Incomplete)
819}
820
821/// Parse a `usize` directly from ASCII digit bytes, no UTF-8 validation needed.
822#[inline]
823fn parse_usize(buf: &[u8]) -> Result<usize, ParseError> {
824    if buf.is_empty() {
825        return Err(ParseError::BadLength);
826    }
827    let mut v: usize = 0;
828    for &b in buf {
829        if !b.is_ascii_digit() {
830            return Err(ParseError::BadLength);
831        }
832        v = v.checked_mul(10).ok_or(ParseError::BadLength)?;
833        v = v
834            .checked_add((b - b'0') as usize)
835            .ok_or(ParseError::BadLength)?;
836    }
837    Ok(v)
838}
839
840/// Parse an `i64` directly from ASCII bytes (optional leading `-`), no UTF-8 validation.
841#[inline]
842fn parse_i64(buf: &[u8]) -> Result<i64, ParseError> {
843    if buf.is_empty() {
844        return Err(ParseError::InvalidFormat);
845    }
846    let (neg, digits) = if buf[0] == b'-' {
847        (true, &buf[1..])
848    } else {
849        (false, buf)
850    };
851    if digits.is_empty() {
852        return Err(ParseError::InvalidFormat);
853    }
854    let mut v: i64 = 0;
855    for (i, &d) in digits.iter().enumerate() {
856        if !d.is_ascii_digit() {
857            return Err(ParseError::InvalidFormat);
858        }
859        let digit = (d - b'0') as i64;
860        if neg && v == i64::MAX / 10 && digit == 8 && i == digits.len() - 1 {
861            return Ok(i64::MIN);
862        }
863        if v > i64::MAX / 10 || (v == i64::MAX / 10 && digit > i64::MAX % 10) {
864            return Err(ParseError::Overflow);
865        }
866        v = v * 10 + digit;
867    }
868    if neg { Ok(-v) } else { Ok(v) }
869}
870
871/// Parse a collection count (usize) with MAX_COLLECTION_SIZE check.
872#[inline]
873fn parse_count(buf: &[u8]) -> Result<usize, ParseError> {
874    let count = parse_usize(buf)?;
875    if count > MAX_COLLECTION_SIZE {
876        return Err(ParseError::BadLength);
877    }
878    Ok(count)
879}
880
881/// Converts a Frame to its RESP3 byte representation.
882///
883/// This function serializes a Frame into the corresponding RESP3 protocol bytes.
884pub fn frame_to_bytes(frame: &Frame) -> Bytes {
885    let mut buf = BytesMut::new();
886    serialize_frame(frame, &mut buf);
887    buf.freeze()
888}
889
890fn serialize_frame(frame: &Frame, buf: &mut BytesMut) {
891    match frame {
892        Frame::SimpleString(s) => {
893            buf.put_u8(b'+');
894            buf.extend_from_slice(s);
895            buf.extend_from_slice(b"\r\n");
896        }
897        Frame::Error(e) => {
898            buf.put_u8(b'-');
899            buf.extend_from_slice(e);
900            buf.extend_from_slice(b"\r\n");
901        }
902        Frame::Integer(i) => {
903            buf.put_u8(b':');
904            let s = i.to_string();
905            buf.extend_from_slice(s.as_bytes());
906            buf.extend_from_slice(b"\r\n");
907        }
908        Frame::BulkString(opt) => {
909            buf.put_u8(b'$');
910            match opt {
911                Some(data) => {
912                    let len = data.len().to_string();
913                    buf.extend_from_slice(len.as_bytes());
914                    buf.extend_from_slice(b"\r\n");
915                    buf.extend_from_slice(data);
916                    buf.extend_from_slice(b"\r\n");
917                }
918                None => {
919                    buf.extend_from_slice(b"-1\r\n");
920                }
921            }
922        }
923        Frame::BlobError(data) => {
924            buf.put_u8(b'!');
925            let len = data.len().to_string();
926            buf.extend_from_slice(len.as_bytes());
927            buf.extend_from_slice(b"\r\n");
928            buf.extend_from_slice(data);
929            buf.extend_from_slice(b"\r\n");
930        }
931        Frame::StreamedStringHeader => {
932            buf.extend_from_slice(b"$?\r\n");
933        }
934        Frame::StreamedBlobErrorHeader => {
935            buf.extend_from_slice(b"!?\r\n");
936        }
937        Frame::StreamedVerbatimStringHeader => {
938            buf.extend_from_slice(b"=?\r\n");
939        }
940        Frame::StreamedArrayHeader => {
941            buf.extend_from_slice(b"*?\r\n");
942        }
943        Frame::StreamedSetHeader => {
944            buf.extend_from_slice(b"~?\r\n");
945        }
946        Frame::StreamedMapHeader => {
947            buf.extend_from_slice(b"%?\r\n");
948        }
949        Frame::StreamedAttributeHeader => {
950            buf.extend_from_slice(b"|?\r\n");
951        }
952        Frame::StreamedPushHeader => {
953            buf.extend_from_slice(b">?\r\n");
954        }
955        Frame::StreamedStringChunk(data) => {
956            buf.put_u8(b';');
957            let len = data.len().to_string();
958            buf.extend_from_slice(len.as_bytes());
959            buf.extend_from_slice(b"\r\n");
960            buf.extend_from_slice(data);
961            buf.extend_from_slice(b"\r\n");
962        }
963        Frame::StreamedString(chunks) => {
964            // Serialize as streaming string sequence: $?\r\n + chunks + terminator
965            buf.extend_from_slice(b"$?\r\n");
966            for chunk in chunks {
967                buf.put_u8(b';');
968                let len = chunk.len().to_string();
969                buf.extend_from_slice(len.as_bytes());
970                buf.extend_from_slice(b"\r\n");
971                buf.extend_from_slice(chunk);
972                buf.extend_from_slice(b"\r\n");
973            }
974            buf.extend_from_slice(b";0\r\n\r\n");
975        }
976        Frame::StreamedArray(items) => {
977            buf.extend_from_slice(b"*?\r\n");
978            for item in items {
979                serialize_frame(item, buf);
980            }
981            buf.extend_from_slice(b".\r\n");
982        }
983        Frame::StreamedSet(items) => {
984            buf.extend_from_slice(b"~?\r\n");
985            for item in items {
986                serialize_frame(item, buf);
987            }
988            buf.extend_from_slice(b".\r\n");
989        }
990        Frame::StreamedMap(pairs) => {
991            buf.extend_from_slice(b"%?\r\n");
992            for (key, value) in pairs {
993                serialize_frame(key, buf);
994                serialize_frame(value, buf);
995            }
996            buf.extend_from_slice(b".\r\n");
997        }
998        Frame::StreamedAttribute(pairs) => {
999            buf.extend_from_slice(b"|?\r\n");
1000            for (key, value) in pairs {
1001                serialize_frame(key, buf);
1002                serialize_frame(value, buf);
1003            }
1004            buf.extend_from_slice(b".\r\n");
1005        }
1006        Frame::StreamedPush(items) => {
1007            buf.extend_from_slice(b">?\r\n");
1008            for item in items {
1009                serialize_frame(item, buf);
1010            }
1011            buf.extend_from_slice(b".\r\n");
1012        }
1013        Frame::StreamTerminator => {
1014            buf.extend_from_slice(b".\r\n");
1015        }
1016        Frame::Null => {
1017            buf.extend_from_slice(b"_\r\n");
1018        }
1019        Frame::Double(d) => {
1020            buf.put_u8(b',');
1021            let s = d.to_string();
1022            buf.extend_from_slice(s.as_bytes());
1023            buf.extend_from_slice(b"\r\n");
1024        }
1025        Frame::SpecialFloat(f) => {
1026            buf.put_u8(b',');
1027            buf.extend_from_slice(f);
1028            buf.extend_from_slice(b"\r\n");
1029        }
1030        Frame::Boolean(b) => {
1031            buf.extend_from_slice(if *b { b"#t\r\n" } else { b"#f\r\n" });
1032        }
1033        Frame::BigNumber(n) => {
1034            buf.put_u8(b'(');
1035            buf.extend_from_slice(n);
1036            buf.extend_from_slice(b"\r\n");
1037        }
1038        Frame::VerbatimString(format, content) => {
1039            buf.put_u8(b'=');
1040            let total_len = format.len() + 1 + content.len(); // +1 for the colon
1041            let len = total_len.to_string();
1042            buf.extend_from_slice(len.as_bytes());
1043            buf.extend_from_slice(b"\r\n");
1044            buf.extend_from_slice(format);
1045            buf.put_u8(b':');
1046            buf.extend_from_slice(content);
1047            buf.extend_from_slice(b"\r\n");
1048        }
1049        Frame::Array(opt) => {
1050            buf.put_u8(b'*');
1051            match opt {
1052                Some(items) => {
1053                    let len = items.len().to_string();
1054                    buf.extend_from_slice(len.as_bytes());
1055                    buf.extend_from_slice(b"\r\n");
1056                    for item in items {
1057                        serialize_frame(item, buf);
1058                    }
1059                }
1060                None => {
1061                    buf.extend_from_slice(b"-1\r\n");
1062                }
1063            }
1064        }
1065        Frame::Set(items) => {
1066            buf.put_u8(b'~');
1067            let len = items.len().to_string();
1068            buf.extend_from_slice(len.as_bytes());
1069            buf.extend_from_slice(b"\r\n");
1070            for item in items {
1071                serialize_frame(item, buf);
1072            }
1073        }
1074        Frame::Map(pairs) => {
1075            buf.put_u8(b'%');
1076            let len = pairs.len().to_string();
1077            buf.extend_from_slice(len.as_bytes());
1078            buf.extend_from_slice(b"\r\n");
1079            for (key, value) in pairs {
1080                serialize_frame(key, buf);
1081                serialize_frame(value, buf);
1082            }
1083        }
1084        Frame::Attribute(pairs) => {
1085            buf.put_u8(b'|');
1086            let len = pairs.len().to_string();
1087            buf.extend_from_slice(len.as_bytes());
1088            buf.extend_from_slice(b"\r\n");
1089            for (key, value) in pairs {
1090                serialize_frame(key, buf);
1091                serialize_frame(value, buf);
1092            }
1093        }
1094        Frame::Push(items) => {
1095            buf.put_u8(b'>');
1096            let len = items.len().to_string();
1097            buf.extend_from_slice(len.as_bytes());
1098            buf.extend_from_slice(b"\r\n");
1099            for item in items {
1100                serialize_frame(item, buf);
1101            }
1102        }
1103    }
1104}
1105
1106#[cfg(test)]
1107mod tests {
1108    use super::{Frame, ParseError, Parser, frame_to_bytes, parse_frame, parse_streaming_sequence};
1109    use bytes::Bytes;
1110
1111    #[test]
1112    fn test_parse_frame_simple_string() {
1113        let input = Bytes::from("+HELLO\r\nWORLD");
1114        let (frame, rest) = parse_frame(input.clone()).unwrap();
1115        assert_eq!(frame, Frame::SimpleString(Bytes::from("HELLO")));
1116        assert_eq!(rest, Bytes::from("WORLD"));
1117    }
1118
1119    #[test]
1120    fn test_parse_frame_blob_error() {
1121        let input = Bytes::from("!5\r\nERROR\r\nREST");
1122        let (frame, rest) = parse_frame(input.clone()).unwrap();
1123        assert_eq!(frame, Frame::BlobError(Bytes::from("ERROR")));
1124        assert_eq!(rest, Bytes::from("REST"));
1125    }
1126
1127    #[test]
1128    fn test_parse_frame_error() {
1129        let input = Bytes::from("-ERR fail\r\nLEFT");
1130        let (frame, rest) = parse_frame(input.clone()).unwrap();
1131        assert_eq!(frame, Frame::Error(Bytes::from("ERR fail")));
1132        assert_eq!(rest, Bytes::from("LEFT"));
1133    }
1134
1135    #[test]
1136    fn test_parse_frame_integer() {
1137        let input = Bytes::from(":42\r\nTAIL");
1138        let (frame, rest) = parse_frame(input.clone()).unwrap();
1139        assert_eq!(frame, Frame::Integer(42));
1140        assert_eq!(rest, Bytes::from("TAIL"));
1141    }
1142
1143    #[test]
1144    fn test_parse_frame_bulk_string() {
1145        let input = Bytes::from("$3\r\nfoo\r\nREST");
1146        let (frame, rest) = parse_frame(input.clone()).unwrap();
1147        assert_eq!(frame, Frame::BulkString(Some(Bytes::from("foo"))));
1148        assert_eq!(rest, Bytes::from("REST"));
1149        let null_input = Bytes::from("$-1\r\nAFTER");
1150        let (frame, rest) = parse_frame(null_input.clone()).unwrap();
1151        assert_eq!(frame, Frame::BulkString(None));
1152        assert_eq!(rest, Bytes::from("AFTER"));
1153    }
1154
1155    #[test]
1156    fn test_parse_frame_null() {
1157        let input = Bytes::from("_\r\nLEFT");
1158        let (frame, rest) = parse_frame(input.clone()).unwrap();
1159        assert_eq!(frame, Frame::Null);
1160        assert_eq!(rest, Bytes::from("LEFT"));
1161    }
1162
1163    #[test]
1164    fn test_parse_frame_double_and_special_float() {
1165        let input = Bytes::from(",3.5\r\nNEXT");
1166        let (frame, rest) = parse_frame(input.clone()).unwrap();
1167        assert_eq!(frame, Frame::Double(3.5));
1168        assert_eq!(rest, Bytes::from("NEXT"));
1169        let input_inf = Bytes::from(",inf\r\nTAIL");
1170        let (frame, rest) = parse_frame(input_inf.clone()).unwrap();
1171        assert_eq!(frame, Frame::SpecialFloat(Bytes::from("inf")));
1172        assert_eq!(rest, Bytes::from("TAIL"));
1173    }
1174
1175    #[test]
1176    fn test_parse_frame_boolean() {
1177        let input_true = Bytes::from("#t\r\nXYZ");
1178        let (frame, rest) = parse_frame(input_true.clone()).unwrap();
1179        assert_eq!(frame, Frame::Boolean(true));
1180        assert_eq!(rest, Bytes::from("XYZ"));
1181        let input_false = Bytes::from("#f\r\nDONE");
1182        let (frame, rest) = parse_frame(input_false.clone()).unwrap();
1183        assert_eq!(frame, Frame::Boolean(false));
1184        assert_eq!(rest, Bytes::from("DONE"));
1185    }
1186
1187    #[test]
1188    fn test_parse_frame_big_number() {
1189        let input = Bytes::from("(123456789\r\nEND");
1190        let (frame, rest) = parse_frame(input.clone()).unwrap();
1191        assert_eq!(frame, Frame::BigNumber(Bytes::from("123456789")));
1192        assert_eq!(rest, Bytes::from("END"));
1193    }
1194
1195    #[test]
1196    fn test_parse_frame_verbatim_string() {
1197        let input = Bytes::from("=12\r\ntxt:hi there\r\nAFTER");
1198        let (frame, rest) = parse_frame(input.clone()).unwrap();
1199        assert_eq!(
1200            frame,
1201            Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hi there")) // Frame::VerbatimString {
1202                                                                               //     format: "txt".to_string(),
1203                                                                               //     content: "hi there".to_string()
1204                                                                               // }
1205        );
1206        assert_eq!(rest, Bytes::from("AFTER"));
1207    }
1208
1209    #[test]
1210    fn test_parse_frame_array_set_push_map_attribute() {
1211        // Array of two bulk strings
1212        let input = Bytes::from("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\nTAIL");
1213        let (frame, rest) = parse_frame(input.clone()).unwrap();
1214        assert_eq!(
1215            frame,
1216            Frame::Array(Some(vec![
1217                Frame::BulkString(Some(Bytes::from("foo"))),
1218                Frame::BulkString(Some(Bytes::from("bar")))
1219            ]))
1220        );
1221        assert_eq!(rest, Bytes::from("TAIL"));
1222        // Null array
1223        let input_null = Bytes::from("*-1\r\nEND");
1224        let (frame, rest) = parse_frame(input_null.clone()).unwrap();
1225        assert_eq!(frame, Frame::Array(None));
1226        assert_eq!(rest, Bytes::from("END"));
1227        // Set of two simple strings
1228        let input_set = Bytes::from("~2\r\n+foo\r\n+bar\r\nTAIL");
1229        let (frame, rest) = parse_frame(input_set.clone()).unwrap();
1230        assert_eq!(
1231            frame,
1232            Frame::Set(vec![
1233                Frame::SimpleString(Bytes::from("foo")),
1234                Frame::SimpleString(Bytes::from("bar")),
1235            ])
1236        );
1237        assert_eq!(rest, Bytes::from("TAIL"));
1238        // Map of two key-value pairs
1239        let input_map = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\nTRAIL");
1240        let (frame, rest) = parse_frame(input_map.clone()).unwrap();
1241        assert_eq!(
1242            frame,
1243            Frame::Map(vec![
1244                (
1245                    Frame::SimpleString(Bytes::from("key1")),
1246                    Frame::SimpleString(Bytes::from("val1"))
1247                ),
1248                (
1249                    Frame::SimpleString(Bytes::from("key2")),
1250                    Frame::SimpleString(Bytes::from("val2"))
1251                ),
1252            ])
1253        );
1254        assert_eq!(rest, Bytes::from("TRAIL"));
1255        // Attribute
1256        let input_attr = Bytes::from("|1\r\n+meta\r\n+data\r\nAFTER");
1257        let (frame, rest) = parse_frame(input_attr.clone()).unwrap();
1258        assert_eq!(
1259            frame,
1260            Frame::Attribute(vec![(
1261                Frame::SimpleString(Bytes::from("meta")),
1262                Frame::SimpleString(Bytes::from("data"))
1263            ),])
1264        );
1265        assert_eq!(rest, Bytes::from("AFTER"));
1266        // Push
1267        let input_push = Bytes::from(">2\r\n+type\r\n:1\r\nNEXT");
1268        let (frame, rest) = parse_frame(input_push.clone()).unwrap();
1269        assert_eq!(
1270            frame,
1271            Frame::Push(vec![
1272                Frame::SimpleString(Bytes::from("type")),
1273                Frame::Integer(1),
1274            ])
1275        );
1276        assert_eq!(rest, Bytes::from("NEXT"));
1277    }
1278
1279    #[test]
1280    fn test_parse_frame_empty_input() {
1281        assert!(parse_frame(Bytes::new()).is_err());
1282    }
1283
1284    #[test]
1285    fn test_parse_frame_invalid_tag() {
1286        let input = Bytes::from("X123\r\n");
1287        assert!(parse_frame(input).is_err());
1288    }
1289
1290    #[test]
1291    fn test_parse_frame_malformed_bulk_length() {
1292        let input = Bytes::from("$x\r\nfoo\r\n");
1293        assert!(parse_frame(input).is_err());
1294    }
1295
1296    #[test]
1297    fn test_parse_frame_zero_length_bulk() {
1298        let input = Bytes::from("$0\r\n\r\nTAIL");
1299        let (frame, rest) = parse_frame(input.clone()).unwrap();
1300        assert_eq!(frame, Frame::BulkString(Some(Bytes::from(""))));
1301        assert_eq!(rest, Bytes::from("TAIL"));
1302    }
1303
1304    #[test]
1305    fn test_parse_frame_zero_length_blob_error() {
1306        let input = Bytes::from("!0\r\n\r\nREST");
1307        let (frame, rest) = parse_frame(input.clone()).unwrap();
1308        assert_eq!(frame, Frame::BlobError(Bytes::new()));
1309        assert_eq!(rest, Bytes::from("REST"));
1310    }
1311
1312    #[test]
1313    fn test_parse_frame_missing_crlf() {
1314        let input = Bytes::from(":42\nTAIL");
1315        assert!(parse_frame(input).is_err());
1316    }
1317
1318    #[test]
1319    fn test_parse_frame_unicode_simple_string() {
1320        let input = Bytes::from("+こんにちは\r\nEND");
1321        let (frame, rest) = parse_frame(input.clone()).unwrap();
1322        assert_eq!(frame, Frame::SimpleString(Bytes::from("こんにちは")));
1323        assert_eq!(rest, Bytes::from("END"));
1324    }
1325
1326    #[test]
1327    fn test_parse_frame_chained_frames() {
1328        let combined = Bytes::from("+OK\r\n:1\r\nfoo");
1329        let (f1, rem) = parse_frame(combined.clone()).unwrap();
1330        assert_eq!(f1, Frame::SimpleString(Bytes::from("OK")));
1331        let (f2, rem2) = parse_frame(rem).unwrap();
1332        assert_eq!(f2, Frame::Integer(1));
1333        assert_eq!(rem2, Bytes::from("foo"));
1334    }
1335
1336    #[test]
1337    fn test_parse_frame_empty_array() {
1338        let input = Bytes::from("*0\r\nTAIL");
1339        let (frame, rest) = parse_frame(input.clone()).unwrap();
1340        assert_eq!(frame, Frame::Array(Some(vec![])));
1341        assert_eq!(rest, Bytes::from("TAIL"));
1342    }
1343
1344    #[test]
1345    fn test_parse_frame_partial_array_data() {
1346        let input = Bytes::from("*2\r\n+OK\r\n");
1347        assert!(parse_frame(input).is_err());
1348    }
1349
1350    #[test]
1351    fn test_parse_frame_streamed_string() {
1352        let input = Bytes::from("$?\r\n$5\r\nhello\r\n$0\r\n\r\nREST");
1353        let (frame, rem) = parse_frame(input.clone()).unwrap();
1354        assert_eq!(frame, Frame::StreamedStringHeader);
1355        let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1356        assert_eq!(chunk, Frame::BulkString(Some(Bytes::from("hello"))));
1357        let (terminator, rest) = parse_frame(rem2.clone()).unwrap();
1358        assert_eq!(terminator, Frame::BulkString(Some(Bytes::from(""))));
1359        assert_eq!(rest, Bytes::from("REST"));
1360    }
1361
1362    #[test]
1363    fn test_parse_frame_streamed_blob_error() {
1364        let input = Bytes::from("!?\r\n!5\r\nERROR\r\nREST");
1365        let (frame, rem) = parse_frame(input.clone()).unwrap();
1366        assert_eq!(frame, Frame::StreamedBlobErrorHeader);
1367        let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1368        assert_eq!(chunk, Frame::BlobError(Bytes::from("ERROR")));
1369        assert_eq!(rem2, Bytes::from("REST"));
1370    }
1371
1372    #[test]
1373    fn test_parse_frame_streamed_verbatim_string() {
1374        let input = Bytes::from("=?\r\n=9\r\ntxt:hello\r\nTAIL");
1375        let (frame, rem) = parse_frame(input.clone()).unwrap();
1376        assert_eq!(frame, Frame::StreamedVerbatimStringHeader);
1377        let (chunk, rest) = parse_frame(rem.clone()).unwrap();
1378        assert_eq!(
1379            chunk,
1380            Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hello")) // Frame::VerbatimString {
1381                                                                            //     format: "txt".to_string(),
1382                                                                            //     content: "hello".to_string()
1383                                                                            // }
1384        );
1385        assert_eq!(rest, Bytes::from("TAIL"));
1386    }
1387
1388    #[test]
1389    fn test_parse_frame_streamed_array() {
1390        let input = Bytes::from("*?\r\n+one\r\n+two\r\n*0\r\nEND");
1391        let (header, rem) = parse_frame(input.clone()).unwrap();
1392        assert_eq!(header, Frame::StreamedArrayHeader);
1393        let (item1, rem2) = parse_frame(rem.clone()).unwrap();
1394        assert_eq!(item1, Frame::SimpleString(Bytes::from("one")));
1395        let (item2, rem3) = parse_frame(rem2.clone()).unwrap();
1396        assert_eq!(item2, Frame::SimpleString(Bytes::from("two")));
1397        let (terminator, rest) = parse_frame(rem3.clone()).unwrap();
1398        assert_eq!(terminator, Frame::Array(Some(vec![])));
1399        assert_eq!(rest, Bytes::from("END"));
1400    }
1401
1402    #[test]
1403    fn test_parse_frame_streamed_set_map_attr_push() {
1404        // Set
1405        let input_set = Bytes::from("~?\r\n+foo\r\n+bar\r\n~0\r\nTAIL");
1406        let (h_set, rem0) = parse_frame(input_set.clone()).unwrap();
1407        assert_eq!(h_set, Frame::StreamedSetHeader);
1408        let (s1, rem1) = parse_frame(rem0.clone()).unwrap();
1409        assert_eq!(s1, Frame::SimpleString(Bytes::from("foo")));
1410        let (s2, rem2) = parse_frame(rem1.clone()).unwrap();
1411        assert_eq!(s2, Frame::SimpleString(Bytes::from("bar")));
1412        let (term_set, rest_set) = parse_frame(rem2.clone()).unwrap();
1413        assert_eq!(term_set, Frame::Set(vec![]));
1414        assert_eq!(rest_set, Bytes::from("TAIL"));
1415        // Map
1416        let input_map = Bytes::from("%?\r\n+key\r\n+val\r\n%0\r\nNEXT");
1417        let (h_map, rem_map) = parse_frame(input_map.clone()).unwrap();
1418        assert_eq!(h_map, Frame::StreamedMapHeader);
1419        let (k, rem_map2) = parse_frame(rem_map.clone()).unwrap();
1420        assert_eq!(k, Frame::SimpleString(Bytes::from("key")));
1421        let (v, rem_map3) = parse_frame(rem_map2.clone()).unwrap();
1422        assert_eq!(v, Frame::SimpleString(Bytes::from("val")));
1423        let (term_map, rest_map4) = parse_frame(rem_map3.clone()).unwrap();
1424        assert_eq!(term_map, Frame::Map(vec![]));
1425        assert_eq!(rest_map4, Bytes::from("NEXT"));
1426        // Attribute
1427        let input_attr = Bytes::from("|?\r\n+meta\r\n+info\r\n|0\r\nMORE");
1428        let (h_attr, rem_attr) = parse_frame(input_attr.clone()).unwrap();
1429        assert_eq!(h_attr, Frame::StreamedAttributeHeader);
1430        let (a1, rem_attr2) = parse_frame(rem_attr.clone()).unwrap();
1431        assert_eq!(a1, Frame::SimpleString(Bytes::from("meta")));
1432        let (a2, rem_attr3) = parse_frame(rem_attr2.clone()).unwrap();
1433        assert_eq!(a2, Frame::SimpleString(Bytes::from("info")));
1434        let (term_attr, rest_attr) = parse_frame(rem_attr3.clone()).unwrap();
1435        assert_eq!(term_attr, Frame::Attribute(vec![]));
1436        assert_eq!(rest_attr, Bytes::from("MORE"));
1437        // Push
1438        let input_push = Bytes::from(">?\r\n:1\r\n:2\r\n>0\r\nEND");
1439        let (h_push, rem_push) = parse_frame(input_push.clone()).unwrap();
1440        assert_eq!(h_push, Frame::StreamedPushHeader);
1441        let (p1, rem_push2) = parse_frame(rem_push.clone()).unwrap();
1442        assert_eq!(p1, Frame::Integer(1));
1443        let (p2, rem_push3) = parse_frame(rem_push2.clone()).unwrap();
1444        assert_eq!(p2, Frame::Integer(2));
1445        let (term_push, rest_push) = parse_frame(rem_push3.clone()).unwrap();
1446        assert_eq!(term_push, Frame::Push(vec![]));
1447        assert_eq!(rest_push, Bytes::from("END"));
1448    }
1449
1450    #[test]
1451    fn test_parse_frame_stream_terminator() {
1452        let input = Bytes::from(".\r\nREST");
1453        let (frame, rest) = parse_frame(input.clone()).unwrap();
1454        assert_eq!(frame, Frame::StreamTerminator);
1455        assert_eq!(rest, Bytes::from("REST"));
1456    }
1457
1458    #[test]
1459    fn test_parse_frame_null_bulk_and_error() {
1460        let input1 = Bytes::from("!-1\r\nTAIL");
1461        let (f1, r1) = parse_frame(input1.clone()).unwrap();
1462        assert_eq!(f1, Frame::BlobError(Bytes::new()));
1463        assert_eq!(r1, Bytes::from("TAIL"));
1464        let input2 = Bytes::from("=-1\r\nTAIL");
1465        let (f2, r2) = parse_frame(input2.clone()).unwrap();
1466        assert_eq!(f2, Frame::VerbatimString(Bytes::new(), Bytes::new()));
1467        assert_eq!(r2, Bytes::from("TAIL"));
1468    }
1469
1470    #[test]
1471    fn test_parse_frame_special_float_nan() {
1472        let input = Bytes::from(",nan\r\nTAIL");
1473        let (frame, rest) = parse_frame(input.clone()).unwrap();
1474        assert_eq!(frame, Frame::SpecialFloat(Bytes::from("nan")));
1475        assert_eq!(rest, Bytes::from("TAIL"));
1476    }
1477
1478    #[test]
1479    fn test_parse_frame_big_number_zero() {
1480        let input = Bytes::from("(0\r\nEND");
1481        let (frame, rest) = parse_frame(input.clone()).unwrap();
1482        assert_eq!(frame, Frame::BigNumber(Bytes::from("0")));
1483        assert_eq!(rest, Bytes::from("END"));
1484    }
1485
1486    #[test]
1487    fn test_parse_frame_collection_empty() {
1488        let input_push = Bytes::from(">0\r\nTAIL");
1489        let (f_push, r_push) = parse_frame(input_push.clone()).unwrap();
1490        assert_eq!(f_push, Frame::Push(vec![]));
1491        assert_eq!(r_push, Bytes::from("TAIL"));
1492        let input_attr = Bytes::from("|0\r\nAFTER");
1493        let (f_attr, r_attr) = parse_frame(input_attr.clone()).unwrap();
1494        assert_eq!(f_attr, Frame::Attribute(vec![]));
1495        assert_eq!(r_attr, Bytes::from("AFTER"));
1496        let input_map = Bytes::from("%0\r\nEND");
1497        let (f_map, r_map) = parse_frame(input_map.clone()).unwrap();
1498        assert_eq!(f_map, Frame::Map(vec![]));
1499        assert_eq!(r_map, Bytes::from("END"));
1500        let input_set = Bytes::from("~0\r\nDONE");
1501        let (f_set, r_set) = parse_frame(input_set.clone()).unwrap();
1502        assert_eq!(f_set, Frame::Set(vec![]));
1503        assert_eq!(r_set, Bytes::from("DONE"));
1504        let input_arr = Bytes::from("*-1\r\nFIN");
1505        let (f_arr, r_arr) = parse_frame(input_arr.clone()).unwrap();
1506        assert_eq!(f_arr, Frame::Array(None));
1507        assert_eq!(r_arr, Bytes::from("FIN"));
1508    }
1509
1510    // Round-trip tests for serialization and parsing
1511
1512    #[test]
1513    fn test_roundtrip_simple_string() {
1514        let original = Bytes::from("+hello\r\n");
1515        let (frame, _) = parse_frame(original.clone()).unwrap();
1516        let serialized = frame_to_bytes(&frame);
1517        assert_eq!(original, serialized);
1518
1519        let (reparsed, _) = parse_frame(serialized).unwrap();
1520        assert_eq!(frame, reparsed);
1521    }
1522
1523    #[test]
1524    fn test_roundtrip_error() {
1525        let original = Bytes::from("-ERR error message\r\n");
1526        let (frame, _) = parse_frame(original.clone()).unwrap();
1527        let serialized = frame_to_bytes(&frame);
1528        assert_eq!(original, serialized);
1529
1530        let (reparsed, _) = parse_frame(serialized).unwrap();
1531        assert_eq!(frame, reparsed);
1532    }
1533
1534    #[test]
1535    fn test_roundtrip_integer() {
1536        let original = Bytes::from(":12345\r\n");
1537        let (frame, _) = parse_frame(original.clone()).unwrap();
1538        let serialized = frame_to_bytes(&frame);
1539        assert_eq!(original, serialized);
1540
1541        let (reparsed, _) = parse_frame(serialized).unwrap();
1542        assert_eq!(frame, reparsed);
1543    }
1544
1545    #[test]
1546    fn test_roundtrip_bulk_string() {
1547        let original = Bytes::from("$5\r\nhello\r\n");
1548        let (frame, _) = parse_frame(original.clone()).unwrap();
1549        let serialized = frame_to_bytes(&frame);
1550        assert_eq!(original, serialized);
1551
1552        let (reparsed, _) = parse_frame(serialized).unwrap();
1553        assert_eq!(frame, reparsed);
1554
1555        // Test null bulk string
1556        let original_null = Bytes::from("$-1\r\n");
1557        let (frame_null, _) = parse_frame(original_null.clone()).unwrap();
1558        let serialized_null = frame_to_bytes(&frame_null);
1559        assert_eq!(original_null, serialized_null);
1560
1561        let (reparsed_null, _) = parse_frame(serialized_null).unwrap();
1562        assert_eq!(frame_null, reparsed_null);
1563    }
1564
1565    #[test]
1566    fn test_roundtrip_blob_error() {
1567        let original = Bytes::from("!5\r\nerror\r\n");
1568        let (frame, _) = parse_frame(original.clone()).unwrap();
1569        let serialized = frame_to_bytes(&frame);
1570        assert_eq!(original, serialized);
1571
1572        let (reparsed, _) = parse_frame(serialized).unwrap();
1573        assert_eq!(frame, reparsed);
1574    }
1575
/// The RESP3 null frame (`_\r\n`) round-trips byte-for-byte.
#[test]
fn test_roundtrip_null() {
    let input = Bytes::from_static(b"_\r\n");

    let parsed = parse_frame(input.clone()).unwrap().0;
    let output = frame_to_bytes(&parsed);
    assert_eq!(input, output);

    assert_eq!(parsed, parse_frame(output).unwrap().0);
}
1586
/// A double frame parses to the same value after being serialized.
#[test]
fn test_roundtrip_double() {
    let (decoded, _) = parse_frame(Bytes::from(",3.14159\r\n")).unwrap();
    let encoded = frame_to_bytes(&decoded);

    // The serialized bytes are deliberately not compared with the input: the
    // textual form of a floating point number may differ between parsing and
    // serializing, so only the frame values are checked for equality.
    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1598
/// The special double value `inf` keeps its exact textual form on a round trip.
#[test]
fn test_roundtrip_special_float() {
    let wire = Bytes::from_static(b",inf\r\n");

    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1609
/// Both boolean encodings (`#t`, then `#f`) round-trip unchanged.
#[test]
fn test_roundtrip_boolean() {
    for text in ["#t\r\n", "#f\r\n"] {
        let wire = Bytes::from(text);

        let (decoded, _) = parse_frame(wire.clone()).unwrap();
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
1628
/// A big number frame is serialized with exactly the digits it was parsed from.
#[test]
fn test_roundtrip_big_number() {
    let input = Bytes::from_static(b"(12345678901234567890\r\n");

    let parsed = parse_frame(input.clone()).unwrap().0;
    let output = frame_to_bytes(&parsed);
    assert_eq!(input, output);

    let reparsed = parse_frame(output).unwrap().0;
    assert_eq!(parsed, reparsed);
}
1639
/// A verbatim string (`txt:` format prefix plus payload) round-trips unchanged.
#[test]
fn test_roundtrip_verbatim_string() {
    let wire = Bytes::from_static(b"=10\r\ntxt:hello!\r\n");

    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1650
/// Arrays (a two-element array, then the null array) round-trip unchanged.
#[test]
fn test_roundtrip_array() {
    for text in ["*2\r\n+hello\r\n:123\r\n", "*-1\r\n"] {
        let wire = Bytes::from(text);

        let (decoded, _) = parse_frame(wire.clone()).unwrap();
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
1670
/// A two-element set frame round-trips unchanged.
#[test]
fn test_roundtrip_set() {
    let input = Bytes::from_static(b"~2\r\n+one\r\n+two\r\n");

    let parsed = parse_frame(input.clone()).unwrap().0;
    let output = frame_to_bytes(&parsed);
    assert_eq!(input, output);

    let reparsed = parse_frame(output).unwrap().0;
    assert_eq!(parsed, reparsed);
}
1681
/// A map with two key/value pairs round-trips unchanged.
#[test]
fn test_roundtrip_map() {
    let wire = Bytes::from_static(b"%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\n");

    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1692
/// An attribute frame with a single key/value pair round-trips unchanged.
#[test]
fn test_roundtrip_attribute() {
    let input = Bytes::from_static(b"|1\r\n+key\r\n+val\r\n");

    let parsed = parse_frame(input.clone()).unwrap().0;
    let output = frame_to_bytes(&parsed);
    assert_eq!(input, output);

    let reparsed = parse_frame(output).unwrap().0;
    assert_eq!(parsed, reparsed);
}
1703
/// A two-element push frame round-trips unchanged.
#[test]
fn test_roundtrip_push() {
    let wire = Bytes::from_static(b">2\r\n+msg\r\n+data\r\n");

    let (decoded, _) = parse_frame(wire.clone()).unwrap();
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1714
/// Every streaming header, plus the stream terminator, parses to its dedicated
/// frame variant and serializes back to the same bytes.
#[test]
fn test_roundtrip_streaming_headers() {
    // (wire form, frame it must parse to)
    let cases = [
        ("$?\r\n", Frame::StreamedStringHeader),
        ("!?\r\n", Frame::StreamedBlobErrorHeader),
        ("=?\r\n", Frame::StreamedVerbatimStringHeader),
        ("*?\r\n", Frame::StreamedArrayHeader),
        ("~?\r\n", Frame::StreamedSetHeader),
        ("%?\r\n", Frame::StreamedMapHeader),
        ("|?\r\n", Frame::StreamedAttributeHeader),
        (">?\r\n", Frame::StreamedPushHeader),
        (".\r\n", Frame::StreamTerminator),
    ];

    for (text, expected) in cases {
        let wire = Bytes::from(text);

        let decoded = parse_frame(wire.clone()).unwrap().0;
        assert_eq!(decoded, expected);

        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let decoded_again = parse_frame(encoded).unwrap().0;
        assert_eq!(decoded, decoded_again);
    }
}
1741
/// Individual streamed string chunks (`;<len>\r\n<data>\r\n`) parse to
/// `Frame::StreamedStringChunk`, consume the whole input, and round-trip.
#[test]
fn test_roundtrip_streaming_chunks() {
    // (wire form, payload the chunk must carry)
    let cases = [
        (";4\r\nHell\r\n", Bytes::from("Hell")),
        (";5\r\no wor\r\n", Bytes::from("o wor")),
        (";1\r\nd\r\n", Bytes::from("d")),
        (";0\r\n\r\n", Bytes::new()),
        (";11\r\nHello World\r\n", Bytes::from("Hello World")),
    ];

    for (text, payload) in cases {
        let wire = Bytes::from(text);

        let (decoded, leftover) = parse_frame(wire.clone()).unwrap();
        assert_eq!(decoded, Frame::StreamedStringChunk(payload));
        assert!(leftover.is_empty());

        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
1774
/// Error and boundary behavior of `parse_frame` on streamed string chunks:
/// truncated input, malformed or negative lengths, and arbitrary binary payloads.
#[test]
fn test_streaming_chunks_edge_cases() {
    // Incomplete chunk (payload shorter than the declared length)
    let data = Bytes::from(";4\r\nHel");
    assert!(matches!(parse_frame(data), Err(ParseError::Incomplete)));

    // Incomplete chunk (payload present, trailing CRLF missing)
    let data = Bytes::from(";4\r\nHell");
    assert!(matches!(parse_frame(data), Err(ParseError::Incomplete)));

    // Length field is not a number
    let data = Bytes::from(";abc\r\ndata\r\n");
    assert!(matches!(parse_frame(data), Err(ParseError::BadLength)));

    // Negative length
    let data = Bytes::from(";-1\r\ndata\r\n");
    assert!(matches!(parse_frame(data), Err(ParseError::BadLength)));

    // Length mismatch (length says 5 but the payload is only 4 bytes)
    let data = Bytes::from(";5\r\nHell\r\n");
    assert!(matches!(parse_frame(data), Err(ParseError::Incomplete)));

    // Zero-length chunk without its trailing CRLF
    let data = Bytes::from(";0\r\n");
    assert!(matches!(parse_frame(data), Err(ParseError::Incomplete)));

    // Binary payload (NUL and 0xFF bytes) must come through untouched.
    let binary_data = b"\x00\x01\x02\x03\xFF";
    let mut chunk_data = Vec::new();
    chunk_data.extend_from_slice(b";5\r\n");
    chunk_data.extend_from_slice(binary_data);
    chunk_data.extend_from_slice(b"\r\n");

    let (frame, _) = parse_frame(Bytes::from(chunk_data)).unwrap();
    // Compare the whole frame so the test also fails when the parser returns
    // an unexpected variant, instead of silently skipping the payload check.
    assert_eq!(
        frame,
        Frame::StreamedStringChunk(Bytes::from_static(binary_data))
    );
}
1821
/// Accumulated streaming frames serialize to a full streaming sequence that
/// `parse_streaming_sequence` reads back into an equal frame.
#[test]
fn test_roundtrip_streaming_sequences() {
    // Serialize a frame and parse the result back as a streaming sequence.
    let reparse = |frame: &Frame| parse_streaming_sequence(frame_to_bytes(frame)).unwrap().0;

    // Streaming string: the wire form is also pinned exactly.
    let chunked_string = Frame::StreamedString(vec![
        Bytes::from("Hell"),
        Bytes::from("o wor"),
        Bytes::from("ld"),
    ]);
    let wire = frame_to_bytes(&chunked_string);
    assert_eq!(
        wire,
        Bytes::from("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;2\r\nld\r\n;0\r\n\r\n")
    );
    assert_eq!(parse_streaming_sequence(wire).unwrap().0, chunked_string);

    // Streaming array
    let array = Frame::StreamedArray(vec![
        Frame::SimpleString(Bytes::from("hello")),
        Frame::Integer(42),
        Frame::Boolean(true),
    ]);
    assert_eq!(reparse(&array), array);

    // Streaming map
    let map = Frame::StreamedMap(vec![
        (
            Frame::SimpleString(Bytes::from("key1")),
            Frame::SimpleString(Bytes::from("val1")),
        ),
        (
            Frame::SimpleString(Bytes::from("key2")),
            Frame::Integer(123),
        ),
    ]);
    assert_eq!(reparse(&map), map);

    // Empty streaming string: header followed directly by the zero-length chunk.
    let no_chunks = Frame::StreamedString(vec![]);
    let wire = frame_to_bytes(&no_chunks);
    assert_eq!(wire, Bytes::from("$?\r\n;0\r\n\r\n"));
    assert_eq!(parse_streaming_sequence(wire).unwrap().0, no_chunks);

    // Streaming set
    let set = Frame::StreamedSet(vec![
        Frame::SimpleString(Bytes::from("apple")),
        Frame::SimpleString(Bytes::from("banana")),
        Frame::Integer(42),
    ]);
    assert_eq!(reparse(&set), set);

    // Streaming attribute
    let attribute = Frame::StreamedAttribute(vec![
        (
            Frame::SimpleString(Bytes::from("trace-id")),
            Frame::SimpleString(Bytes::from("abc123")),
        ),
        (
            Frame::SimpleString(Bytes::from("span-id")),
            Frame::SimpleString(Bytes::from("def456")),
        ),
    ]);
    assert_eq!(reparse(&attribute), attribute);

    // Streaming push
    let push = Frame::StreamedPush(vec![
        Frame::SimpleString(Bytes::from("pubsub")),
        Frame::SimpleString(Bytes::from("channel1")),
        Frame::SimpleString(Bytes::from("message data")),
    ]);
    assert_eq!(reparse(&push), push);

    // Empty streaming containers
    let no_items = Frame::StreamedArray(vec![]);
    assert_eq!(reparse(&no_items), no_items);

    let no_members = Frame::StreamedSet(vec![]);
    assert_eq!(reparse(&no_members), no_members);
}
1916
/// Error and boundary behavior of `parse_streaming_sequence`: truncated
/// sequences, malformed chunks, nested fixed-length frames, empty containers
/// and non-streaming input.
#[test]
fn test_streaming_sequences_edge_cases() {
    // Incomplete streaming string (missing zero-length terminator)
    let data = Bytes::from("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Malformed chunk length inside a streaming sequence
    let data = Bytes::from("$?\r\n;abc\r\nHell\r\n;0\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::BadLength)));

    // Streaming array that never sends its terminator
    let data = Bytes::from("*?\r\n+hello\r\n:42\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Mixed streaming and non-streaming content: a fixed-length array nested
    // inside a streamed array.
    let data = Bytes::from("*?\r\n+hello\r\n*2\r\n:1\r\n:2\r\n.\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    // Match exhaustively so a wrong variant fails the test instead of
    // skipping the element checks.
    match frame {
        Frame::StreamedArray(items) => {
            assert_eq!(items.len(), 2);
            assert!(matches!(items[0], Frame::SimpleString(_)));
            assert!(matches!(items[1], Frame::Array(_)));
        }
        other => panic!("expected Frame::StreamedArray, got {other:?}"),
    }

    // Empty streaming container: header immediately followed by the terminator
    let data = Bytes::from("*?\r\n.\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    assert_eq!(frame, Frame::StreamedArray(vec![]));

    // Streaming map with an odd number of elements: the last key has no value
    // before the terminator, and the parser reports this as Incomplete.
    let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+orphan\r\n.\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // A non-streaming frame passed to parse_streaming_sequence is returned as-is
    let data = Bytes::from("+simple\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    assert!(matches!(frame, Frame::SimpleString(_)));

    // Extremely large chunk size must fail gracefully. Either error is
    // accepted: BadLength if the length itself is rejected, Incomplete if the
    // parser only notices that the declared payload is not present.
    let data = Bytes::from(";999999999999999999\r\ndata\r\n");
    let result = parse_frame(data);
    match &result {
        Err(ParseError::BadLength) => {}
        Err(ParseError::Incomplete) => {}
        Err(e) => panic!("Got unexpected error type: {e:?}"),
        Ok(_) => panic!("Large chunk size should fail"),
    }

    // Streaming string with a non-chunk frame mixed in
    let data = Bytes::from("$?\r\n+invalid\r\n;0\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::InvalidFormat)));

    // Streaming sequence with a corrupted terminator
    let data = Bytes::from("*?\r\n+hello\r\n.corrupted\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Empty input
    let data = Bytes::new();
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Streaming sequence that ends in the middle of an element
    let data = Bytes::from("*?\r\n+hello\r\n$5\r\nwo");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));
}
1998
/// A nested structure (an array holding a simple string, a map whose second
/// value is a set, and an attribute) parses to the same frame after being
/// serialized.
#[test]
fn test_roundtrip_nested_structures() {
    let wire = Bytes::from(
        "*3\r\n+hello\r\n%2\r\n+key1\r\n:123\r\n+key2\r\n~1\r\n+item\r\n|1\r\n+meta\r\n+data\r\n",
    );

    let decoded = parse_frame(wire).unwrap().0;
    let encoded = frame_to_bytes(&decoded);

    let decoded_again = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, decoded_again);
}
2011
/// A zero-length bulk string is only complete once its trailing CRLF has
/// arrived; anything else after the header is either incomplete or invalid.
#[test]
fn test_zero_length_bulk_string_requires_trailing_crlf() {
    // Complete frame followed by unrelated bytes: the tail is handed back.
    let (frame, rest) = parse_frame(Bytes::from("$0\r\n\r\nTAIL")).unwrap();
    assert_eq!(frame, Frame::BulkString(Some(Bytes::new())));
    assert_eq!(rest, Bytes::from("TAIL"));

    // Header only, nothing after it.
    assert_eq!(
        parse_frame(Bytes::from("$0\r\n")),
        Err(ParseError::Incomplete)
    );

    // Header plus only the first byte of the trailing CRLF.
    assert_eq!(
        parse_frame(Bytes::from("$0\r\n\r")),
        Err(ParseError::Incomplete)
    );

    // Header followed by two bytes that are not CRLF.
    assert_eq!(
        parse_frame(Bytes::from("$0\r\nXY")),
        Err(ParseError::InvalidFormat)
    );
}
2032
/// A zero-length streamed chunk is only complete once its trailing CRLF has
/// arrived; anything else after the header is either incomplete or invalid.
#[test]
fn test_zero_length_streamed_chunk_requires_trailing_crlf() {
    // Complete frame followed by unrelated bytes: the tail is handed back.
    let (frame, rest) = parse_frame(Bytes::from(";0\r\n\r\nTAIL")).unwrap();
    assert_eq!(frame, Frame::StreamedStringChunk(Bytes::new()));
    assert_eq!(rest, Bytes::from("TAIL"));

    // Header only, nothing after it.
    assert_eq!(
        parse_frame(Bytes::from(";0\r\n")),
        Err(ParseError::Incomplete)
    );

    // Header followed by two bytes that are not CRLF.
    assert_eq!(
        parse_frame(Bytes::from(";0\r\nXY")),
        Err(ParseError::InvalidFormat)
    );
}
2049
/// Integer frames accept the full `i64` range and report `Overflow` for a
/// value one past `i64::MAX`.
#[test]
fn test_integer_overflow_returns_overflow_error() {
    // i64::MAX + 1 does not fit.
    let too_big = Bytes::from(":9223372036854775808\r\n");
    assert_eq!(parse_frame(too_big), Err(ParseError::Overflow));

    // Upper bound of the range is accepted.
    let max = Bytes::from(":9223372036854775807\r\n");
    let parsed_max = parse_frame(max).unwrap().0;
    assert_eq!(parsed_max, Frame::Integer(i64::MAX));

    // Lower bound of the range is accepted.
    let min = Bytes::from(":-9223372036854775808\r\n");
    let parsed_min = parse_frame(min).unwrap().0;
    assert_eq!(parsed_min, Frame::Integer(i64::MIN));
}
2066
/// `Parser::next_frame` surfaces protocol errors from the underlying parser;
/// here an unknown type tag `X` yields `InvalidTag`.
#[test]
fn test_parser_propagates_errors() {
    let mut parser = Parser::new();
    parser.feed(Bytes::from("XINVALID\r\n"));

    assert_eq!(parser.next_frame(), Err(ParseError::InvalidTag(b'X')));
}
2075
/// With only part of a frame buffered, `Parser::next_frame` reports
/// `Ok(None)` rather than an error.
#[test]
fn test_parser_returns_ok_none_for_incomplete() {
    let mut parser = Parser::new();
    parser.feed(Bytes::from("+HELL"));

    assert_eq!(parser.next_frame(), Ok(None));
}
2082}