Skip to main content

resp_rs/
resp3.rs

1//! Zero-copy RESP3 parser.
2//!
3//! Parses RESP3 frames using `bytes::Bytes` for efficient, zero-copy operation.
4//! Supports all RESP3 data types, including fixed-length and streaming variants.
5
6use bytes::{BufMut, Bytes, BytesMut};
7
8/// Maximum reasonable size for collections to prevent DoS attacks
9/// Set to 10 million elements (reasonable for real-world Redis usage)
10const MAX_COLLECTION_SIZE: usize = 10_000_000;
11
12/// A streaming parser for RESP3 frames.
13///
14/// This parser allows feeding data in chunks and extracting frames as they become available.
15/// It maintains an internal buffer of accumulated data and attempts to parse frames from it.
16#[derive(Default, Debug)]
17pub struct Parser {
18    buffer: BytesMut,
19}
20
21impl Parser {
22    /// Creates a new parser with an empty buffer.
23    pub fn new() -> Self {
24        Self {
25            buffer: BytesMut::new(),
26        }
27    }
28
29    /// Feeds a chunk of data into the parser.
30    ///
31    /// The data is appended to the internal buffer.
32    pub fn feed(&mut self, data: Bytes) {
33        self.buffer.extend_from_slice(&data);
34    }
35
36    /// Attempts to extract the next complete frame from the buffer.
37    ///
38    /// Returns `Ok(None)` if there is not enough data to parse a complete frame.
39    /// Returns `Ok(Some(frame))` on success, consuming the parsed bytes.
40    /// Returns `Err` on protocol errors, clearing the buffer.
41    pub fn next_frame(&mut self) -> Result<Option<Frame>, ParseError> {
42        if self.buffer.is_empty() {
43            return Ok(None);
44        }
45
46        let bytes = self.buffer.split().freeze();
47
48        match parse_frame_inner(&bytes, 0) {
49            Ok((frame, consumed)) => {
50                if consumed < bytes.len() {
51                    self.buffer.unsplit(BytesMut::from(&bytes[consumed..]));
52                }
53                Ok(Some(frame))
54            }
55            Err(ParseError::Incomplete) => {
56                self.buffer.unsplit(bytes.into());
57                Ok(None)
58            }
59            Err(e) => Err(e),
60        }
61    }
62
63    /// Returns the number of bytes currently in the buffer.
64    pub fn buffered_bytes(&self) -> usize {
65        self.buffer.len()
66    }
67
68    /// Clears the internal buffer.
69    pub fn clear(&mut self) {
70        self.buffer.clear();
71    }
72}
73
74// --- Zero-copy Frame enum and parser using bytes::Bytes ---
75/// A parsed RESP3 frame.
76///
77/// Each variant corresponds to one of the RESP3 types, including both fixed-length
78/// and streaming headers for bulk strings, arrays, sets, maps, attributes, and pushes.
79#[derive(Debug, Clone, PartialEq)]
80pub enum Frame {
81    /// Simple string: +&lt;string&gt;\r\n
82    SimpleString(Bytes),
83    /// Simple error: -&lt;error&gt;\r\n
84    Error(Bytes),
85    /// Integer: :&lt;number&gt;\r\n
86    Integer(i64),
87    /// Blob string: $&lt;length&gt;\r\n&lt;bytes&gt;\r\n
88    // BulkString(Option<Vec<u8>>),
89    BulkString(Option<Bytes>),
90    /// Blob error: !&lt;length&gt;\r\n&lt;bytes&gt;\r\n
91    BlobError(Bytes),
92    /// Streaming blob string header: $?\r\n
93    StreamedStringHeader,
94    /// Streaming blob error header: !?\r\n
95    StreamedBlobErrorHeader,
96    /// Streaming verbatim string header: =?\r\n
97    StreamedVerbatimStringHeader,
98    /// Streaming array header: *?\r\n
99    StreamedArrayHeader,
100    /// Streaming set header: ~?\r\n
101    StreamedSetHeader,
102    /// Streaming map header: %?\r\n
103    StreamedMapHeader,
104    /// Streaming attribute header: |?\r\n
105    StreamedAttributeHeader,
106    /// Streaming push header: >?\r\n
107    StreamedPushHeader,
108    /// Streaming string chunk: ;length\r\ndata\r\n
109    ///
110    /// Represents an individual chunk in a streaming string sequence.
111    /// These chunks are parsed from `;{length}\r\n{data}\r\n` format.
112    /// A zero-length chunk (`;0\r\n`) indicates the end of the stream.
113    ///
114    /// # Example
115    /// ```
116    /// use resp_rs::resp3::Frame;
117    /// use bytes::Bytes;
118    ///
119    /// // Chunk containing "Hello"
120    /// // Wire format: ;5\r\nHello\r\n
121    /// Frame::StreamedStringChunk(Bytes::from("Hello"));
122    /// ```
123    StreamedStringChunk(Bytes),
124
125    /// Accumulated streaming string data from multiple chunks
126    ///
127    /// Created by `parse_streaming_sequence()` when parsing a complete
128    /// streaming string sequence (`$?\r\n` + chunks + `;0\r\n`).
129    /// Contains all chunks in order, allowing reconstruction of the full string.
130    ///
131    /// # Example
132    /// ```
133    /// use resp_rs::resp3::Frame;
134    /// use bytes::Bytes;
135    ///
136    /// // Represents "Hello world" from chunks ["Hello ", "world"]
137    /// Frame::StreamedString(vec![
138    ///     Bytes::from("Hello "),
139    ///     Bytes::from("world")
140    /// ]);
141    /// ```
142    StreamedString(Vec<Bytes>),
143
144    /// Accumulated streaming array data from multiple chunks
145    ///
146    /// Created when parsing a streaming array sequence (`*?\r\n` + frames + `.\r\n`).
147    /// Contains all frames that were streamed as part of the array.
148    ///
149    /// # Example
150    /// ```
151    /// use resp_rs::resp3::Frame;
152    /// use bytes::Bytes;
153    ///
154    /// // Array with mixed types
155    /// Frame::StreamedArray(vec![
156    ///     Frame::SimpleString(Bytes::from("hello")),
157    ///     Frame::Integer(42),
158    ///     Frame::Boolean(true)
159    /// ]);
160    /// ```
161    StreamedArray(Vec<Frame>),
162
163    /// Accumulated streaming set data from multiple chunks
164    ///
165    /// Created when parsing a streaming set sequence (`~?\r\n` + frames + `.\r\n`).
166    /// Contains all unique elements that were streamed as part of the set.
167    StreamedSet(Vec<Frame>),
168
169    /// Accumulated streaming map data from multiple chunks
170    ///
171    /// Created when parsing a streaming map sequence (`%?\r\n` + key-value pairs + `.\r\n`).
172    /// Contains all key-value pairs that were streamed as part of the map.
173    ///
174    /// # Example
175    /// ```
176    /// use resp_rs::resp3::Frame;
177    /// use bytes::Bytes;
178    ///
179    /// Frame::StreamedMap(vec![
180    ///     (Frame::SimpleString(Bytes::from("name")), Frame::SimpleString(Bytes::from("Alice"))),
181    ///     (Frame::SimpleString(Bytes::from("age")), Frame::Integer(25))
182    /// ]);
183    /// ```
184    StreamedMap(Vec<(Frame, Frame)>),
185
186    /// Accumulated streaming attribute data from multiple chunks
187    ///
188    /// Created when parsing a streaming attribute sequence (`|?\r\n` + key-value pairs + `.\r\n`).
189    /// Attributes provide out-of-band metadata that doesn't affect the main data structure.
190    StreamedAttribute(Vec<(Frame, Frame)>),
191
192    /// Accumulated streaming push data from multiple chunks
193    ///
194    /// Created when parsing a streaming push sequence (`>?\r\n` + frames + `.\r\n`).
195    /// Push messages are server-initiated communications (e.g., pub/sub messages).
196    ///
197    /// # Example
198    /// ```
199    /// use resp_rs::resp3::Frame;
200    /// use bytes::Bytes;
201    ///
202    /// // Pub/sub message
203    /// Frame::StreamedPush(vec![
204    ///     Frame::SimpleString(Bytes::from("pubsub")),
205    ///     Frame::SimpleString(Bytes::from("channel1")),
206    ///     Frame::SimpleString(Bytes::from("message content"))
207    /// ]);
208    /// ```
209    StreamedPush(Vec<Frame>),
210    /// End-of-stream terminator for all chunked sequences: .\r\n
211    StreamTerminator,
212    /// Null: _\r\n
213    Null,
214    /// Double: ,&lt;float&gt;\r\n
215    Double(f64),
216    /// Special Float: ,inf\r\n, -inf\r\n, nan\r\n
217    SpecialFloat(Bytes),
218    /// Boolean: #t\r\n or #f\r\n
219    Boolean(bool),
220    /// Big number: (&lt;number&gt;\r\n
221    BigNumber(Bytes),
222    /// Verbatim string: =format:content\r\n
223    // VerbatimString { format: String, content: String },
224    VerbatimString(Bytes, Bytes),
225    /// Array: *&lt;count&gt;\r\n... (or streaming header *?\r\n)
226    Array(Option<Vec<Frame>>),
227    /// Set: ~&lt;count&gt;\r\n... (or streaming header ~?\r\n)
228    Set(Vec<Frame>),
229    /// Map: %&lt;count&gt;\r\n... (or streaming header %?\r\n)
230    Map(Vec<(Frame, Frame)>),
231    /// Attribute: |&lt;count&gt;\r\n... (or streaming header |?\r\n)
232    Attribute(Vec<(Frame, Frame)>),
233    /// Push: > &lt;count&gt;\r\n... (or streaming header >?\r\n)
234    Push(Vec<Frame>),
235}
236
237pub use crate::ParseError;
238
239/// Parse a single RESP3 frame from the provided `Bytes`.
240///
241/// Returns the parsed `Frame` and the remaining unconsumed `Bytes`, or a `ParseError` on failure.
242pub fn parse_frame(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
243    let (frame, consumed) = parse_frame_inner(&input, 0)?;
244    Ok((frame, input.slice(consumed..)))
245}
246
247/// Offset-based internal parser. Works with byte positions to avoid creating
248/// intermediate `Bytes::slice()` objects. Only slices for actual frame data.
249fn parse_frame_inner(input: &Bytes, pos: usize) -> Result<(Frame, usize), ParseError> {
250    let buf = input.as_ref();
251    if pos >= buf.len() {
252        return Err(ParseError::Incomplete);
253    }
254
255    let tag = buf[pos];
256
257    match tag {
258        b'+' => {
259            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
260            Ok((
261                Frame::SimpleString(input.slice(pos + 1..line_end)),
262                after_crlf,
263            ))
264        }
265        b'-' => {
266            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
267            Ok((Frame::Error(input.slice(pos + 1..line_end)), after_crlf))
268        }
269        b':' => {
270            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
271            let v = parse_i64(&buf[pos + 1..line_end])?;
272            Ok((Frame::Integer(v), after_crlf))
273        }
274        b'$' => {
275            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
276            let len_bytes = &buf[pos + 1..line_end];
277            // streaming header?
278            if len_bytes == b"?" {
279                return Ok((Frame::StreamedStringHeader, after_crlf));
280            }
281            // null bulk
282            if len_bytes == b"-1" {
283                return Ok((Frame::BulkString(None), after_crlf));
284            }
285            let len = parse_usize(len_bytes)?;
286            if len == 0 {
287                if after_crlf + 1 >= buf.len() {
288                    return Err(ParseError::Incomplete);
289                }
290                if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
291                    return Ok((Frame::BulkString(Some(Bytes::new())), after_crlf + 2));
292                } else {
293                    return Err(ParseError::InvalidFormat);
294                }
295            }
296            let data_start = after_crlf;
297            let data_end = data_start + len;
298            if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
299                return Err(ParseError::Incomplete);
300            }
301            Ok((
302                Frame::BulkString(Some(input.slice(data_start..data_end))),
303                data_end + 2,
304            ))
305        }
306        b'_' => {
307            if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
308                Ok((Frame::Null, pos + 3))
309            } else {
310                Err(ParseError::Incomplete)
311            }
312        }
313        b',' => {
314            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
315            let line_bytes = &buf[pos + 1..line_end];
316            // special float cases
317            if line_bytes == b"inf" || line_bytes == b"-inf" || line_bytes == b"nan" {
318                return Ok((
319                    Frame::SpecialFloat(input.slice(pos + 1..line_end)),
320                    after_crlf,
321                ));
322            }
323            // numeric double
324            let s = std::str::from_utf8(line_bytes).map_err(|_| ParseError::Utf8Error)?;
325            let v = s.parse::<f64>().map_err(|_| ParseError::InvalidFormat)?;
326            Ok((Frame::Double(v), after_crlf))
327        }
328        b'#' => {
329            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
330            match &buf[pos + 1..line_end] {
331                b"t" => Ok((Frame::Boolean(true), after_crlf)),
332                b"f" => Ok((Frame::Boolean(false), after_crlf)),
333                _ => Err(ParseError::InvalidBoolean),
334            }
335        }
336        b'(' => {
337            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
338            Ok((Frame::BigNumber(input.slice(pos + 1..line_end)), after_crlf))
339        }
340        b'=' => {
341            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
342            let len_bytes = &buf[pos + 1..line_end];
343            // streaming header
344            if len_bytes == b"?" {
345                return Ok((Frame::StreamedVerbatimStringHeader, after_crlf));
346            }
347            // null case
348            if len_bytes == b"-1" {
349                return Ok((
350                    Frame::VerbatimString(Bytes::new(), Bytes::new()),
351                    after_crlf,
352                ));
353            }
354            let len = parse_usize(len_bytes)?;
355            let data_start = after_crlf;
356            let data_end = data_start + len;
357            if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
358                return Err(ParseError::Incomplete);
359            }
360            // find colon separator in payload
361            let sep = buf[data_start..data_end]
362                .iter()
363                .position(|&b| b == b':')
364                .ok_or(ParseError::InvalidFormat)?;
365            let format = input.slice(data_start..data_start + sep);
366            let content = input.slice(data_start + sep + 1..data_end);
367            Ok((Frame::VerbatimString(format, content), data_end + 2))
368        }
369        b'!' => {
370            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
371            let len_bytes = &buf[pos + 1..line_end];
372            // streaming blob error header
373            if len_bytes == b"?" {
374                return Ok((Frame::StreamedBlobErrorHeader, after_crlf));
375            }
376            if len_bytes == b"-1" {
377                return Ok((Frame::BlobError(Bytes::new()), after_crlf));
378            }
379            let len = parse_usize(len_bytes)?;
380            let data_start = after_crlf;
381            let data_end = data_start + len;
382            if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
383                return Err(ParseError::Incomplete);
384            }
385            Ok((
386                Frame::BlobError(input.slice(data_start..data_end)),
387                data_end + 2,
388            ))
389        }
390        b'*' => {
391            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
392            let len_bytes = &buf[pos + 1..line_end];
393            if len_bytes == b"?" {
394                return Ok((Frame::StreamedArrayHeader, after_crlf));
395            }
396            if len_bytes == b"-1" {
397                return Ok((Frame::Array(None), after_crlf));
398            }
399            let count = parse_count(len_bytes)?;
400            if count == 0 {
401                return Ok((Frame::Array(Some(Vec::new())), after_crlf));
402            }
403            let mut cursor = after_crlf;
404            let mut items = Vec::with_capacity(count);
405            for _ in 0..count {
406                let (item, next) = parse_frame_inner(input, cursor)?;
407                items.push(item);
408                cursor = next;
409            }
410            Ok((Frame::Array(Some(items)), cursor))
411        }
412        b'~' => {
413            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
414            let len_bytes = &buf[pos + 1..line_end];
415            if len_bytes == b"?" {
416                return Ok((Frame::StreamedSetHeader, after_crlf));
417            }
418            let count = parse_count(len_bytes)?;
419            let mut cursor = after_crlf;
420            let mut items = Vec::with_capacity(count);
421            for _ in 0..count {
422                let (item, next) = parse_frame_inner(input, cursor)?;
423                items.push(item);
424                cursor = next;
425            }
426            Ok((Frame::Set(items), cursor))
427        }
428        b'%' | b'|' => {
429            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
430            let len_bytes = &buf[pos + 1..line_end];
431            if len_bytes == b"?" {
432                return if tag == b'%' {
433                    Ok((Frame::StreamedMapHeader, after_crlf))
434                } else {
435                    Ok((Frame::StreamedAttributeHeader, after_crlf))
436                };
437            }
438            let count = parse_count(len_bytes)?;
439            let mut cursor = after_crlf;
440            let mut pairs = Vec::with_capacity(count);
441            for _ in 0..count {
442                let (key, next1) = parse_frame_inner(input, cursor)?;
443                let (val, next2) = parse_frame_inner(input, next1)?;
444                pairs.push((key, val));
445                cursor = next2;
446            }
447            if tag == b'%' {
448                Ok((Frame::Map(pairs), cursor))
449            } else {
450                Ok((Frame::Attribute(pairs), cursor))
451            }
452        }
453        b'>' => {
454            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
455            let len_bytes = &buf[pos + 1..line_end];
456            if len_bytes == b"?" {
457                return Ok((Frame::StreamedPushHeader, after_crlf));
458            }
459            let count = parse_count(len_bytes)?;
460            let mut cursor = after_crlf;
461            let mut items = Vec::with_capacity(count);
462            for _ in 0..count {
463                let (item, next) = parse_frame_inner(input, cursor)?;
464                items.push(item);
465                cursor = next;
466            }
467            Ok((Frame::Push(items), cursor))
468        }
469        b';' => {
470            let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
471            let len = parse_usize(&buf[pos + 1..line_end])?;
472            if len == 0 {
473                if after_crlf + 1 >= buf.len() {
474                    return Err(ParseError::Incomplete);
475                }
476                if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
477                    return Ok((Frame::StreamedStringChunk(Bytes::new()), after_crlf + 2));
478                } else {
479                    return Err(ParseError::InvalidFormat);
480                }
481            }
482            let data_start = after_crlf;
483            let data_end = data_start + len;
484            if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
485                return Err(ParseError::Incomplete);
486            }
487            Ok((
488                Frame::StreamedStringChunk(input.slice(data_start..data_end)),
489                data_end + 2,
490            ))
491        }
492        b'.' => {
493            if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
494                Ok((Frame::StreamTerminator, pos + 3))
495            } else {
496                Err(ParseError::Incomplete)
497            }
498        }
499        _ => Err(ParseError::InvalidTag(tag)),
500    }
501}
502
503/// Parse a complete RESP3 streaming sequence, accumulating chunks until termination.
504///
505/// This function handles RESP3 streaming sequences that begin with streaming headers
506/// (`$?`, `*?`, `~?`, `%?`, `|?`, `>?`) and accumulates the following data until
507/// the appropriate terminator is encountered.
508///
509/// # Streaming Types Supported
510///
511/// - **Streaming Strings**: `$?\r\n` followed by chunks terminated with `;0\r\n`
512/// - **Streaming Arrays**: `*?\r\n` followed by frames terminated with `.\r\n`
513/// - **Streaming Sets**: `~?\r\n` followed by frames terminated with `.\r\n`
514/// - **Streaming Maps**: `%?\r\n` followed by key-value pairs terminated with `.\r\n`
515/// - **Streaming Attributes**: `|?\r\n` followed by key-value pairs terminated with `.\r\n`
516/// - **Streaming Push**: `>?\r\n` followed by frames terminated with `.\r\n`
517///
518/// # Examples
519///
520/// ## Streaming String
521/// ```rust
522/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
523/// use bytes::Bytes;
524///
525/// let data = Bytes::from("$?\r\n;4\r\nHell\r\n;6\r\no worl\r\n;1\r\nd\r\n;0\r\n\r\n");
526/// let (frame, rest) = parse_streaming_sequence(data).unwrap();
527///
528/// if let Frame::StreamedString(chunks) = frame {
529///     assert_eq!(chunks.len(), 3);
530///     let full_string: String = chunks.iter()
531///         .map(|chunk| std::str::from_utf8(chunk).unwrap())
532///         .collect::<Vec<_>>()
533///         .join("");
534///     assert_eq!(full_string, "Hello world");
535/// }
536/// assert!(rest.is_empty());
537/// ```
538///
539/// ## Streaming Array
540/// ```rust
541/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
542/// use bytes::Bytes;
543///
544/// let data = Bytes::from("*?\r\n+hello\r\n:42\r\n#t\r\n.\r\n");
545/// let (frame, _) = parse_streaming_sequence(data).unwrap();
546///
547/// if let Frame::StreamedArray(items) = frame {
548///     assert_eq!(items.len(), 3);
549///     // items[0] = SimpleString("hello")
550///     // items[1] = Integer(42)
551///     // items[2] = Boolean(true)
552/// }
553/// ```
554///
555/// ## Streaming Map
556/// ```rust
557/// use resp_rs::resp3::{parse_streaming_sequence, Frame};
558/// use bytes::Bytes;
559///
560/// let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+key2\r\n:123\r\n.\r\n");
561/// let (frame, _) = parse_streaming_sequence(data).unwrap();
562///
563/// if let Frame::StreamedMap(pairs) = frame {
564///     assert_eq!(pairs.len(), 2);
565///     // pairs[0] = (SimpleString("key1"), SimpleString("val1"))
566///     // pairs[1] = (SimpleString("key2"), Integer(123))
567/// }
568/// ```
569///
570/// # Errors
571///
572/// Returns `ParseError::Incomplete` if the stream is not complete or if required
573/// terminators are missing. Returns `ParseError::InvalidFormat` for malformed
574/// chunk data or unexpected frame types within streaming sequences.
575///
576/// # Notes
577///
578/// - For non-streaming frames, this function simply returns the parsed frame
579/// - Streaming string chunks are accumulated in order
580/// - All other streaming types accumulate complete frames until termination
581/// - Zero-copy parsing is used where possible to minimize allocations
582pub fn parse_streaming_sequence(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
583    if input.is_empty() {
584        return Err(ParseError::Incomplete);
585    }
586
587    let (header, mut rest) = parse_frame(input)?;
588
589    match header {
590        Frame::StreamedStringHeader => {
591            // Parse streaming string chunks until zero-length chunk
592            let mut chunks = Vec::new();
593
594            loop {
595                let (frame, new_rest) = parse_frame(rest)?;
596                rest = new_rest;
597
598                match frame {
599                    Frame::StreamedStringChunk(chunk) => {
600                        if chunk.is_empty() {
601                            // Zero-length chunk indicates end of stream
602                            break;
603                        }
604                        chunks.push(chunk);
605                    }
606                    _ => {
607                        return Err(ParseError::InvalidFormat);
608                    }
609                }
610            }
611
612            Ok((Frame::StreamedString(chunks), rest))
613        }
614        Frame::StreamedBlobErrorHeader => {
615            // Parse streaming blob error chunks until zero-length chunk
616            // Similar to StreamedStringHeader but for blob errors
617            let mut chunks = Vec::new();
618
619            loop {
620                let (frame, new_rest) = parse_frame(rest)?;
621                rest = new_rest;
622
623                match frame {
624                    Frame::BlobError(chunk) => {
625                        // Check for terminator by parsing as bulk string
626                        // In streaming context, !0\r\n acts as terminator
627                        if chunk.is_empty() {
628                            break;
629                        }
630                        chunks.push(chunk);
631                    }
632                    _ => {
633                        return Err(ParseError::InvalidFormat);
634                    }
635                }
636            }
637
638            // Concatenate all error chunks
639            let total_len: usize = chunks.iter().map(|c| c.len()).sum();
640            let mut combined = Vec::with_capacity(total_len);
641            for chunk in chunks {
642                combined.extend_from_slice(&chunk);
643            }
644            Ok((Frame::BlobError(Bytes::from(combined)), rest))
645        }
646        Frame::StreamedVerbatimStringHeader => {
647            // Parse streaming verbatim string chunks until zero-length chunk
648            let mut chunks = Vec::new();
649
650            loop {
651                let (frame, new_rest) = parse_frame(rest)?;
652                rest = new_rest;
653
654                match frame {
655                    Frame::VerbatimString(format, content) => {
656                        // Zero-length content indicates end of stream
657                        if content.is_empty() && format.len() == 3 {
658                            break;
659                        }
660                        // For now, just collect the content parts
661                        // In a full implementation, we'd validate format consistency
662                        chunks.push((format, content));
663                    }
664                    _ => {
665                        return Err(ParseError::InvalidFormat);
666                    }
667                }
668            }
669
670            // Concatenate all chunks, using format from first chunk
671            if chunks.is_empty() {
672                return Ok((Frame::VerbatimString(Bytes::new(), Bytes::new()), rest));
673            }
674
675            let format = chunks[0].0.clone();
676            let total_len: usize = chunks.iter().map(|(_, c)| c.len()).sum();
677            let mut combined = Vec::with_capacity(total_len);
678            for (_, content) in chunks {
679                combined.extend_from_slice(&content);
680            }
681            Ok((Frame::VerbatimString(format, Bytes::from(combined)), rest))
682        }
683        Frame::StreamedArrayHeader => {
684            // Parse streaming array items until terminator
685            let mut items = Vec::new();
686
687            loop {
688                let (frame, new_rest) = parse_frame(rest)?;
689                rest = new_rest;
690
691                match frame {
692                    Frame::StreamTerminator => {
693                        break;
694                    }
695                    item => {
696                        items.push(item);
697                    }
698                }
699            }
700
701            Ok((Frame::StreamedArray(items), rest))
702        }
703        Frame::StreamedSetHeader => {
704            // Parse streaming set items until terminator
705            let mut items = Vec::new();
706
707            loop {
708                let (frame, new_rest) = parse_frame(rest)?;
709                rest = new_rest;
710
711                match frame {
712                    Frame::StreamTerminator => {
713                        break;
714                    }
715                    item => {
716                        items.push(item);
717                    }
718                }
719            }
720
721            Ok((Frame::StreamedSet(items), rest))
722        }
723        Frame::StreamedMapHeader => {
724            // Parse streaming map pairs until terminator
725            let mut pairs = Vec::new();
726
727            loop {
728                let (frame, new_rest) = parse_frame(rest)?;
729                rest = new_rest;
730
731                match frame {
732                    Frame::StreamTerminator => {
733                        break;
734                    }
735                    key => {
736                        let (value, newer_rest) = parse_frame(rest)?;
737                        rest = newer_rest;
738                        pairs.push((key, value));
739                    }
740                }
741            }
742
743            Ok((Frame::StreamedMap(pairs), rest))
744        }
745        Frame::StreamedAttributeHeader => {
746            // Parse streaming attribute pairs until terminator
747            let mut pairs = Vec::new();
748
749            loop {
750                let (frame, new_rest) = parse_frame(rest)?;
751                rest = new_rest;
752
753                match frame {
754                    Frame::StreamTerminator => {
755                        break;
756                    }
757                    key => {
758                        let (value, newer_rest) = parse_frame(rest)?;
759                        rest = newer_rest;
760                        pairs.push((key, value));
761                    }
762                }
763            }
764
765            Ok((Frame::StreamedAttribute(pairs), rest))
766        }
767        Frame::StreamedPushHeader => {
768            // Parse streaming push items until terminator
769            let mut items = Vec::new();
770
771            loop {
772                let (frame, new_rest) = parse_frame(rest)?;
773                rest = new_rest;
774
775                match frame {
776                    Frame::StreamTerminator => {
777                        break;
778                    }
779                    item => {
780                        items.push(item);
781                    }
782                }
783            }
784
785            Ok((Frame::StreamedPush(items), rest))
786        }
787        _ => {
788            // Not a streaming sequence, just return the original frame
789            Ok((header, rest))
790        }
791    }
792}
793
794/// Find `\r\n` in `buf` starting at `from`. Returns `(line_end, after_crlf)` where
795/// `line_end` is the position of `\r` and `after_crlf` is the position after `\n`.
796#[inline]
797fn find_crlf(buf: &[u8], from: usize) -> Result<(usize, usize), ParseError> {
798    let mut i = from;
799    let len = buf.len();
800    while i + 1 < len {
801        if buf[i] == b'\r' && buf[i + 1] == b'\n' {
802            return Ok((i, i + 2));
803        }
804        i += 1;
805    }
806    Err(ParseError::Incomplete)
807}
808
809/// Parse a `usize` directly from ASCII digit bytes, no UTF-8 validation needed.
810#[inline]
811fn parse_usize(buf: &[u8]) -> Result<usize, ParseError> {
812    if buf.is_empty() {
813        return Err(ParseError::BadLength);
814    }
815    let mut v: usize = 0;
816    for &b in buf {
817        if !b.is_ascii_digit() {
818            return Err(ParseError::BadLength);
819        }
820        v = v.checked_mul(10).ok_or(ParseError::BadLength)?;
821        v = v
822            .checked_add((b - b'0') as usize)
823            .ok_or(ParseError::BadLength)?;
824    }
825    Ok(v)
826}
827
828/// Parse an `i64` directly from ASCII bytes (optional leading `-`), no UTF-8 validation.
829#[inline]
830fn parse_i64(buf: &[u8]) -> Result<i64, ParseError> {
831    if buf.is_empty() {
832        return Err(ParseError::InvalidFormat);
833    }
834    let (neg, digits) = if buf[0] == b'-' {
835        (true, &buf[1..])
836    } else {
837        (false, buf)
838    };
839    if digits.is_empty() {
840        return Err(ParseError::InvalidFormat);
841    }
842    let mut v: i64 = 0;
843    for (i, &d) in digits.iter().enumerate() {
844        if !d.is_ascii_digit() {
845            return Err(ParseError::InvalidFormat);
846        }
847        let digit = (d - b'0') as i64;
848        if neg && v == i64::MAX / 10 && digit == 8 && i == digits.len() - 1 {
849            return Ok(i64::MIN);
850        }
851        if v > i64::MAX / 10 || (v == i64::MAX / 10 && digit > i64::MAX % 10) {
852            return Err(ParseError::Overflow);
853        }
854        v = v * 10 + digit;
855    }
856    if neg { Ok(-v) } else { Ok(v) }
857}
858
859/// Parse a collection count (usize) with MAX_COLLECTION_SIZE check.
860#[inline]
861fn parse_count(buf: &[u8]) -> Result<usize, ParseError> {
862    let count = parse_usize(buf)?;
863    if count > MAX_COLLECTION_SIZE {
864        return Err(ParseError::BadLength);
865    }
866    Ok(count)
867}
868
869/// Converts a Frame to its RESP3 byte representation.
870///
871/// This function serializes a Frame into the corresponding RESP3 protocol bytes.
872pub fn frame_to_bytes(frame: &Frame) -> Bytes {
873    let mut buf = BytesMut::new();
874    serialize_frame(frame, &mut buf);
875    buf.freeze()
876}
877
878fn serialize_frame(frame: &Frame, buf: &mut BytesMut) {
879    match frame {
880        Frame::SimpleString(s) => {
881            buf.put_u8(b'+');
882            buf.extend_from_slice(s);
883            buf.extend_from_slice(b"\r\n");
884        }
885        Frame::Error(e) => {
886            buf.put_u8(b'-');
887            buf.extend_from_slice(e);
888            buf.extend_from_slice(b"\r\n");
889        }
890        Frame::Integer(i) => {
891            buf.put_u8(b':');
892            let s = i.to_string();
893            buf.extend_from_slice(s.as_bytes());
894            buf.extend_from_slice(b"\r\n");
895        }
896        Frame::BulkString(opt) => {
897            buf.put_u8(b'$');
898            match opt {
899                Some(data) => {
900                    let len = data.len().to_string();
901                    buf.extend_from_slice(len.as_bytes());
902                    buf.extend_from_slice(b"\r\n");
903                    buf.extend_from_slice(data);
904                    buf.extend_from_slice(b"\r\n");
905                }
906                None => {
907                    buf.extend_from_slice(b"-1\r\n");
908                }
909            }
910        }
911        Frame::BlobError(data) => {
912            buf.put_u8(b'!');
913            let len = data.len().to_string();
914            buf.extend_from_slice(len.as_bytes());
915            buf.extend_from_slice(b"\r\n");
916            buf.extend_from_slice(data);
917            buf.extend_from_slice(b"\r\n");
918        }
919        Frame::StreamedStringHeader => {
920            buf.extend_from_slice(b"$?\r\n");
921        }
922        Frame::StreamedBlobErrorHeader => {
923            buf.extend_from_slice(b"!?\r\n");
924        }
925        Frame::StreamedVerbatimStringHeader => {
926            buf.extend_from_slice(b"=?\r\n");
927        }
928        Frame::StreamedArrayHeader => {
929            buf.extend_from_slice(b"*?\r\n");
930        }
931        Frame::StreamedSetHeader => {
932            buf.extend_from_slice(b"~?\r\n");
933        }
934        Frame::StreamedMapHeader => {
935            buf.extend_from_slice(b"%?\r\n");
936        }
937        Frame::StreamedAttributeHeader => {
938            buf.extend_from_slice(b"|?\r\n");
939        }
940        Frame::StreamedPushHeader => {
941            buf.extend_from_slice(b">?\r\n");
942        }
943        Frame::StreamedStringChunk(data) => {
944            buf.put_u8(b';');
945            let len = data.len().to_string();
946            buf.extend_from_slice(len.as_bytes());
947            buf.extend_from_slice(b"\r\n");
948            buf.extend_from_slice(data);
949            buf.extend_from_slice(b"\r\n");
950        }
951        Frame::StreamedString(chunks) => {
952            // Serialize as streaming string sequence: $?\r\n + chunks + terminator
953            buf.extend_from_slice(b"$?\r\n");
954            for chunk in chunks {
955                buf.put_u8(b';');
956                let len = chunk.len().to_string();
957                buf.extend_from_slice(len.as_bytes());
958                buf.extend_from_slice(b"\r\n");
959                buf.extend_from_slice(chunk);
960                buf.extend_from_slice(b"\r\n");
961            }
962            buf.extend_from_slice(b";0\r\n\r\n");
963        }
964        Frame::StreamedArray(items) => {
965            buf.extend_from_slice(b"*?\r\n");
966            for item in items {
967                serialize_frame(item, buf);
968            }
969            buf.extend_from_slice(b".\r\n");
970        }
971        Frame::StreamedSet(items) => {
972            buf.extend_from_slice(b"~?\r\n");
973            for item in items {
974                serialize_frame(item, buf);
975            }
976            buf.extend_from_slice(b".\r\n");
977        }
978        Frame::StreamedMap(pairs) => {
979            buf.extend_from_slice(b"%?\r\n");
980            for (key, value) in pairs {
981                serialize_frame(key, buf);
982                serialize_frame(value, buf);
983            }
984            buf.extend_from_slice(b".\r\n");
985        }
986        Frame::StreamedAttribute(pairs) => {
987            buf.extend_from_slice(b"|?\r\n");
988            for (key, value) in pairs {
989                serialize_frame(key, buf);
990                serialize_frame(value, buf);
991            }
992            buf.extend_from_slice(b".\r\n");
993        }
994        Frame::StreamedPush(items) => {
995            buf.extend_from_slice(b">?\r\n");
996            for item in items {
997                serialize_frame(item, buf);
998            }
999            buf.extend_from_slice(b".\r\n");
1000        }
1001        Frame::StreamTerminator => {
1002            buf.extend_from_slice(b".\r\n");
1003        }
1004        Frame::Null => {
1005            buf.extend_from_slice(b"_\r\n");
1006        }
1007        Frame::Double(d) => {
1008            buf.put_u8(b',');
1009            let s = d.to_string();
1010            buf.extend_from_slice(s.as_bytes());
1011            buf.extend_from_slice(b"\r\n");
1012        }
1013        Frame::SpecialFloat(f) => {
1014            buf.put_u8(b',');
1015            buf.extend_from_slice(f);
1016            buf.extend_from_slice(b"\r\n");
1017        }
1018        Frame::Boolean(b) => {
1019            buf.extend_from_slice(if *b { b"#t\r\n" } else { b"#f\r\n" });
1020        }
1021        Frame::BigNumber(n) => {
1022            buf.put_u8(b'(');
1023            buf.extend_from_slice(n);
1024            buf.extend_from_slice(b"\r\n");
1025        }
1026        Frame::VerbatimString(format, content) => {
1027            buf.put_u8(b'=');
1028            let total_len = format.len() + 1 + content.len(); // +1 for the colon
1029            let len = total_len.to_string();
1030            buf.extend_from_slice(len.as_bytes());
1031            buf.extend_from_slice(b"\r\n");
1032            buf.extend_from_slice(format);
1033            buf.put_u8(b':');
1034            buf.extend_from_slice(content);
1035            buf.extend_from_slice(b"\r\n");
1036        }
1037        Frame::Array(opt) => {
1038            buf.put_u8(b'*');
1039            match opt {
1040                Some(items) => {
1041                    let len = items.len().to_string();
1042                    buf.extend_from_slice(len.as_bytes());
1043                    buf.extend_from_slice(b"\r\n");
1044                    for item in items {
1045                        serialize_frame(item, buf);
1046                    }
1047                }
1048                None => {
1049                    buf.extend_from_slice(b"-1\r\n");
1050                }
1051            }
1052        }
1053        Frame::Set(items) => {
1054            buf.put_u8(b'~');
1055            let len = items.len().to_string();
1056            buf.extend_from_slice(len.as_bytes());
1057            buf.extend_from_slice(b"\r\n");
1058            for item in items {
1059                serialize_frame(item, buf);
1060            }
1061        }
1062        Frame::Map(pairs) => {
1063            buf.put_u8(b'%');
1064            let len = pairs.len().to_string();
1065            buf.extend_from_slice(len.as_bytes());
1066            buf.extend_from_slice(b"\r\n");
1067            for (key, value) in pairs {
1068                serialize_frame(key, buf);
1069                serialize_frame(value, buf);
1070            }
1071        }
1072        Frame::Attribute(pairs) => {
1073            buf.put_u8(b'|');
1074            let len = pairs.len().to_string();
1075            buf.extend_from_slice(len.as_bytes());
1076            buf.extend_from_slice(b"\r\n");
1077            for (key, value) in pairs {
1078                serialize_frame(key, buf);
1079                serialize_frame(value, buf);
1080            }
1081        }
1082        Frame::Push(items) => {
1083            buf.put_u8(b'>');
1084            let len = items.len().to_string();
1085            buf.extend_from_slice(len.as_bytes());
1086            buf.extend_from_slice(b"\r\n");
1087            for item in items {
1088                serialize_frame(item, buf);
1089            }
1090        }
1091    }
1092}
1093
1094#[cfg(test)]
1095mod tests {
1096    use super::{Frame, ParseError, Parser, frame_to_bytes, parse_frame, parse_streaming_sequence};
1097    use bytes::Bytes;
1098
1099    #[test]
1100    fn test_parse_frame_simple_string() {
1101        let input = Bytes::from("+HELLO\r\nWORLD");
1102        let (frame, rest) = parse_frame(input.clone()).unwrap();
1103        assert_eq!(frame, Frame::SimpleString(Bytes::from("HELLO")));
1104        assert_eq!(rest, Bytes::from("WORLD"));
1105    }
1106
1107    #[test]
1108    fn test_parse_frame_blob_error() {
1109        let input = Bytes::from("!5\r\nERROR\r\nREST");
1110        let (frame, rest) = parse_frame(input.clone()).unwrap();
1111        assert_eq!(frame, Frame::BlobError(Bytes::from("ERROR")));
1112        assert_eq!(rest, Bytes::from("REST"));
1113    }
1114
1115    #[test]
1116    fn test_parse_frame_error() {
1117        let input = Bytes::from("-ERR fail\r\nLEFT");
1118        let (frame, rest) = parse_frame(input.clone()).unwrap();
1119        assert_eq!(frame, Frame::Error(Bytes::from("ERR fail")));
1120        assert_eq!(rest, Bytes::from("LEFT"));
1121    }
1122
1123    #[test]
1124    fn test_parse_frame_integer() {
1125        let input = Bytes::from(":42\r\nTAIL");
1126        let (frame, rest) = parse_frame(input.clone()).unwrap();
1127        assert_eq!(frame, Frame::Integer(42));
1128        assert_eq!(rest, Bytes::from("TAIL"));
1129    }
1130
1131    #[test]
1132    fn test_parse_frame_bulk_string() {
1133        let input = Bytes::from("$3\r\nfoo\r\nREST");
1134        let (frame, rest) = parse_frame(input.clone()).unwrap();
1135        assert_eq!(frame, Frame::BulkString(Some(Bytes::from("foo"))));
1136        assert_eq!(rest, Bytes::from("REST"));
1137        let null_input = Bytes::from("$-1\r\nAFTER");
1138        let (frame, rest) = parse_frame(null_input.clone()).unwrap();
1139        assert_eq!(frame, Frame::BulkString(None));
1140        assert_eq!(rest, Bytes::from("AFTER"));
1141    }
1142
1143    #[test]
1144    fn test_parse_frame_null() {
1145        let input = Bytes::from("_\r\nLEFT");
1146        let (frame, rest) = parse_frame(input.clone()).unwrap();
1147        assert_eq!(frame, Frame::Null);
1148        assert_eq!(rest, Bytes::from("LEFT"));
1149    }
1150
1151    #[test]
1152    fn test_parse_frame_double_and_special_float() {
1153        let input = Bytes::from(",3.5\r\nNEXT");
1154        let (frame, rest) = parse_frame(input.clone()).unwrap();
1155        assert_eq!(frame, Frame::Double(3.5));
1156        assert_eq!(rest, Bytes::from("NEXT"));
1157        let input_inf = Bytes::from(",inf\r\nTAIL");
1158        let (frame, rest) = parse_frame(input_inf.clone()).unwrap();
1159        assert_eq!(frame, Frame::SpecialFloat(Bytes::from("inf")));
1160        assert_eq!(rest, Bytes::from("TAIL"));
1161    }
1162
1163    #[test]
1164    fn test_parse_frame_boolean() {
1165        let input_true = Bytes::from("#t\r\nXYZ");
1166        let (frame, rest) = parse_frame(input_true.clone()).unwrap();
1167        assert_eq!(frame, Frame::Boolean(true));
1168        assert_eq!(rest, Bytes::from("XYZ"));
1169        let input_false = Bytes::from("#f\r\nDONE");
1170        let (frame, rest) = parse_frame(input_false.clone()).unwrap();
1171        assert_eq!(frame, Frame::Boolean(false));
1172        assert_eq!(rest, Bytes::from("DONE"));
1173    }
1174
1175    #[test]
1176    fn test_parse_frame_big_number() {
1177        let input = Bytes::from("(123456789\r\nEND");
1178        let (frame, rest) = parse_frame(input.clone()).unwrap();
1179        assert_eq!(frame, Frame::BigNumber(Bytes::from("123456789")));
1180        assert_eq!(rest, Bytes::from("END"));
1181    }
1182
1183    #[test]
1184    fn test_parse_frame_verbatim_string() {
1185        let input = Bytes::from("=12\r\ntxt:hi there\r\nAFTER");
1186        let (frame, rest) = parse_frame(input.clone()).unwrap();
1187        assert_eq!(
1188            frame,
1189            Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hi there")) // Frame::VerbatimString {
1190                                                                               //     format: "txt".to_string(),
1191                                                                               //     content: "hi there".to_string()
1192                                                                               // }
1193        );
1194        assert_eq!(rest, Bytes::from("AFTER"));
1195    }
1196
1197    #[test]
1198    fn test_parse_frame_array_set_push_map_attribute() {
1199        // Array of two bulk strings
1200        let input = Bytes::from("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\nTAIL");
1201        let (frame, rest) = parse_frame(input.clone()).unwrap();
1202        assert_eq!(
1203            frame,
1204            Frame::Array(Some(vec![
1205                Frame::BulkString(Some(Bytes::from("foo"))),
1206                Frame::BulkString(Some(Bytes::from("bar")))
1207            ]))
1208        );
1209        assert_eq!(rest, Bytes::from("TAIL"));
1210        // Null array
1211        let input_null = Bytes::from("*-1\r\nEND");
1212        let (frame, rest) = parse_frame(input_null.clone()).unwrap();
1213        assert_eq!(frame, Frame::Array(None));
1214        assert_eq!(rest, Bytes::from("END"));
1215        // Set of two simple strings
1216        let input_set = Bytes::from("~2\r\n+foo\r\n+bar\r\nTAIL");
1217        let (frame, rest) = parse_frame(input_set.clone()).unwrap();
1218        assert_eq!(
1219            frame,
1220            Frame::Set(vec![
1221                Frame::SimpleString(Bytes::from("foo")),
1222                Frame::SimpleString(Bytes::from("bar")),
1223            ])
1224        );
1225        assert_eq!(rest, Bytes::from("TAIL"));
1226        // Map of two key-value pairs
1227        let input_map = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\nTRAIL");
1228        let (frame, rest) = parse_frame(input_map.clone()).unwrap();
1229        assert_eq!(
1230            frame,
1231            Frame::Map(vec![
1232                (
1233                    Frame::SimpleString(Bytes::from("key1")),
1234                    Frame::SimpleString(Bytes::from("val1"))
1235                ),
1236                (
1237                    Frame::SimpleString(Bytes::from("key2")),
1238                    Frame::SimpleString(Bytes::from("val2"))
1239                ),
1240            ])
1241        );
1242        assert_eq!(rest, Bytes::from("TRAIL"));
1243        // Attribute
1244        let input_attr = Bytes::from("|1\r\n+meta\r\n+data\r\nAFTER");
1245        let (frame, rest) = parse_frame(input_attr.clone()).unwrap();
1246        assert_eq!(
1247            frame,
1248            Frame::Attribute(vec![(
1249                Frame::SimpleString(Bytes::from("meta")),
1250                Frame::SimpleString(Bytes::from("data"))
1251            ),])
1252        );
1253        assert_eq!(rest, Bytes::from("AFTER"));
1254        // Push
1255        let input_push = Bytes::from(">2\r\n+type\r\n:1\r\nNEXT");
1256        let (frame, rest) = parse_frame(input_push.clone()).unwrap();
1257        assert_eq!(
1258            frame,
1259            Frame::Push(vec![
1260                Frame::SimpleString(Bytes::from("type")),
1261                Frame::Integer(1),
1262            ])
1263        );
1264        assert_eq!(rest, Bytes::from("NEXT"));
1265    }
1266
1267    #[test]
1268    fn test_parse_frame_empty_input() {
1269        assert!(parse_frame(Bytes::new()).is_err());
1270    }
1271
1272    #[test]
1273    fn test_parse_frame_invalid_tag() {
1274        let input = Bytes::from("X123\r\n");
1275        assert!(parse_frame(input).is_err());
1276    }
1277
1278    #[test]
1279    fn test_parse_frame_malformed_bulk_length() {
1280        let input = Bytes::from("$x\r\nfoo\r\n");
1281        assert!(parse_frame(input).is_err());
1282    }
1283
1284    #[test]
1285    fn test_parse_frame_zero_length_bulk() {
1286        let input = Bytes::from("$0\r\n\r\nTAIL");
1287        let (frame, rest) = parse_frame(input.clone()).unwrap();
1288        assert_eq!(frame, Frame::BulkString(Some(Bytes::from(""))));
1289        assert_eq!(rest, Bytes::from("TAIL"));
1290    }
1291
1292    #[test]
1293    fn test_parse_frame_zero_length_blob_error() {
1294        let input = Bytes::from("!0\r\n\r\nREST");
1295        let (frame, rest) = parse_frame(input.clone()).unwrap();
1296        assert_eq!(frame, Frame::BlobError(Bytes::new()));
1297        assert_eq!(rest, Bytes::from("REST"));
1298    }
1299
1300    #[test]
1301    fn test_parse_frame_missing_crlf() {
1302        let input = Bytes::from(":42\nTAIL");
1303        assert!(parse_frame(input).is_err());
1304    }
1305
1306    #[test]
1307    fn test_parse_frame_unicode_simple_string() {
1308        let input = Bytes::from("+こんにちは\r\nEND");
1309        let (frame, rest) = parse_frame(input.clone()).unwrap();
1310        assert_eq!(frame, Frame::SimpleString(Bytes::from("こんにちは")));
1311        assert_eq!(rest, Bytes::from("END"));
1312    }
1313
1314    #[test]
1315    fn test_parse_frame_chained_frames() {
1316        let combined = Bytes::from("+OK\r\n:1\r\nfoo");
1317        let (f1, rem) = parse_frame(combined.clone()).unwrap();
1318        assert_eq!(f1, Frame::SimpleString(Bytes::from("OK")));
1319        let (f2, rem2) = parse_frame(rem).unwrap();
1320        assert_eq!(f2, Frame::Integer(1));
1321        assert_eq!(rem2, Bytes::from("foo"));
1322    }
1323
1324    #[test]
1325    fn test_parse_frame_empty_array() {
1326        let input = Bytes::from("*0\r\nTAIL");
1327        let (frame, rest) = parse_frame(input.clone()).unwrap();
1328        assert_eq!(frame, Frame::Array(Some(vec![])));
1329        assert_eq!(rest, Bytes::from("TAIL"));
1330    }
1331
1332    #[test]
1333    fn test_parse_frame_partial_array_data() {
1334        let input = Bytes::from("*2\r\n+OK\r\n");
1335        assert!(parse_frame(input).is_err());
1336    }
1337
1338    #[test]
1339    fn test_parse_frame_streamed_string() {
1340        let input = Bytes::from("$?\r\n$5\r\nhello\r\n$0\r\n\r\nREST");
1341        let (frame, rem) = parse_frame(input.clone()).unwrap();
1342        assert_eq!(frame, Frame::StreamedStringHeader);
1343        let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1344        assert_eq!(chunk, Frame::BulkString(Some(Bytes::from("hello"))));
1345        let (terminator, rest) = parse_frame(rem2.clone()).unwrap();
1346        assert_eq!(terminator, Frame::BulkString(Some(Bytes::from(""))));
1347        assert_eq!(rest, Bytes::from("REST"));
1348    }
1349
1350    #[test]
1351    fn test_parse_frame_streamed_blob_error() {
1352        let input = Bytes::from("!?\r\n!5\r\nERROR\r\nREST");
1353        let (frame, rem) = parse_frame(input.clone()).unwrap();
1354        assert_eq!(frame, Frame::StreamedBlobErrorHeader);
1355        let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1356        assert_eq!(chunk, Frame::BlobError(Bytes::from("ERROR")));
1357        assert_eq!(rem2, Bytes::from("REST"));
1358    }
1359
1360    #[test]
1361    fn test_parse_frame_streamed_verbatim_string() {
1362        let input = Bytes::from("=?\r\n=9\r\ntxt:hello\r\nTAIL");
1363        let (frame, rem) = parse_frame(input.clone()).unwrap();
1364        assert_eq!(frame, Frame::StreamedVerbatimStringHeader);
1365        let (chunk, rest) = parse_frame(rem.clone()).unwrap();
1366        assert_eq!(
1367            chunk,
1368            Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hello")) // Frame::VerbatimString {
1369                                                                            //     format: "txt".to_string(),
1370                                                                            //     content: "hello".to_string()
1371                                                                            // }
1372        );
1373        assert_eq!(rest, Bytes::from("TAIL"));
1374    }
1375
1376    #[test]
1377    fn test_parse_frame_streamed_array() {
1378        let input = Bytes::from("*?\r\n+one\r\n+two\r\n*0\r\nEND");
1379        let (header, rem) = parse_frame(input.clone()).unwrap();
1380        assert_eq!(header, Frame::StreamedArrayHeader);
1381        let (item1, rem2) = parse_frame(rem.clone()).unwrap();
1382        assert_eq!(item1, Frame::SimpleString(Bytes::from("one")));
1383        let (item2, rem3) = parse_frame(rem2.clone()).unwrap();
1384        assert_eq!(item2, Frame::SimpleString(Bytes::from("two")));
1385        let (terminator, rest) = parse_frame(rem3.clone()).unwrap();
1386        assert_eq!(terminator, Frame::Array(Some(vec![])));
1387        assert_eq!(rest, Bytes::from("END"));
1388    }
1389
1390    #[test]
1391    fn test_parse_frame_streamed_set_map_attr_push() {
1392        // Set
1393        let input_set = Bytes::from("~?\r\n+foo\r\n+bar\r\n~0\r\nTAIL");
1394        let (h_set, rem0) = parse_frame(input_set.clone()).unwrap();
1395        assert_eq!(h_set, Frame::StreamedSetHeader);
1396        let (s1, rem1) = parse_frame(rem0.clone()).unwrap();
1397        assert_eq!(s1, Frame::SimpleString(Bytes::from("foo")));
1398        let (s2, rem2) = parse_frame(rem1.clone()).unwrap();
1399        assert_eq!(s2, Frame::SimpleString(Bytes::from("bar")));
1400        let (term_set, rest_set) = parse_frame(rem2.clone()).unwrap();
1401        assert_eq!(term_set, Frame::Set(vec![]));
1402        assert_eq!(rest_set, Bytes::from("TAIL"));
1403        // Map
1404        let input_map = Bytes::from("%?\r\n+key\r\n+val\r\n%0\r\nNEXT");
1405        let (h_map, rem_map) = parse_frame(input_map.clone()).unwrap();
1406        assert_eq!(h_map, Frame::StreamedMapHeader);
1407        let (k, rem_map2) = parse_frame(rem_map.clone()).unwrap();
1408        assert_eq!(k, Frame::SimpleString(Bytes::from("key")));
1409        let (v, rem_map3) = parse_frame(rem_map2.clone()).unwrap();
1410        assert_eq!(v, Frame::SimpleString(Bytes::from("val")));
1411        let (term_map, rest_map4) = parse_frame(rem_map3.clone()).unwrap();
1412        assert_eq!(term_map, Frame::Map(vec![]));
1413        assert_eq!(rest_map4, Bytes::from("NEXT"));
1414        // Attribute
1415        let input_attr = Bytes::from("|?\r\n+meta\r\n+info\r\n|0\r\nMORE");
1416        let (h_attr, rem_attr) = parse_frame(input_attr.clone()).unwrap();
1417        assert_eq!(h_attr, Frame::StreamedAttributeHeader);
1418        let (a1, rem_attr2) = parse_frame(rem_attr.clone()).unwrap();
1419        assert_eq!(a1, Frame::SimpleString(Bytes::from("meta")));
1420        let (a2, rem_attr3) = parse_frame(rem_attr2.clone()).unwrap();
1421        assert_eq!(a2, Frame::SimpleString(Bytes::from("info")));
1422        let (term_attr, rest_attr) = parse_frame(rem_attr3.clone()).unwrap();
1423        assert_eq!(term_attr, Frame::Attribute(vec![]));
1424        assert_eq!(rest_attr, Bytes::from("MORE"));
1425        // Push
1426        let input_push = Bytes::from(">?\r\n:1\r\n:2\r\n>0\r\nEND");
1427        let (h_push, rem_push) = parse_frame(input_push.clone()).unwrap();
1428        assert_eq!(h_push, Frame::StreamedPushHeader);
1429        let (p1, rem_push2) = parse_frame(rem_push.clone()).unwrap();
1430        assert_eq!(p1, Frame::Integer(1));
1431        let (p2, rem_push3) = parse_frame(rem_push2.clone()).unwrap();
1432        assert_eq!(p2, Frame::Integer(2));
1433        let (term_push, rest_push) = parse_frame(rem_push3.clone()).unwrap();
1434        assert_eq!(term_push, Frame::Push(vec![]));
1435        assert_eq!(rest_push, Bytes::from("END"));
1436    }
1437
1438    #[test]
1439    fn test_parse_frame_stream_terminator() {
1440        let input = Bytes::from(".\r\nREST");
1441        let (frame, rest) = parse_frame(input.clone()).unwrap();
1442        assert_eq!(frame, Frame::StreamTerminator);
1443        assert_eq!(rest, Bytes::from("REST"));
1444    }
1445
1446    #[test]
1447    fn test_parse_frame_null_bulk_and_error() {
1448        let input1 = Bytes::from("!-1\r\nTAIL");
1449        let (f1, r1) = parse_frame(input1.clone()).unwrap();
1450        assert_eq!(f1, Frame::BlobError(Bytes::new()));
1451        assert_eq!(r1, Bytes::from("TAIL"));
1452        let input2 = Bytes::from("=-1\r\nTAIL");
1453        let (f2, r2) = parse_frame(input2.clone()).unwrap();
1454        assert_eq!(f2, Frame::VerbatimString(Bytes::new(), Bytes::new()));
1455        assert_eq!(r2, Bytes::from("TAIL"));
1456    }
1457
1458    #[test]
1459    fn test_parse_frame_special_float_nan() {
1460        let input = Bytes::from(",nan\r\nTAIL");
1461        let (frame, rest) = parse_frame(input.clone()).unwrap();
1462        assert_eq!(frame, Frame::SpecialFloat(Bytes::from("nan")));
1463        assert_eq!(rest, Bytes::from("TAIL"));
1464    }
1465
1466    #[test]
1467    fn test_parse_frame_big_number_zero() {
1468        let input = Bytes::from("(0\r\nEND");
1469        let (frame, rest) = parse_frame(input.clone()).unwrap();
1470        assert_eq!(frame, Frame::BigNumber(Bytes::from("0")));
1471        assert_eq!(rest, Bytes::from("END"));
1472    }
1473
1474    #[test]
1475    fn test_parse_frame_collection_empty() {
1476        let input_push = Bytes::from(">0\r\nTAIL");
1477        let (f_push, r_push) = parse_frame(input_push.clone()).unwrap();
1478        assert_eq!(f_push, Frame::Push(vec![]));
1479        assert_eq!(r_push, Bytes::from("TAIL"));
1480        let input_attr = Bytes::from("|0\r\nAFTER");
1481        let (f_attr, r_attr) = parse_frame(input_attr.clone()).unwrap();
1482        assert_eq!(f_attr, Frame::Attribute(vec![]));
1483        assert_eq!(r_attr, Bytes::from("AFTER"));
1484        let input_map = Bytes::from("%0\r\nEND");
1485        let (f_map, r_map) = parse_frame(input_map.clone()).unwrap();
1486        assert_eq!(f_map, Frame::Map(vec![]));
1487        assert_eq!(r_map, Bytes::from("END"));
1488        let input_set = Bytes::from("~0\r\nDONE");
1489        let (f_set, r_set) = parse_frame(input_set.clone()).unwrap();
1490        assert_eq!(f_set, Frame::Set(vec![]));
1491        assert_eq!(r_set, Bytes::from("DONE"));
1492        let input_arr = Bytes::from("*-1\r\nFIN");
1493        let (f_arr, r_arr) = parse_frame(input_arr.clone()).unwrap();
1494        assert_eq!(f_arr, Frame::Array(None));
1495        assert_eq!(r_arr, Bytes::from("FIN"));
1496    }
1497
1498    // Round-trip tests for serialization and parsing
1499
1500    #[test]
1501    fn test_roundtrip_simple_string() {
1502        let original = Bytes::from("+hello\r\n");
1503        let (frame, _) = parse_frame(original.clone()).unwrap();
1504        let serialized = frame_to_bytes(&frame);
1505        assert_eq!(original, serialized);
1506
1507        let (reparsed, _) = parse_frame(serialized).unwrap();
1508        assert_eq!(frame, reparsed);
1509    }
1510
1511    #[test]
1512    fn test_roundtrip_error() {
1513        let original = Bytes::from("-ERR error message\r\n");
1514        let (frame, _) = parse_frame(original.clone()).unwrap();
1515        let serialized = frame_to_bytes(&frame);
1516        assert_eq!(original, serialized);
1517
1518        let (reparsed, _) = parse_frame(serialized).unwrap();
1519        assert_eq!(frame, reparsed);
1520    }
1521
1522    #[test]
1523    fn test_roundtrip_integer() {
1524        let original = Bytes::from(":12345\r\n");
1525        let (frame, _) = parse_frame(original.clone()).unwrap();
1526        let serialized = frame_to_bytes(&frame);
1527        assert_eq!(original, serialized);
1528
1529        let (reparsed, _) = parse_frame(serialized).unwrap();
1530        assert_eq!(frame, reparsed);
1531    }
1532
1533    #[test]
1534    fn test_roundtrip_bulk_string() {
1535        let original = Bytes::from("$5\r\nhello\r\n");
1536        let (frame, _) = parse_frame(original.clone()).unwrap();
1537        let serialized = frame_to_bytes(&frame);
1538        assert_eq!(original, serialized);
1539
1540        let (reparsed, _) = parse_frame(serialized).unwrap();
1541        assert_eq!(frame, reparsed);
1542
1543        // Test null bulk string
1544        let original_null = Bytes::from("$-1\r\n");
1545        let (frame_null, _) = parse_frame(original_null.clone()).unwrap();
1546        let serialized_null = frame_to_bytes(&frame_null);
1547        assert_eq!(original_null, serialized_null);
1548
1549        let (reparsed_null, _) = parse_frame(serialized_null).unwrap();
1550        assert_eq!(frame_null, reparsed_null);
1551    }
1552
1553    #[test]
1554    fn test_roundtrip_blob_error() {
1555        let original = Bytes::from("!5\r\nerror\r\n");
1556        let (frame, _) = parse_frame(original.clone()).unwrap();
1557        let serialized = frame_to_bytes(&frame);
1558        assert_eq!(original, serialized);
1559
1560        let (reparsed, _) = parse_frame(serialized).unwrap();
1561        assert_eq!(frame, reparsed);
1562    }
1563
1564    #[test]
1565    fn test_roundtrip_null() {
1566        let original = Bytes::from("_\r\n");
1567        let (frame, _) = parse_frame(original.clone()).unwrap();
1568        let serialized = frame_to_bytes(&frame);
1569        assert_eq!(original, serialized);
1570
1571        let (reparsed, _) = parse_frame(serialized).unwrap();
1572        assert_eq!(frame, reparsed);
1573    }
1574
1575    #[test]
1576    fn test_roundtrip_double() {
1577        let original = Bytes::from(",3.14159\r\n");
1578        let (frame, _) = parse_frame(original.clone()).unwrap();
1579        let serialized = frame_to_bytes(&frame);
1580
1581        // Note: The exact string representation of floating point numbers might differ
1582        // between parsing and serializing due to formatting differences
1583        let (reparsed, _) = parse_frame(serialized).unwrap();
1584        assert_eq!(frame, reparsed);
1585    }
1586
1587    #[test]
1588    fn test_roundtrip_special_float() {
1589        let original = Bytes::from(",inf\r\n");
1590        let (frame, _) = parse_frame(original.clone()).unwrap();
1591        let serialized = frame_to_bytes(&frame);
1592        assert_eq!(original, serialized);
1593
1594        let (reparsed, _) = parse_frame(serialized).unwrap();
1595        assert_eq!(frame, reparsed);
1596    }
1597
1598    #[test]
1599    fn test_roundtrip_boolean() {
1600        let original_true = Bytes::from("#t\r\n");
1601        let (frame_true, _) = parse_frame(original_true.clone()).unwrap();
1602        let serialized_true = frame_to_bytes(&frame_true);
1603        assert_eq!(original_true, serialized_true);
1604
1605        let (reparsed_true, _) = parse_frame(serialized_true).unwrap();
1606        assert_eq!(frame_true, reparsed_true);
1607
1608        let original_false = Bytes::from("#f\r\n");
1609        let (frame_false, _) = parse_frame(original_false.clone()).unwrap();
1610        let serialized_false = frame_to_bytes(&frame_false);
1611        assert_eq!(original_false, serialized_false);
1612
1613        let (reparsed_false, _) = parse_frame(serialized_false).unwrap();
1614        assert_eq!(frame_false, reparsed_false);
1615    }
1616
1617    #[test]
1618    fn test_roundtrip_big_number() {
1619        let original = Bytes::from("(12345678901234567890\r\n");
1620        let (frame, _) = parse_frame(original.clone()).unwrap();
1621        let serialized = frame_to_bytes(&frame);
1622        assert_eq!(original, serialized);
1623
1624        let (reparsed, _) = parse_frame(serialized).unwrap();
1625        assert_eq!(frame, reparsed);
1626    }
1627
1628    #[test]
1629    fn test_roundtrip_verbatim_string() {
1630        let original = Bytes::from("=10\r\ntxt:hello!\r\n");
1631        let (frame, _) = parse_frame(original.clone()).unwrap();
1632        let serialized = frame_to_bytes(&frame);
1633        assert_eq!(original, serialized);
1634
1635        let (reparsed, _) = parse_frame(serialized).unwrap();
1636        assert_eq!(frame, reparsed);
1637    }
1638
1639    #[test]
1640    fn test_roundtrip_array() {
1641        let original = Bytes::from("*2\r\n+hello\r\n:123\r\n");
1642        let (frame, _) = parse_frame(original.clone()).unwrap();
1643        let serialized = frame_to_bytes(&frame);
1644        assert_eq!(original, serialized);
1645
1646        let (reparsed, _) = parse_frame(serialized).unwrap();
1647        assert_eq!(frame, reparsed);
1648
1649        // Test null array
1650        let original_null = Bytes::from("*-1\r\n");
1651        let (frame_null, _) = parse_frame(original_null.clone()).unwrap();
1652        let serialized_null = frame_to_bytes(&frame_null);
1653        assert_eq!(original_null, serialized_null);
1654
1655        let (reparsed_null, _) = parse_frame(serialized_null).unwrap();
1656        assert_eq!(frame_null, reparsed_null);
1657    }
1658
1659    #[test]
1660    fn test_roundtrip_set() {
1661        let original = Bytes::from("~2\r\n+one\r\n+two\r\n");
1662        let (frame, _) = parse_frame(original.clone()).unwrap();
1663        let serialized = frame_to_bytes(&frame);
1664        assert_eq!(original, serialized);
1665
1666        let (reparsed, _) = parse_frame(serialized).unwrap();
1667        assert_eq!(frame, reparsed);
1668    }
1669
1670    #[test]
1671    fn test_roundtrip_map() {
1672        let original = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\n");
1673        let (frame, _) = parse_frame(original.clone()).unwrap();
1674        let serialized = frame_to_bytes(&frame);
1675        assert_eq!(original, serialized);
1676
1677        let (reparsed, _) = parse_frame(serialized).unwrap();
1678        assert_eq!(frame, reparsed);
1679    }
1680
1681    #[test]
1682    fn test_roundtrip_attribute() {
1683        let original = Bytes::from("|1\r\n+key\r\n+val\r\n");
1684        let (frame, _) = parse_frame(original.clone()).unwrap();
1685        let serialized = frame_to_bytes(&frame);
1686        assert_eq!(original, serialized);
1687
1688        let (reparsed, _) = parse_frame(serialized).unwrap();
1689        assert_eq!(frame, reparsed);
1690    }
1691
1692    #[test]
1693    fn test_roundtrip_push() {
1694        let original = Bytes::from(">2\r\n+msg\r\n+data\r\n");
1695        let (frame, _) = parse_frame(original.clone()).unwrap();
1696        let serialized = frame_to_bytes(&frame);
1697        assert_eq!(original, serialized);
1698
1699        let (reparsed, _) = parse_frame(serialized).unwrap();
1700        assert_eq!(frame, reparsed);
1701    }
1702
1703    #[test]
1704    fn test_roundtrip_streaming_headers() {
1705        let headers = [
1706            ("$?\r\n", Frame::StreamedStringHeader),
1707            ("!?\r\n", Frame::StreamedBlobErrorHeader),
1708            ("=?\r\n", Frame::StreamedVerbatimStringHeader),
1709            ("*?\r\n", Frame::StreamedArrayHeader),
1710            ("~?\r\n", Frame::StreamedSetHeader),
1711            ("%?\r\n", Frame::StreamedMapHeader),
1712            ("|?\r\n", Frame::StreamedAttributeHeader),
1713            (">?\r\n", Frame::StreamedPushHeader),
1714            (".\r\n", Frame::StreamTerminator),
1715        ];
1716
1717        for (original_str, expected_frame) in headers {
1718            let original = Bytes::from(original_str);
1719            let (frame, _) = parse_frame(original.clone()).unwrap();
1720            assert_eq!(frame, expected_frame);
1721
1722            let serialized = frame_to_bytes(&frame);
1723            assert_eq!(original, serialized);
1724
1725            let (reparsed, _) = parse_frame(serialized).unwrap();
1726            assert_eq!(frame, reparsed);
1727        }
1728    }
1729
1730    #[test]
1731    fn test_roundtrip_streaming_chunks() {
1732        let chunks = [
1733            (
1734                ";4\r\nHell\r\n",
1735                Frame::StreamedStringChunk(Bytes::from("Hell")),
1736            ),
1737            (
1738                ";5\r\no wor\r\n",
1739                Frame::StreamedStringChunk(Bytes::from("o wor")),
1740            ),
1741            (";1\r\nd\r\n", Frame::StreamedStringChunk(Bytes::from("d"))),
1742            (";0\r\n\r\n", Frame::StreamedStringChunk(Bytes::new())),
1743            (
1744                ";11\r\nHello World\r\n",
1745                Frame::StreamedStringChunk(Bytes::from("Hello World")),
1746            ),
1747        ];
1748
1749        for (original_str, expected_frame) in chunks {
1750            let original = Bytes::from(original_str);
1751            let (frame, rest) = parse_frame(original.clone()).unwrap();
1752            assert_eq!(frame, expected_frame);
1753            assert!(rest.is_empty());
1754
1755            let serialized = frame_to_bytes(&frame);
1756            assert_eq!(original, serialized);
1757
1758            let (reparsed, _) = parse_frame(serialized).unwrap();
1759            assert_eq!(frame, reparsed);
1760        }
1761    }
1762
1763    #[test]
1764    fn test_streaming_chunks_edge_cases() {
1765        // Test incomplete chunk (missing data)
1766        let data = Bytes::from(";4\r\nHel");
1767        let result = parse_frame(data);
1768        assert!(matches!(result, Err(ParseError::Incomplete)));
1769
1770        // Test incomplete chunk (missing CRLF)
1771        let data = Bytes::from(";4\r\nHell");
1772        let result = parse_frame(data);
1773        assert!(matches!(result, Err(ParseError::Incomplete)));
1774
1775        // Test invalid length format
1776        let data = Bytes::from(";abc\r\ndata\r\n");
1777        let result = parse_frame(data);
1778        assert!(matches!(result, Err(ParseError::BadLength)));
1779
1780        // Test negative length
1781        let data = Bytes::from(";-1\r\ndata\r\n");
1782        let result = parse_frame(data);
1783        assert!(matches!(result, Err(ParseError::BadLength)));
1784
1785        // Test length mismatch (length says 5 but only 4 bytes)
1786        let data = Bytes::from(";5\r\nHell\r\n");
1787        let result = parse_frame(data);
1788        assert!(matches!(result, Err(ParseError::Incomplete)));
1789
1790        // Test zero-length chunk without trailing CRLF returns Incomplete
1791        let data = Bytes::from(";0\r\n");
1792        let result = parse_frame(data);
1793        assert!(matches!(result, Err(ParseError::Incomplete)));
1794
1795        // Test binary data in chunk
1796        let binary_data = b"\x00\x01\x02\x03\xFF";
1797        let mut chunk_data = Vec::new();
1798        chunk_data.extend_from_slice(b";5\r\n");
1799        chunk_data.extend_from_slice(binary_data);
1800        chunk_data.extend_from_slice(b"\r\n");
1801        let data = Bytes::from(chunk_data);
1802        let result = parse_frame(data);
1803        assert!(result.is_ok());
1804        let (frame, _) = result.unwrap();
1805        if let Frame::StreamedStringChunk(chunk) = frame {
1806            assert_eq!(chunk.as_ref(), binary_data);
1807        }
1808    }
1809
1810    #[test]
1811    fn test_roundtrip_streaming_sequences() {
1812        // Test streaming string roundtrip
1813        let streaming_string = Frame::StreamedString(vec![
1814            Bytes::from("Hell"),
1815            Bytes::from("o wor"),
1816            Bytes::from("ld"),
1817        ]);
1818        let serialized = frame_to_bytes(&streaming_string);
1819        let expected = "$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;2\r\nld\r\n;0\r\n\r\n";
1820        assert_eq!(serialized, Bytes::from(expected));
1821
1822        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1823        assert_eq!(parsed, streaming_string);
1824
1825        // Test streaming array roundtrip
1826        let streaming_array = Frame::StreamedArray(vec![
1827            Frame::SimpleString(Bytes::from("hello")),
1828            Frame::Integer(42),
1829            Frame::Boolean(true),
1830        ]);
1831        let serialized = frame_to_bytes(&streaming_array);
1832        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1833        assert_eq!(parsed, streaming_array);
1834
1835        // Test streaming map roundtrip
1836        let streaming_map = Frame::StreamedMap(vec![
1837            (
1838                Frame::SimpleString(Bytes::from("key1")),
1839                Frame::SimpleString(Bytes::from("val1")),
1840            ),
1841            (
1842                Frame::SimpleString(Bytes::from("key2")),
1843                Frame::Integer(123),
1844            ),
1845        ]);
1846        let serialized = frame_to_bytes(&streaming_map);
1847        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1848        assert_eq!(parsed, streaming_map);
1849
1850        // Test empty streaming string
1851        let empty_streaming = Frame::StreamedString(vec![]);
1852        let serialized = frame_to_bytes(&empty_streaming);
1853        let expected = "$?\r\n;0\r\n\r\n";
1854        assert_eq!(serialized, Bytes::from(expected));
1855        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1856        assert_eq!(parsed, empty_streaming);
1857
1858        // Test streaming set roundtrip
1859        let streaming_set = Frame::StreamedSet(vec![
1860            Frame::SimpleString(Bytes::from("apple")),
1861            Frame::SimpleString(Bytes::from("banana")),
1862            Frame::Integer(42),
1863        ]);
1864        let serialized = frame_to_bytes(&streaming_set);
1865        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1866        assert_eq!(parsed, streaming_set);
1867
1868        // Test streaming attribute roundtrip
1869        let streaming_attribute = Frame::StreamedAttribute(vec![
1870            (
1871                Frame::SimpleString(Bytes::from("trace-id")),
1872                Frame::SimpleString(Bytes::from("abc123")),
1873            ),
1874            (
1875                Frame::SimpleString(Bytes::from("span-id")),
1876                Frame::SimpleString(Bytes::from("def456")),
1877            ),
1878        ]);
1879        let serialized = frame_to_bytes(&streaming_attribute);
1880        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1881        assert_eq!(parsed, streaming_attribute);
1882
1883        // Test streaming push roundtrip
1884        let streaming_push = Frame::StreamedPush(vec![
1885            Frame::SimpleString(Bytes::from("pubsub")),
1886            Frame::SimpleString(Bytes::from("channel1")),
1887            Frame::SimpleString(Bytes::from("message data")),
1888        ]);
1889        let serialized = frame_to_bytes(&streaming_push);
1890        let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1891        assert_eq!(parsed, streaming_push);
1892
1893        // Test empty streaming containers
1894        let empty_array = Frame::StreamedArray(vec![]);
1895        let serialized = frame_to_bytes(&empty_array);
1896        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1897        assert_eq!(parsed, empty_array);
1898
1899        let empty_set = Frame::StreamedSet(vec![]);
1900        let serialized = frame_to_bytes(&empty_set);
1901        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1902        assert_eq!(parsed, empty_set);
1903    }
1904
1905    #[test]
1906    fn test_streaming_sequences_edge_cases() {
1907        // Test incomplete streaming string (missing zero-length terminator)
1908        let data = Bytes::from("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n");
1909        let result = parse_streaming_sequence(data);
1910        assert!(matches!(result, Err(ParseError::Incomplete)));
1911
1912        // Test malformed chunk in streaming sequence
1913        let data = Bytes::from("$?\r\n;abc\r\nHell\r\n;0\r\n");
1914        let result = parse_streaming_sequence(data);
1915        assert!(matches!(result, Err(ParseError::BadLength)));
1916
1917        // Test streaming array with incomplete terminator
1918        let data = Bytes::from("*?\r\n+hello\r\n:42\r\n");
1919        let result = parse_streaming_sequence(data);
1920        assert!(matches!(result, Err(ParseError::Incomplete)));
1921
1922        // Test mixed streaming and non-streaming content
1923        let data = Bytes::from("*?\r\n+hello\r\n*2\r\n:1\r\n:2\r\n.\r\n");
1924        let result = parse_streaming_sequence(data);
1925        assert!(result.is_ok());
1926        let (frame, _) = result.unwrap();
1927        if let Frame::StreamedArray(items) = frame {
1928            assert_eq!(items.len(), 2);
1929            assert!(matches!(items[0], Frame::SimpleString(_)));
1930            assert!(matches!(items[1], Frame::Array(_)));
1931        }
1932
1933        // Test empty streaming containers
1934        let data = Bytes::from("*?\r\n.\r\n");
1935        let result = parse_streaming_sequence(data);
1936        assert!(result.is_ok());
1937        let (frame, _) = result.unwrap();
1938        if let Frame::StreamedArray(items) = frame {
1939            assert!(items.is_empty());
1940        }
1941
1942        // Test streaming map with odd number of elements (should work)
1943        let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+orphan\r\n.\r\n");
1944        let result = parse_streaming_sequence(data);
1945        assert!(matches!(result, Err(ParseError::Incomplete)));
1946
1947        // Test non-streaming frame passed to parse_streaming_sequence
1948        let data = Bytes::from("+simple\r\n");
1949        let result = parse_streaming_sequence(data);
1950        assert!(result.is_ok());
1951        let (frame, _) = result.unwrap();
1952        assert!(matches!(frame, Frame::SimpleString(_)));
1953
1954        // Test extremely large chunk size (should fail gracefully)
1955        let data = Bytes::from(";999999999999999999\r\ndata\r\n");
1956        let result = parse_frame(data);
1957        // The parsing might succeed but fail later when trying to read the data
1958        // Let's check what actually happens and accept either BadLength or Incomplete
1959        match &result {
1960            Err(ParseError::BadLength) => {}  // Expected
1961            Err(ParseError::Incomplete) => {} // Also acceptable - might not have enough data for huge chunk
1962            Err(e) => panic!("Got unexpected error type: {e:?}"),
1963            Ok(_) => panic!("Large chunk size should fail"),
1964        }
1965
1966        // Test streaming string with non-chunk frame mixed in
1967        let data = Bytes::from("$?\r\n+invalid\r\n;0\r\n");
1968        let result = parse_streaming_sequence(data);
1969        assert!(matches!(result, Err(ParseError::InvalidFormat)));
1970
1971        // Test streaming sequence with corrupted terminator
1972        let data = Bytes::from("*?\r\n+hello\r\n.corrupted\r\n");
1973        let result = parse_streaming_sequence(data);
1974        assert!(matches!(result, Err(ParseError::Incomplete)));
1975
1976        // Test empty input to parse_streaming_sequence
1977        let data = Bytes::new();
1978        let result = parse_streaming_sequence(data);
1979        assert!(matches!(result, Err(ParseError::Incomplete)));
1980
1981        // Test streaming sequence with partial frame at end
1982        let data = Bytes::from("*?\r\n+hello\r\n$5\r\nwo");
1983        let result = parse_streaming_sequence(data);
1984        assert!(matches!(result, Err(ParseError::Incomplete)));
1985    }
1986
1987    #[test]
1988    fn test_roundtrip_nested_structures() {
1989        // Test a complex nested structure
1990        let original = Bytes::from(
1991            "*3\r\n+hello\r\n%2\r\n+key1\r\n:123\r\n+key2\r\n~1\r\n+item\r\n|1\r\n+meta\r\n+data\r\n",
1992        );
1993        let (frame, _) = parse_frame(original.clone()).unwrap();
1994        let serialized = frame_to_bytes(&frame);
1995
1996        let (reparsed, _) = parse_frame(serialized).unwrap();
1997        assert_eq!(frame, reparsed);
1998    }
1999
2000    #[test]
2001    fn test_zero_length_bulk_string_requires_trailing_crlf() {
2002        // Complete: $0\r\n\r\n
2003        let input = Bytes::from("$0\r\n\r\nTAIL");
2004        let (frame, rest) = parse_frame(input).unwrap();
2005        assert_eq!(frame, Frame::BulkString(Some(Bytes::new())));
2006        assert_eq!(rest, Bytes::from("TAIL"));
2007
2008        // Incomplete: $0\r\n with no trailing data
2009        let input = Bytes::from("$0\r\n");
2010        assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2011
2012        // Incomplete: $0\r\n with only one byte
2013        let input = Bytes::from("$0\r\n\r");
2014        assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2015
2016        // Invalid: $0\r\n followed by non-CRLF
2017        let input = Bytes::from("$0\r\nXY");
2018        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
2019    }
2020
2021    #[test]
2022    fn test_zero_length_streamed_chunk_requires_trailing_crlf() {
2023        // Complete: ;0\r\n\r\n
2024        let input = Bytes::from(";0\r\n\r\nTAIL");
2025        let (frame, rest) = parse_frame(input).unwrap();
2026        assert_eq!(frame, Frame::StreamedStringChunk(Bytes::new()));
2027        assert_eq!(rest, Bytes::from("TAIL"));
2028
2029        // Incomplete: ;0\r\n with no trailing data
2030        let input = Bytes::from(";0\r\n");
2031        assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2032
2033        // Invalid: ;0\r\n followed by non-CRLF
2034        let input = Bytes::from(";0\r\nXY");
2035        assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
2036    }
2037
2038    #[test]
2039    fn test_integer_overflow_returns_overflow_error() {
2040        // One past i64::MAX
2041        let input = Bytes::from(":9223372036854775808\r\n");
2042        assert_eq!(parse_frame(input), Err(ParseError::Overflow));
2043
2044        // i64::MAX should succeed
2045        let input = Bytes::from(":9223372036854775807\r\n");
2046        let (frame, _) = parse_frame(input).unwrap();
2047        assert_eq!(frame, Frame::Integer(i64::MAX));
2048
2049        // i64::MIN should succeed
2050        let input = Bytes::from(":-9223372036854775808\r\n");
2051        let (frame, _) = parse_frame(input).unwrap();
2052        assert_eq!(frame, Frame::Integer(i64::MIN));
2053    }
2054
2055    #[test]
2056    fn test_parser_propagates_errors() {
2057        let mut parser = Parser::new();
2058        parser.feed(Bytes::from("XINVALID\r\n"));
2059        let result = parser.next_frame();
2060        assert!(result.is_err());
2061        assert_eq!(result.unwrap_err(), ParseError::InvalidTag(b'X'));
2062    }
2063
2064    #[test]
2065    fn test_parser_returns_ok_none_for_incomplete() {
2066        let mut parser = Parser::new();
2067        parser.feed(Bytes::from("+HELL"));
2068        assert_eq!(parser.next_frame().unwrap(), None);
2069    }
2070}