Function parse_streaming_sequence 

pub fn parse_streaming_sequence(
    input: Bytes,
) -> Result<(Frame, Bytes), ParseError>

Parse a complete RESP3 streaming sequence, accumulating chunks until termination.

This function handles RESP3 streaming sequences that begin with streaming headers ($?, *?, ~?, %?, |?, >?) and accumulates the following data until the appropriate terminator is encountered.

§Streaming Types Supported

  • Streaming Strings: $?\r\n followed by chunks terminated with ;0\r\n
  • Streaming Arrays: *?\r\n followed by frames terminated with .\r\n
  • Streaming Sets: ~?\r\n followed by frames terminated with .\r\n
  • Streaming Maps: %?\r\n followed by key-value pairs terminated with .\r\n
  • Streaming Attributes: |?\r\n followed by key-value pairs terminated with .\r\n
  • Streaming Push: >?\r\n followed by frames terminated with .\r\n
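The header-to-terminator pairing in the list above can be summarized in a small lookup. This is only an illustrative sketch using the standard library: `StreamKind` and `classify_header` are hypothetical names that are not part of resp_rs, and the crate's internal dispatch may look quite different.

```rust
/// Kinds of streaming sequence from the list above (hypothetical names).
#[derive(Debug, PartialEq, Clone, Copy)]
enum StreamKind {
    String,
    Array,
    Set,
    Map,
    Attribute,
    Push,
}

/// Terminator for streamed strings: a zero-length chunk.
const CHUNK_END: &[u8] = b";0\r\n";
/// Terminator for every other streamed type.
const AGGREGATE_END: &[u8] = b".\r\n";

/// Returns the stream kind and its terminator if `input` starts with a
/// streaming header, or `None` otherwise.
fn classify_header(input: &[u8]) -> Option<(StreamKind, &'static [u8])> {
    // A streaming header is exactly four bytes: type byte, '?', CR, LF.
    let header = input.get(..4)?;
    if &header[1..] != b"?\r\n" {
        return None;
    }
    match header[0] {
        b'$' => Some((StreamKind::String, CHUNK_END)),
        b'*' => Some((StreamKind::Array, AGGREGATE_END)),
        b'~' => Some((StreamKind::Set, AGGREGATE_END)),
        b'%' => Some((StreamKind::Map, AGGREGATE_END)),
        b'|' => Some((StreamKind::Attribute, AGGREGATE_END)),
        b'>' => Some((StreamKind::Push, AGGREGATE_END)),
        _ => None,
    }
}
```

A fixed-length header such as `*3\r\n` falls through to `None` here, which mirrors the idea that only the `?` forms start a streaming sequence.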

§Examples

§Streaming String

use resp_rs::resp3::{parse_streaming_sequence, Frame};
use bytes::Bytes;

let data = Bytes::from("$?\r\n;4\r\nHell\r\n;6\r\no worl\r\n;1\r\nd\r\n;0\r\n\r\n");
let (frame, rest) = parse_streaming_sequence(data).unwrap();

if let Frame::StreamedString(chunks) = frame {
    assert_eq!(chunks.len(), 3);
    // Concatenate the chunks in arrival order; each chunk is valid UTF-8 here.
    let full_string: String = chunks.iter()
        .map(|chunk| core::str::from_utf8(chunk).unwrap())
        .collect();
    assert_eq!(full_string, "Hello world");
}
assert!(rest.is_empty());

§Streaming Array

use resp_rs::resp3::{parse_streaming_sequence, Frame};
use bytes::Bytes;

let data = Bytes::from("*?\r\n+hello\r\n:42\r\n#t\r\n.\r\n");
let (frame, _) = parse_streaming_sequence(data).unwrap();

if let Frame::StreamedArray(items) = frame {
    assert_eq!(items.len(), 3);
    // items[0] = SimpleString("hello")
    // items[1] = Integer(42)
    // items[2] = Boolean(true)
}

§Streaming Map

use resp_rs::resp3::{parse_streaming_sequence, Frame};
use bytes::Bytes;

let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+key2\r\n:123\r\n.\r\n");
let (frame, _) = parse_streaming_sequence(data).unwrap();

if let Frame::StreamedMap(pairs) = frame {
    assert_eq!(pairs.len(), 2);
    // pairs[0] = (SimpleString("key1"), SimpleString("val1"))
    // pairs[1] = (SimpleString("key2"), Integer(123))
}

§Errors

Returns ParseError::Incomplete if the input ends before the sequence is complete, for example when the required terminator has not arrived yet. Returns ParseError::InvalidFormat for malformed chunk data or for unexpected frame types inside a streaming sequence.
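To make the difference between the two error cases concrete, here is a rough, standalone sketch of accumulating streamed string chunks. It is not the crate's implementation: `SketchError`, `find_crlf`, and `collect_chunks` are hypothetical names, the sketch works on plain byte slices rather than `Bytes`, and it assumes the `;0\r\n` terminator listed under Streaming Types Supported. How the real parser treats bytes after the terminator may differ.

```rust
/// Hypothetical stand-in for the two error cases described above.
#[derive(Debug, PartialEq)]
enum SketchError {
    Incomplete,
    InvalidFormat,
}

/// Index of the first "\r\n" in `buf`, if any.
fn find_crlf(buf: &[u8]) -> Option<usize> {
    buf.windows(2).position(|w| w == b"\r\n")
}

/// Collects the chunks of a `$?` streamed string as borrowed sub-slices and
/// returns them together with the unconsumed remainder.
fn collect_chunks(input: &[u8]) -> Result<(Vec<&[u8]>, &[u8]), SketchError> {
    // Assumption: fewer than four bytes is treated as "need more data".
    if input.len() < 4 {
        return Err(SketchError::Incomplete);
    }
    if &input[..4] != b"$?\r\n" {
        return Err(SketchError::InvalidFormat);
    }
    let mut rest = &input[4..];
    let mut chunks = Vec::new();
    loop {
        // Every chunk starts with ";<len>\r\n".
        match rest.first() {
            None => return Err(SketchError::Incomplete),
            Some(&b';') => {}
            Some(_) => return Err(SketchError::InvalidFormat),
        }
        let line_end = find_crlf(rest).ok_or(SketchError::Incomplete)?;
        let len = std::str::from_utf8(&rest[1..line_end])
            .ok()
            .and_then(|s| s.parse::<usize>().ok())
            .ok_or(SketchError::InvalidFormat)?;
        rest = &rest[line_end + 2..];
        if len == 0 {
            // ";0\r\n" ends the sequence.
            return Ok((chunks, rest));
        }
        // The payload must be followed by its own "\r\n".
        let need = len.checked_add(2).ok_or(SketchError::InvalidFormat)?;
        if rest.len() < need {
            return Err(SketchError::Incomplete);
        }
        if &rest[len..need] != b"\r\n" {
            return Err(SketchError::InvalidFormat);
        }
        chunks.push(&rest[..len]);
        rest = &rest[need..];
    }
}
```

In this sketch, truncated input (a missing payload or a missing terminator) yields `Incomplete`, so a caller could buffer more bytes and retry, while a non-numeric length or a payload not followed by `\r\n` yields `InvalidFormat`.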

§Notes

  • For non-streaming frames, this function simply returns the parsed frame
  • Streaming string chunks are accumulated in order
  • All other streaming types accumulate complete frames until termination
  • Zero-copy parsing is used where possible to minimize allocations
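The note about accumulating complete frames until termination can be pictured with a deliberately tiny model. The sketch below is an assumption-laden illustration, not the crate's code: `MiniFrame` and `collect_until_end` are hypothetical, only single-line frames (`+`, `:`, `#`) are understood, the input is the body that follows a header such as `*?\r\n`, and both error cases are collapsed into `None` for brevity.

```rust
/// Hypothetical, heavily reduced frame type for illustration only.
#[derive(Debug, PartialEq)]
enum MiniFrame {
    Simple(String),
    Integer(i64),
    Boolean(bool),
}

/// Reads single-line frames until the ".\r\n" terminator and returns them
/// with the unconsumed remainder. Returns `None` if the terminator is
/// missing or a line is not understood.
fn collect_until_end(mut rest: &[u8]) -> Option<(Vec<MiniFrame>, &[u8])> {
    let mut items = Vec::new();
    loop {
        // Take one CRLF-terminated line off the front of the input.
        let end = rest.windows(2).position(|w| w == b"\r\n")?;
        let line = &rest[..end];
        rest = &rest[end + 2..];
        if line == b"." {
            // Terminator reached: everything collected so far is the result.
            return Some((items, rest));
        }
        let (tag, body) = line.split_first()?;
        let text = std::str::from_utf8(body).ok()?;
        let frame = match *tag {
            b'+' => MiniFrame::Simple(text.to_owned()),
            b':' => MiniFrame::Integer(text.parse().ok()?),
            b'#' => MiniFrame::Boolean(match text {
                "t" => true,
                "f" => false,
                _ => return None,
            }),
            _ => return None,
        };
        items.push(frame);
    }
}
```

A streamed map could be modeled the same way by pairing up consecutive frames after collection. A real parser would also need to handle nested and length-prefixed frames, which this sketch leaves out.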