1use bytes::{BufMut, Bytes, BytesMut};
29
/// Largest element (or pair) count accepted in an aggregate header.
///
/// `parse_count` rejects anything above this with `ParseError::BadLength`.
const MAX_COLLECTION_SIZE: usize = 10_000_000;

/// Largest declared payload length, in bytes, accepted for length-prefixed
/// frames (512 MiB).
///
/// Bulk strings, blob errors, verbatim strings and streamed chunks reject a
/// longer declared length with `ParseError::BadLength`.
const MAX_BULK_STRING_SIZE: usize = 512 << 20;
35
36#[derive(Default, Debug)]
41pub struct Parser {
42 buffer: BytesMut,
43}
44
45impl Parser {
46 pub fn new() -> Self {
48 Self {
49 buffer: BytesMut::new(),
50 }
51 }
52
53 pub fn feed(&mut self, data: Bytes) {
57 self.buffer.extend_from_slice(&data);
58 }
59
60 pub fn next_frame(&mut self) -> Result<Option<Frame>, ParseError> {
66 if self.buffer.is_empty() {
67 return Ok(None);
68 }
69
70 let bytes = self.buffer.split().freeze();
71
72 match parse_frame_inner(&bytes, 0) {
73 Ok((frame, consumed)) => {
74 if consumed < bytes.len() {
75 self.buffer.unsplit(BytesMut::from(&bytes[consumed..]));
76 }
77 Ok(Some(frame))
78 }
79 Err(ParseError::Incomplete) => {
80 self.buffer.unsplit(bytes.into());
81 Ok(None)
82 }
83 Err(e) => {
84 Err(e)
87 }
88 }
89 }
90
91 pub fn buffered_bytes(&self) -> usize {
93 self.buffer.len()
94 }
95
96 pub fn clear(&mut self) {
98 self.buffer.clear();
99 }
100}
101
102#[derive(Debug, Clone, PartialEq)]
108pub enum Frame {
109 SimpleString(Bytes),
111 Error(Bytes),
113 Integer(i64),
115 BulkString(Option<Bytes>),
118 BlobError(Bytes),
120 StreamedStringHeader,
122 StreamedBlobErrorHeader,
124 StreamedVerbatimStringHeader,
126 StreamedArrayHeader,
128 StreamedSetHeader,
130 StreamedMapHeader,
132 StreamedAttributeHeader,
134 StreamedPushHeader,
136 StreamedStringChunk(Bytes),
152
153 StreamedString(Vec<Bytes>),
171
172 StreamedArray(Vec<Frame>),
190
191 StreamedSet(Vec<Frame>),
196
197 StreamedMap(Vec<(Frame, Frame)>),
213
214 StreamedAttribute(Vec<(Frame, Frame)>),
219
220 StreamedPush(Vec<Frame>),
238 StreamTerminator,
240 Null,
242 Double(f64),
244 SpecialFloat(Bytes),
246 Boolean(bool),
248 BigNumber(Bytes),
250 VerbatimString(Bytes, Bytes),
253 Array(Option<Vec<Frame>>),
255 Set(Vec<Frame>),
257 Map(Vec<(Frame, Frame)>),
259 Attribute(Vec<(Frame, Frame)>),
261 Push(Vec<Frame>),
263}
264
265pub use crate::ParseError;
266
267pub fn parse_frame(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
271 let (frame, consumed) = parse_frame_inner(&input, 0)?;
272 Ok((frame, input.slice(consumed..)))
273}
274
275#[inline(never)]
278fn parse_blob_bounds(
279 buf: &[u8],
280 after_crlf: usize,
281 len: usize,
282) -> Result<(usize, usize), ParseError> {
283 if len == 0 {
284 if after_crlf + 1 >= buf.len() {
285 return Err(ParseError::Incomplete);
286 }
287 if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
288 return Ok((after_crlf, after_crlf));
289 } else {
290 return Err(ParseError::InvalidFormat);
291 }
292 }
293 let data_start = after_crlf;
294 let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
295 if data_end + 1 >= buf.len() {
296 return Err(ParseError::Incomplete);
297 }
298 if buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
299 return Err(ParseError::InvalidFormat);
300 }
301 Ok((data_start, data_end))
302}
303
304fn parse_bulk_string(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
306 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
307 let len_bytes = &buf[pos + 1..line_end];
308 if len_bytes == b"?" {
309 return Ok((Frame::StreamedStringHeader, after_crlf));
310 }
311 if len_bytes == b"-1" {
312 return Ok((Frame::BulkString(None), after_crlf));
313 }
314 let len = parse_usize(len_bytes)?;
315 if len > MAX_BULK_STRING_SIZE {
316 return Err(ParseError::BadLength);
317 }
318 let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
319 if data_start == data_end {
320 Ok((Frame::BulkString(Some(Bytes::new())), after_crlf + 2))
321 } else {
322 Ok((
323 Frame::BulkString(Some(input.slice(data_start..data_end))),
324 data_end + 2,
325 ))
326 }
327}
328
329#[inline(never)]
331fn parse_double_frame(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
332 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
333 let line_bytes = &buf[pos + 1..line_end];
334 if line_bytes == b"inf" || line_bytes == b"-inf" || line_bytes == b"nan" {
335 return Ok((
336 Frame::SpecialFloat(input.slice(pos + 1..line_end)),
337 after_crlf,
338 ));
339 }
340 let s = std::str::from_utf8(line_bytes).map_err(|_| ParseError::Utf8Error)?;
341 let v = s.parse::<f64>().map_err(|_| ParseError::InvalidFormat)?;
342 if v.is_infinite() || v.is_nan() {
343 let canonical = if v.is_nan() {
344 "nan"
345 } else if v.is_sign_negative() {
346 "-inf"
347 } else {
348 "inf"
349 };
350 return Ok((Frame::SpecialFloat(Bytes::from(canonical)), after_crlf));
351 }
352 Ok((Frame::Double(v), after_crlf))
353}
354
355#[inline(never)]
357fn parse_verbatim(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
358 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
359 let len_bytes = &buf[pos + 1..line_end];
360 if len_bytes == b"?" {
361 return Ok((Frame::StreamedVerbatimStringHeader, after_crlf));
362 }
363 if len_bytes == b"-1" {
364 return Err(ParseError::BadLength);
365 }
366 let len = parse_usize(len_bytes)?;
367 if len > MAX_BULK_STRING_SIZE {
368 return Err(ParseError::BadLength);
369 }
370 let data_start = after_crlf;
371 let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
372 if data_end + 1 >= buf.len() {
373 return Err(ParseError::Incomplete);
374 }
375 if buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
376 return Err(ParseError::InvalidFormat);
377 }
378 let sep = buf[data_start..data_end]
379 .iter()
380 .position(|&b| b == b':')
381 .ok_or(ParseError::InvalidFormat)?;
382 if sep != 3 {
383 return Err(ParseError::InvalidFormat);
384 }
385 let format = input.slice(data_start..data_start + sep);
386 let content = input.slice(data_start + sep + 1..data_end);
387 Ok((Frame::VerbatimString(format, content), data_end + 2))
388}
389
390#[inline(never)]
392fn parse_blob_error(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
393 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
394 let len_bytes = &buf[pos + 1..line_end];
395 if len_bytes == b"?" {
396 return Ok((Frame::StreamedBlobErrorHeader, after_crlf));
397 }
398 if len_bytes == b"-1" {
399 return Err(ParseError::BadLength);
400 }
401 let len = parse_usize(len_bytes)?;
402 if len > MAX_BULK_STRING_SIZE {
403 return Err(ParseError::BadLength);
404 }
405 let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
406 if data_start == data_end {
407 Ok((Frame::BlobError(Bytes::new()), after_crlf + 2))
408 } else {
409 Ok((
410 Frame::BlobError(input.slice(data_start..data_end)),
411 data_end + 2,
412 ))
413 }
414}
415
416#[inline(never)]
418fn parse_collection(
419 input: &Bytes,
420 buf: &[u8],
421 pos: usize,
422 tag: u8,
423) -> Result<(Frame, usize), ParseError> {
424 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
425 let len_bytes = &buf[pos + 1..line_end];
426
427 if len_bytes == b"?" {
429 return match tag {
430 b'*' => Ok((Frame::StreamedArrayHeader, after_crlf)),
431 b'~' => Ok((Frame::StreamedSetHeader, after_crlf)),
432 b'>' => Ok((Frame::StreamedPushHeader, after_crlf)),
433 _ => unreachable!(),
434 };
435 }
436 if tag == b'*' && len_bytes == b"-1" {
438 return Ok((Frame::Array(None), after_crlf));
439 }
440 let count = parse_count(len_bytes)?;
441 if count == 0 {
442 return match tag {
443 b'*' => Ok((Frame::Array(Some(Vec::new())), after_crlf)),
444 b'~' => Ok((Frame::Set(Vec::new()), after_crlf)),
445 b'>' => Ok((Frame::Push(Vec::new()), after_crlf)),
446 _ => unreachable!(),
447 };
448 }
449 let mut cursor = after_crlf;
450 let mut items = Vec::with_capacity(count);
451 for _ in 0..count {
452 let (item, next) = parse_frame_inner(input, cursor)?;
453 items.push(item);
454 cursor = next;
455 }
456 match tag {
457 b'*' => Ok((Frame::Array(Some(items)), cursor)),
458 b'~' => Ok((Frame::Set(items), cursor)),
459 b'>' => Ok((Frame::Push(items), cursor)),
460 _ => unreachable!(),
461 }
462}
463
464#[inline(never)]
466fn parse_pairs(
467 input: &Bytes,
468 buf: &[u8],
469 pos: usize,
470 tag: u8,
471) -> Result<(Frame, usize), ParseError> {
472 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
473 let len_bytes = &buf[pos + 1..line_end];
474 if len_bytes == b"?" {
475 return if tag == b'%' {
476 Ok((Frame::StreamedMapHeader, after_crlf))
477 } else {
478 Ok((Frame::StreamedAttributeHeader, after_crlf))
479 };
480 }
481 let count = parse_count(len_bytes)?;
482 let mut cursor = after_crlf;
483 let mut pairs = Vec::with_capacity(count);
484 for _ in 0..count {
485 let (key, next1) = parse_frame_inner(input, cursor)?;
486 let (val, next2) = parse_frame_inner(input, next1)?;
487 pairs.push((key, val));
488 cursor = next2;
489 }
490 if tag == b'%' {
491 Ok((Frame::Map(pairs), cursor))
492 } else {
493 Ok((Frame::Attribute(pairs), cursor))
494 }
495}
496
497#[inline(never)]
499fn parse_streamed_chunk(
500 input: &Bytes,
501 buf: &[u8],
502 pos: usize,
503) -> Result<(Frame, usize), ParseError> {
504 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
505 let len = parse_usize(&buf[pos + 1..line_end])?;
506 if len > MAX_BULK_STRING_SIZE {
507 return Err(ParseError::BadLength);
508 }
509 let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
510 if data_start == data_end {
511 Ok((Frame::StreamedStringChunk(Bytes::new()), after_crlf + 2))
512 } else {
513 Ok((
514 Frame::StreamedStringChunk(input.slice(data_start..data_end)),
515 data_end + 2,
516 ))
517 }
518}
519
520fn parse_frame_inner(input: &Bytes, pos: usize) -> Result<(Frame, usize), ParseError> {
524 let buf = input.as_ref();
525 if pos >= buf.len() {
526 return Err(ParseError::Incomplete);
527 }
528
529 let tag = buf[pos];
530
531 match tag {
532 b'+' => {
534 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
535 Ok((
536 Frame::SimpleString(input.slice(pos + 1..line_end)),
537 after_crlf,
538 ))
539 }
540 b'-' => {
541 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
542 Ok((Frame::Error(input.slice(pos + 1..line_end)), after_crlf))
543 }
544 b':' => {
545 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
546 let v = parse_i64(&buf[pos + 1..line_end])?;
547 Ok((Frame::Integer(v), after_crlf))
548 }
549 b'#' => {
550 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
551 match &buf[pos + 1..line_end] {
552 b"t" => Ok((Frame::Boolean(true), after_crlf)),
553 b"f" => Ok((Frame::Boolean(false), after_crlf)),
554 _ => Err(ParseError::InvalidBoolean),
555 }
556 }
557 b'(' => {
558 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
559 Ok((Frame::BigNumber(input.slice(pos + 1..line_end)), after_crlf))
560 }
561 b'_' => {
562 if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
563 Ok((Frame::Null, pos + 3))
564 } else {
565 Err(ParseError::Incomplete)
566 }
567 }
568 b'.' => {
569 if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
570 Ok((Frame::StreamTerminator, pos + 3))
571 } else {
572 Err(ParseError::Incomplete)
573 }
574 }
575
576 b'$' => parse_bulk_string(input, buf, pos),
578 b',' => parse_double_frame(input, buf, pos),
579 b'=' => parse_verbatim(input, buf, pos),
580 b'!' => parse_blob_error(input, buf, pos),
581 b';' => parse_streamed_chunk(input, buf, pos),
582
583 b'*' | b'~' | b'>' => parse_collection(input, buf, pos, tag),
585 b'%' | b'|' => parse_pairs(input, buf, pos, tag),
586
587 _ => Err(ParseError::InvalidTag(tag)),
588 }
589}
590
591#[cfg(feature = "unsafe-internals")]
592#[path = "resp3_unchecked.rs"]
593mod unchecked;
594#[cfg(feature = "unsafe-internals")]
595pub use unchecked::parse_frame_unchecked;
596
597pub fn parse_streaming_sequence(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
677 if input.is_empty() {
678 return Err(ParseError::Incomplete);
679 }
680
681 let (header, mut rest) = parse_frame(input)?;
682
683 match header {
684 Frame::StreamedStringHeader => {
685 let mut chunks = Vec::new();
687
688 loop {
689 let (frame, new_rest) = parse_frame(rest)?;
690 rest = new_rest;
691
692 match frame {
693 Frame::StreamedStringChunk(chunk) => {
694 if chunk.is_empty() {
695 break;
697 }
698 chunks.push(chunk);
699 }
700 _ => {
701 return Err(ParseError::InvalidFormat);
702 }
703 }
704 }
705
706 Ok((Frame::StreamedString(chunks), rest))
707 }
708 Frame::StreamedBlobErrorHeader | Frame::StreamedVerbatimStringHeader => {
709 Ok((header, rest))
712 }
713 Frame::StreamedArrayHeader => {
714 let mut items = Vec::new();
716
717 loop {
718 let (frame, new_rest) = parse_frame(rest)?;
719 rest = new_rest;
720
721 match frame {
722 Frame::StreamTerminator => {
723 break;
724 }
725 item => {
726 items.push(item);
727 }
728 }
729 }
730
731 Ok((Frame::StreamedArray(items), rest))
732 }
733 Frame::StreamedSetHeader => {
734 let mut items = Vec::new();
736
737 loop {
738 let (frame, new_rest) = parse_frame(rest)?;
739 rest = new_rest;
740
741 match frame {
742 Frame::StreamTerminator => {
743 break;
744 }
745 item => {
746 items.push(item);
747 }
748 }
749 }
750
751 Ok((Frame::StreamedSet(items), rest))
752 }
753 Frame::StreamedMapHeader => {
754 let mut pairs = Vec::new();
756
757 loop {
758 let (frame, new_rest) = parse_frame(rest)?;
759 rest = new_rest;
760
761 match frame {
762 Frame::StreamTerminator => {
763 break;
764 }
765 key => {
766 let (value, newer_rest) = parse_frame(rest)?;
767 if matches!(value, Frame::StreamTerminator) {
768 return Err(ParseError::InvalidFormat);
769 }
770 rest = newer_rest;
771 pairs.push((key, value));
772 }
773 }
774 }
775
776 Ok((Frame::StreamedMap(pairs), rest))
777 }
778 Frame::StreamedAttributeHeader => {
779 let mut pairs = Vec::new();
781
782 loop {
783 let (frame, new_rest) = parse_frame(rest)?;
784 rest = new_rest;
785
786 match frame {
787 Frame::StreamTerminator => {
788 break;
789 }
790 key => {
791 let (value, newer_rest) = parse_frame(rest)?;
792 if matches!(value, Frame::StreamTerminator) {
793 return Err(ParseError::InvalidFormat);
794 }
795 rest = newer_rest;
796 pairs.push((key, value));
797 }
798 }
799 }
800
801 Ok((Frame::StreamedAttribute(pairs), rest))
802 }
803 Frame::StreamedPushHeader => {
804 let mut items = Vec::new();
806
807 loop {
808 let (frame, new_rest) = parse_frame(rest)?;
809 rest = new_rest;
810
811 match frame {
812 Frame::StreamTerminator => {
813 break;
814 }
815 item => {
816 items.push(item);
817 }
818 }
819 }
820
821 Ok((Frame::StreamedPush(items), rest))
822 }
823 _ => {
824 Ok((header, rest))
826 }
827 }
828}
829
830#[inline]
833fn find_crlf(buf: &[u8], from: usize) -> Result<(usize, usize), ParseError> {
834 let mut i = from;
835 let len = buf.len();
836 while i + 1 < len {
837 if buf[i] == b'\r' && buf[i + 1] == b'\n' {
838 return Ok((i, i + 2));
839 }
840 i += 1;
841 }
842 Err(ParseError::Incomplete)
843}
844
845#[inline]
847fn parse_usize(buf: &[u8]) -> Result<usize, ParseError> {
848 if buf.is_empty() {
849 return Err(ParseError::BadLength);
850 }
851 let mut v: usize = 0;
852 for &b in buf {
853 if !b.is_ascii_digit() {
854 return Err(ParseError::BadLength);
855 }
856 v = v.checked_mul(10).ok_or(ParseError::BadLength)?;
857 v = v
858 .checked_add((b - b'0') as usize)
859 .ok_or(ParseError::BadLength)?;
860 }
861 Ok(v)
862}
863
864#[inline]
866fn parse_i64(buf: &[u8]) -> Result<i64, ParseError> {
867 if buf.is_empty() {
868 return Err(ParseError::InvalidFormat);
869 }
870 let (neg, digits) = if buf[0] == b'-' {
871 (true, &buf[1..])
872 } else {
873 (false, buf)
874 };
875 if digits.is_empty() {
876 return Err(ParseError::InvalidFormat);
877 }
878 let mut v: i64 = 0;
879 for (i, &d) in digits.iter().enumerate() {
880 if !d.is_ascii_digit() {
881 return Err(ParseError::InvalidFormat);
882 }
883 let digit = (d - b'0') as i64;
884 if neg && v == i64::MAX / 10 && digit == 8 && i == digits.len() - 1 {
885 return Ok(i64::MIN);
886 }
887 if v > i64::MAX / 10 || (v == i64::MAX / 10 && digit > i64::MAX % 10) {
888 return Err(ParseError::Overflow);
889 }
890 v = v * 10 + digit;
891 }
892 if neg { Ok(-v) } else { Ok(v) }
893}
894
895#[inline]
897fn parse_count(buf: &[u8]) -> Result<usize, ParseError> {
898 let count = parse_usize(buf)?;
899 if count > MAX_COLLECTION_SIZE {
900 return Err(ParseError::BadLength);
901 }
902 Ok(count)
903}
904
905pub fn frame_to_bytes(frame: &Frame) -> Bytes {
909 let mut buf = BytesMut::new();
910 serialize_frame(frame, &mut buf);
911 buf.freeze()
912}
913
914fn serialize_frame(frame: &Frame, buf: &mut BytesMut) {
915 match frame {
916 Frame::SimpleString(s) => {
917 buf.put_u8(b'+');
918 buf.extend_from_slice(s);
919 buf.extend_from_slice(b"\r\n");
920 }
921 Frame::Error(e) => {
922 buf.put_u8(b'-');
923 buf.extend_from_slice(e);
924 buf.extend_from_slice(b"\r\n");
925 }
926 Frame::Integer(i) => {
927 buf.put_u8(b':');
928 let s = i.to_string();
929 buf.extend_from_slice(s.as_bytes());
930 buf.extend_from_slice(b"\r\n");
931 }
932 Frame::BulkString(opt) => {
933 buf.put_u8(b'$');
934 match opt {
935 Some(data) => {
936 let len = data.len().to_string();
937 buf.extend_from_slice(len.as_bytes());
938 buf.extend_from_slice(b"\r\n");
939 buf.extend_from_slice(data);
940 buf.extend_from_slice(b"\r\n");
941 }
942 None => {
943 buf.extend_from_slice(b"-1\r\n");
944 }
945 }
946 }
947 Frame::BlobError(data) => {
948 buf.put_u8(b'!');
949 let len = data.len().to_string();
950 buf.extend_from_slice(len.as_bytes());
951 buf.extend_from_slice(b"\r\n");
952 buf.extend_from_slice(data);
953 buf.extend_from_slice(b"\r\n");
954 }
955 Frame::StreamedStringHeader => {
956 buf.extend_from_slice(b"$?\r\n");
957 }
958 Frame::StreamedBlobErrorHeader => {
959 buf.extend_from_slice(b"!?\r\n");
960 }
961 Frame::StreamedVerbatimStringHeader => {
962 buf.extend_from_slice(b"=?\r\n");
963 }
964 Frame::StreamedArrayHeader => {
965 buf.extend_from_slice(b"*?\r\n");
966 }
967 Frame::StreamedSetHeader => {
968 buf.extend_from_slice(b"~?\r\n");
969 }
970 Frame::StreamedMapHeader => {
971 buf.extend_from_slice(b"%?\r\n");
972 }
973 Frame::StreamedAttributeHeader => {
974 buf.extend_from_slice(b"|?\r\n");
975 }
976 Frame::StreamedPushHeader => {
977 buf.extend_from_slice(b">?\r\n");
978 }
979 Frame::StreamedStringChunk(data) => {
980 buf.put_u8(b';');
981 let len = data.len().to_string();
982 buf.extend_from_slice(len.as_bytes());
983 buf.extend_from_slice(b"\r\n");
984 buf.extend_from_slice(data);
985 buf.extend_from_slice(b"\r\n");
986 }
987 Frame::StreamedString(chunks) => {
988 buf.extend_from_slice(b"$?\r\n");
990 for chunk in chunks {
991 buf.put_u8(b';');
992 let len = chunk.len().to_string();
993 buf.extend_from_slice(len.as_bytes());
994 buf.extend_from_slice(b"\r\n");
995 buf.extend_from_slice(chunk);
996 buf.extend_from_slice(b"\r\n");
997 }
998 buf.extend_from_slice(b";0\r\n\r\n");
999 }
1000 Frame::StreamedArray(items) => {
1001 buf.extend_from_slice(b"*?\r\n");
1002 for item in items {
1003 serialize_frame(item, buf);
1004 }
1005 buf.extend_from_slice(b".\r\n");
1006 }
1007 Frame::StreamedSet(items) => {
1008 buf.extend_from_slice(b"~?\r\n");
1009 for item in items {
1010 serialize_frame(item, buf);
1011 }
1012 buf.extend_from_slice(b".\r\n");
1013 }
1014 Frame::StreamedMap(pairs) => {
1015 buf.extend_from_slice(b"%?\r\n");
1016 for (key, value) in pairs {
1017 serialize_frame(key, buf);
1018 serialize_frame(value, buf);
1019 }
1020 buf.extend_from_slice(b".\r\n");
1021 }
1022 Frame::StreamedAttribute(pairs) => {
1023 buf.extend_from_slice(b"|?\r\n");
1024 for (key, value) in pairs {
1025 serialize_frame(key, buf);
1026 serialize_frame(value, buf);
1027 }
1028 buf.extend_from_slice(b".\r\n");
1029 }
1030 Frame::StreamedPush(items) => {
1031 buf.extend_from_slice(b">?\r\n");
1032 for item in items {
1033 serialize_frame(item, buf);
1034 }
1035 buf.extend_from_slice(b".\r\n");
1036 }
1037 Frame::StreamTerminator => {
1038 buf.extend_from_slice(b".\r\n");
1039 }
1040 Frame::Null => {
1041 buf.extend_from_slice(b"_\r\n");
1042 }
1043 Frame::Double(d) => {
1044 buf.put_u8(b',');
1045 let s = d.to_string();
1046 buf.extend_from_slice(s.as_bytes());
1047 buf.extend_from_slice(b"\r\n");
1048 }
1049 Frame::SpecialFloat(f) => {
1050 buf.put_u8(b',');
1051 buf.extend_from_slice(f);
1052 buf.extend_from_slice(b"\r\n");
1053 }
1054 Frame::Boolean(b) => {
1055 buf.extend_from_slice(if *b { b"#t\r\n" } else { b"#f\r\n" });
1056 }
1057 Frame::BigNumber(n) => {
1058 buf.put_u8(b'(');
1059 buf.extend_from_slice(n);
1060 buf.extend_from_slice(b"\r\n");
1061 }
1062 Frame::VerbatimString(format, content) => {
1063 buf.put_u8(b'=');
1064 let total_len = format.len() + 1 + content.len(); let len = total_len.to_string();
1066 buf.extend_from_slice(len.as_bytes());
1067 buf.extend_from_slice(b"\r\n");
1068 buf.extend_from_slice(format);
1069 buf.put_u8(b':');
1070 buf.extend_from_slice(content);
1071 buf.extend_from_slice(b"\r\n");
1072 }
1073 Frame::Array(opt) => {
1074 buf.put_u8(b'*');
1075 match opt {
1076 Some(items) => {
1077 let len = items.len().to_string();
1078 buf.extend_from_slice(len.as_bytes());
1079 buf.extend_from_slice(b"\r\n");
1080 for item in items {
1081 serialize_frame(item, buf);
1082 }
1083 }
1084 None => {
1085 buf.extend_from_slice(b"-1\r\n");
1086 }
1087 }
1088 }
1089 Frame::Set(items) => {
1090 buf.put_u8(b'~');
1091 let len = items.len().to_string();
1092 buf.extend_from_slice(len.as_bytes());
1093 buf.extend_from_slice(b"\r\n");
1094 for item in items {
1095 serialize_frame(item, buf);
1096 }
1097 }
1098 Frame::Map(pairs) => {
1099 buf.put_u8(b'%');
1100 let len = pairs.len().to_string();
1101 buf.extend_from_slice(len.as_bytes());
1102 buf.extend_from_slice(b"\r\n");
1103 for (key, value) in pairs {
1104 serialize_frame(key, buf);
1105 serialize_frame(value, buf);
1106 }
1107 }
1108 Frame::Attribute(pairs) => {
1109 buf.put_u8(b'|');
1110 let len = pairs.len().to_string();
1111 buf.extend_from_slice(len.as_bytes());
1112 buf.extend_from_slice(b"\r\n");
1113 for (key, value) in pairs {
1114 serialize_frame(key, buf);
1115 serialize_frame(value, buf);
1116 }
1117 }
1118 Frame::Push(items) => {
1119 buf.put_u8(b'>');
1120 let len = items.len().to_string();
1121 buf.extend_from_slice(len.as_bytes());
1122 buf.extend_from_slice(b"\r\n");
1123 for item in items {
1124 serialize_frame(item, buf);
1125 }
1126 }
1127 }
1128}
1129
1130#[cfg(test)]
1131mod tests {
1132 use super::{Frame, ParseError, Parser, frame_to_bytes, parse_frame, parse_streaming_sequence};
1133 use bytes::Bytes;
1134
1135 #[test]
1136 fn test_parse_frame_simple_string() {
1137 let input = Bytes::from("+HELLO\r\nWORLD");
1138 let (frame, rest) = parse_frame(input.clone()).unwrap();
1139 assert_eq!(frame, Frame::SimpleString(Bytes::from("HELLO")));
1140 assert_eq!(rest, Bytes::from("WORLD"));
1141 }
1142
1143 #[test]
1144 fn test_parse_frame_blob_error() {
1145 let input = Bytes::from("!5\r\nERROR\r\nREST");
1146 let (frame, rest) = parse_frame(input.clone()).unwrap();
1147 assert_eq!(frame, Frame::BlobError(Bytes::from("ERROR")));
1148 assert_eq!(rest, Bytes::from("REST"));
1149 }
1150
1151 #[test]
1152 fn test_parse_frame_error() {
1153 let input = Bytes::from("-ERR fail\r\nLEFT");
1154 let (frame, rest) = parse_frame(input.clone()).unwrap();
1155 assert_eq!(frame, Frame::Error(Bytes::from("ERR fail")));
1156 assert_eq!(rest, Bytes::from("LEFT"));
1157 }
1158
1159 #[test]
1160 fn test_parse_frame_integer() {
1161 let input = Bytes::from(":42\r\nTAIL");
1162 let (frame, rest) = parse_frame(input.clone()).unwrap();
1163 assert_eq!(frame, Frame::Integer(42));
1164 assert_eq!(rest, Bytes::from("TAIL"));
1165 }
1166
1167 #[test]
1168 fn test_parse_frame_bulk_string() {
1169 let input = Bytes::from("$3\r\nfoo\r\nREST");
1170 let (frame, rest) = parse_frame(input.clone()).unwrap();
1171 assert_eq!(frame, Frame::BulkString(Some(Bytes::from("foo"))));
1172 assert_eq!(rest, Bytes::from("REST"));
1173 let null_input = Bytes::from("$-1\r\nAFTER");
1174 let (frame, rest) = parse_frame(null_input.clone()).unwrap();
1175 assert_eq!(frame, Frame::BulkString(None));
1176 assert_eq!(rest, Bytes::from("AFTER"));
1177 }
1178
1179 #[test]
1180 fn test_parse_frame_null() {
1181 let input = Bytes::from("_\r\nLEFT");
1182 let (frame, rest) = parse_frame(input.clone()).unwrap();
1183 assert_eq!(frame, Frame::Null);
1184 assert_eq!(rest, Bytes::from("LEFT"));
1185 }
1186
1187 #[test]
1188 fn test_parse_frame_double_and_special_float() {
1189 let input = Bytes::from(",3.5\r\nNEXT");
1190 let (frame, rest) = parse_frame(input.clone()).unwrap();
1191 assert_eq!(frame, Frame::Double(3.5));
1192 assert_eq!(rest, Bytes::from("NEXT"));
1193 let input_inf = Bytes::from(",inf\r\nTAIL");
1194 let (frame, rest) = parse_frame(input_inf.clone()).unwrap();
1195 assert_eq!(frame, Frame::SpecialFloat(Bytes::from("inf")));
1196 assert_eq!(rest, Bytes::from("TAIL"));
1197 }
1198
1199 #[test]
1200 fn test_parse_frame_boolean() {
1201 let input_true = Bytes::from("#t\r\nXYZ");
1202 let (frame, rest) = parse_frame(input_true.clone()).unwrap();
1203 assert_eq!(frame, Frame::Boolean(true));
1204 assert_eq!(rest, Bytes::from("XYZ"));
1205 let input_false = Bytes::from("#f\r\nDONE");
1206 let (frame, rest) = parse_frame(input_false.clone()).unwrap();
1207 assert_eq!(frame, Frame::Boolean(false));
1208 assert_eq!(rest, Bytes::from("DONE"));
1209 }
1210
1211 #[test]
1212 fn test_parse_frame_big_number() {
1213 let input = Bytes::from("(123456789\r\nEND");
1214 let (frame, rest) = parse_frame(input.clone()).unwrap();
1215 assert_eq!(frame, Frame::BigNumber(Bytes::from("123456789")));
1216 assert_eq!(rest, Bytes::from("END"));
1217 }
1218
1219 #[test]
1220 fn test_parse_frame_verbatim_string() {
1221 let input = Bytes::from("=12\r\ntxt:hi there\r\nAFTER");
1222 let (frame, rest) = parse_frame(input.clone()).unwrap();
1223 assert_eq!(
1224 frame,
1225 Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hi there")) );
1230 assert_eq!(rest, Bytes::from("AFTER"));
1231 }
1232
1233 #[test]
1234 fn test_parse_frame_array_set_push_map_attribute() {
1235 let input = Bytes::from("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\nTAIL");
1237 let (frame, rest) = parse_frame(input.clone()).unwrap();
1238 assert_eq!(
1239 frame,
1240 Frame::Array(Some(vec![
1241 Frame::BulkString(Some(Bytes::from("foo"))),
1242 Frame::BulkString(Some(Bytes::from("bar")))
1243 ]))
1244 );
1245 assert_eq!(rest, Bytes::from("TAIL"));
1246 let input_null = Bytes::from("*-1\r\nEND");
1248 let (frame, rest) = parse_frame(input_null.clone()).unwrap();
1249 assert_eq!(frame, Frame::Array(None));
1250 assert_eq!(rest, Bytes::from("END"));
1251 let input_set = Bytes::from("~2\r\n+foo\r\n+bar\r\nTAIL");
1253 let (frame, rest) = parse_frame(input_set.clone()).unwrap();
1254 assert_eq!(
1255 frame,
1256 Frame::Set(vec![
1257 Frame::SimpleString(Bytes::from("foo")),
1258 Frame::SimpleString(Bytes::from("bar")),
1259 ])
1260 );
1261 assert_eq!(rest, Bytes::from("TAIL"));
1262 let input_map = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\nTRAIL");
1264 let (frame, rest) = parse_frame(input_map.clone()).unwrap();
1265 assert_eq!(
1266 frame,
1267 Frame::Map(vec![
1268 (
1269 Frame::SimpleString(Bytes::from("key1")),
1270 Frame::SimpleString(Bytes::from("val1"))
1271 ),
1272 (
1273 Frame::SimpleString(Bytes::from("key2")),
1274 Frame::SimpleString(Bytes::from("val2"))
1275 ),
1276 ])
1277 );
1278 assert_eq!(rest, Bytes::from("TRAIL"));
1279 let input_attr = Bytes::from("|1\r\n+meta\r\n+data\r\nAFTER");
1281 let (frame, rest) = parse_frame(input_attr.clone()).unwrap();
1282 assert_eq!(
1283 frame,
1284 Frame::Attribute(vec![(
1285 Frame::SimpleString(Bytes::from("meta")),
1286 Frame::SimpleString(Bytes::from("data"))
1287 ),])
1288 );
1289 assert_eq!(rest, Bytes::from("AFTER"));
1290 let input_push = Bytes::from(">2\r\n+type\r\n:1\r\nNEXT");
1292 let (frame, rest) = parse_frame(input_push.clone()).unwrap();
1293 assert_eq!(
1294 frame,
1295 Frame::Push(vec![
1296 Frame::SimpleString(Bytes::from("type")),
1297 Frame::Integer(1),
1298 ])
1299 );
1300 assert_eq!(rest, Bytes::from("NEXT"));
1301 }
1302
1303 #[test]
1304 fn test_parse_frame_empty_input() {
1305 assert!(parse_frame(Bytes::new()).is_err());
1306 }
1307
1308 #[test]
1309 fn test_parse_frame_invalid_tag() {
1310 let input = Bytes::from("X123\r\n");
1311 assert!(parse_frame(input).is_err());
1312 }
1313
1314 #[test]
1315 fn test_parse_frame_malformed_bulk_length() {
1316 let input = Bytes::from("$x\r\nfoo\r\n");
1317 assert!(parse_frame(input).is_err());
1318 }
1319
1320 #[test]
1321 fn test_parse_frame_zero_length_bulk() {
1322 let input = Bytes::from("$0\r\n\r\nTAIL");
1323 let (frame, rest) = parse_frame(input.clone()).unwrap();
1324 assert_eq!(frame, Frame::BulkString(Some(Bytes::from(""))));
1325 assert_eq!(rest, Bytes::from("TAIL"));
1326 }
1327
1328 #[test]
1329 fn test_parse_frame_zero_length_blob_error() {
1330 let input = Bytes::from("!0\r\n\r\nREST");
1331 let (frame, rest) = parse_frame(input.clone()).unwrap();
1332 assert_eq!(frame, Frame::BlobError(Bytes::new()));
1333 assert_eq!(rest, Bytes::from("REST"));
1334 }
1335
1336 #[test]
1337 fn test_parse_frame_missing_crlf() {
1338 let input = Bytes::from(":42\nTAIL");
1339 assert!(parse_frame(input).is_err());
1340 }
1341
1342 #[test]
1343 fn test_parse_frame_unicode_simple_string() {
1344 let input = Bytes::from("+こんにちは\r\nEND");
1345 let (frame, rest) = parse_frame(input.clone()).unwrap();
1346 assert_eq!(frame, Frame::SimpleString(Bytes::from("こんにちは")));
1347 assert_eq!(rest, Bytes::from("END"));
1348 }
1349
1350 #[test]
1351 fn test_parse_frame_chained_frames() {
1352 let combined = Bytes::from("+OK\r\n:1\r\nfoo");
1353 let (f1, rem) = parse_frame(combined.clone()).unwrap();
1354 assert_eq!(f1, Frame::SimpleString(Bytes::from("OK")));
1355 let (f2, rem2) = parse_frame(rem).unwrap();
1356 assert_eq!(f2, Frame::Integer(1));
1357 assert_eq!(rem2, Bytes::from("foo"));
1358 }
1359
1360 #[test]
1361 fn test_parse_frame_empty_array() {
1362 let input = Bytes::from("*0\r\nTAIL");
1363 let (frame, rest) = parse_frame(input.clone()).unwrap();
1364 assert_eq!(frame, Frame::Array(Some(vec![])));
1365 assert_eq!(rest, Bytes::from("TAIL"));
1366 }
1367
1368 #[test]
1369 fn test_parse_frame_partial_array_data() {
1370 let input = Bytes::from("*2\r\n+OK\r\n");
1371 assert!(parse_frame(input).is_err());
1372 }
1373
1374 #[test]
1375 fn test_parse_frame_streamed_string() {
1376 let input = Bytes::from("$?\r\n$5\r\nhello\r\n$0\r\n\r\nREST");
1377 let (frame, rem) = parse_frame(input.clone()).unwrap();
1378 assert_eq!(frame, Frame::StreamedStringHeader);
1379 let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1380 assert_eq!(chunk, Frame::BulkString(Some(Bytes::from("hello"))));
1381 let (terminator, rest) = parse_frame(rem2.clone()).unwrap();
1382 assert_eq!(terminator, Frame::BulkString(Some(Bytes::from(""))));
1383 assert_eq!(rest, Bytes::from("REST"));
1384 }
1385
1386 #[test]
1387 fn test_parse_frame_streamed_blob_error() {
1388 let input = Bytes::from("!?\r\n!5\r\nERROR\r\nREST");
1389 let (frame, rem) = parse_frame(input.clone()).unwrap();
1390 assert_eq!(frame, Frame::StreamedBlobErrorHeader);
1391 let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1392 assert_eq!(chunk, Frame::BlobError(Bytes::from("ERROR")));
1393 assert_eq!(rem2, Bytes::from("REST"));
1394 }
1395
1396 #[test]
1397 fn test_parse_frame_streamed_verbatim_string() {
1398 let input = Bytes::from("=?\r\n=9\r\ntxt:hello\r\nTAIL");
1399 let (frame, rem) = parse_frame(input.clone()).unwrap();
1400 assert_eq!(frame, Frame::StreamedVerbatimStringHeader);
1401 let (chunk, rest) = parse_frame(rem.clone()).unwrap();
1402 assert_eq!(
1403 chunk,
1404 Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hello")) );
1409 assert_eq!(rest, Bytes::from("TAIL"));
1410 }
1411
1412 #[test]
1413 fn test_parse_frame_streamed_array() {
1414 let input = Bytes::from("*?\r\n+one\r\n+two\r\n*0\r\nEND");
1415 let (header, rem) = parse_frame(input.clone()).unwrap();
1416 assert_eq!(header, Frame::StreamedArrayHeader);
1417 let (item1, rem2) = parse_frame(rem.clone()).unwrap();
1418 assert_eq!(item1, Frame::SimpleString(Bytes::from("one")));
1419 let (item2, rem3) = parse_frame(rem2.clone()).unwrap();
1420 assert_eq!(item2, Frame::SimpleString(Bytes::from("two")));
1421 let (terminator, rest) = parse_frame(rem3.clone()).unwrap();
1422 assert_eq!(terminator, Frame::Array(Some(vec![])));
1423 assert_eq!(rest, Bytes::from("END"));
1424 }
1425
1426 #[test]
1427 fn test_parse_frame_streamed_set_map_attr_push() {
1428 let input_set = Bytes::from("~?\r\n+foo\r\n+bar\r\n~0\r\nTAIL");
1430 let (h_set, rem0) = parse_frame(input_set.clone()).unwrap();
1431 assert_eq!(h_set, Frame::StreamedSetHeader);
1432 let (s1, rem1) = parse_frame(rem0.clone()).unwrap();
1433 assert_eq!(s1, Frame::SimpleString(Bytes::from("foo")));
1434 let (s2, rem2) = parse_frame(rem1.clone()).unwrap();
1435 assert_eq!(s2, Frame::SimpleString(Bytes::from("bar")));
1436 let (term_set, rest_set) = parse_frame(rem2.clone()).unwrap();
1437 assert_eq!(term_set, Frame::Set(vec![]));
1438 assert_eq!(rest_set, Bytes::from("TAIL"));
1439 let input_map = Bytes::from("%?\r\n+key\r\n+val\r\n%0\r\nNEXT");
1441 let (h_map, rem_map) = parse_frame(input_map.clone()).unwrap();
1442 assert_eq!(h_map, Frame::StreamedMapHeader);
1443 let (k, rem_map2) = parse_frame(rem_map.clone()).unwrap();
1444 assert_eq!(k, Frame::SimpleString(Bytes::from("key")));
1445 let (v, rem_map3) = parse_frame(rem_map2.clone()).unwrap();
1446 assert_eq!(v, Frame::SimpleString(Bytes::from("val")));
1447 let (term_map, rest_map4) = parse_frame(rem_map3.clone()).unwrap();
1448 assert_eq!(term_map, Frame::Map(vec![]));
1449 assert_eq!(rest_map4, Bytes::from("NEXT"));
1450 let input_attr = Bytes::from("|?\r\n+meta\r\n+info\r\n|0\r\nMORE");
1452 let (h_attr, rem_attr) = parse_frame(input_attr.clone()).unwrap();
1453 assert_eq!(h_attr, Frame::StreamedAttributeHeader);
1454 let (a1, rem_attr2) = parse_frame(rem_attr.clone()).unwrap();
1455 assert_eq!(a1, Frame::SimpleString(Bytes::from("meta")));
1456 let (a2, rem_attr3) = parse_frame(rem_attr2.clone()).unwrap();
1457 assert_eq!(a2, Frame::SimpleString(Bytes::from("info")));
1458 let (term_attr, rest_attr) = parse_frame(rem_attr3.clone()).unwrap();
1459 assert_eq!(term_attr, Frame::Attribute(vec![]));
1460 assert_eq!(rest_attr, Bytes::from("MORE"));
1461 let input_push = Bytes::from(">?\r\n:1\r\n:2\r\n>0\r\nEND");
1463 let (h_push, rem_push) = parse_frame(input_push.clone()).unwrap();
1464 assert_eq!(h_push, Frame::StreamedPushHeader);
1465 let (p1, rem_push2) = parse_frame(rem_push.clone()).unwrap();
1466 assert_eq!(p1, Frame::Integer(1));
1467 let (p2, rem_push3) = parse_frame(rem_push2.clone()).unwrap();
1468 assert_eq!(p2, Frame::Integer(2));
1469 let (term_push, rest_push) = parse_frame(rem_push3.clone()).unwrap();
1470 assert_eq!(term_push, Frame::Push(vec![]));
1471 assert_eq!(rest_push, Bytes::from("END"));
1472 }
1473
1474 #[test]
1475 fn test_parse_frame_stream_terminator() {
1476 let input = Bytes::from(".\r\nREST");
1477 let (frame, rest) = parse_frame(input.clone()).unwrap();
1478 assert_eq!(frame, Frame::StreamTerminator);
1479 assert_eq!(rest, Bytes::from("REST"));
1480 }
1481
1482 #[test]
1483 fn test_parse_frame_null_blob_error_rejected() {
1484 let input = Bytes::from("!-1\r\nTAIL");
1485 assert_eq!(parse_frame(input), Err(ParseError::BadLength));
1486 }
1487
1488 #[test]
1489 fn test_parse_frame_null_verbatim_rejected() {
1490 let input = Bytes::from("=-1\r\nTAIL");
1491 assert_eq!(parse_frame(input), Err(ParseError::BadLength));
1492 }
1493
1494 #[test]
1495 fn test_verbatim_string_format_must_be_3_bytes() {
1496 let input = Bytes::from("=6\r\nx:data\r\n");
1498 assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
1499
1500 let input = Bytes::from("=9\r\ntxtx:data\r\n");
1502 assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
1503
1504 let input = Bytes::from("=5\r\n:data\r\n");
1506 assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
1507
1508 let input = Bytes::from("=8\r\ntxt:data\r\n");
1510 let (frame, _) = parse_frame(input).unwrap();
1511 assert_eq!(
1512 frame,
1513 Frame::VerbatimString(Bytes::from("txt"), Bytes::from("data"))
1514 );
1515 }
1516
1517 #[test]
1518 fn test_parse_frame_special_float_nan() {
1519 let input = Bytes::from(",nan\r\nTAIL");
1520 let (frame, rest) = parse_frame(input.clone()).unwrap();
1521 assert_eq!(frame, Frame::SpecialFloat(Bytes::from("nan")));
1522 assert_eq!(rest, Bytes::from("TAIL"));
1523 }
1524
1525 #[test]
1526 fn test_parse_frame_big_number_zero() {
1527 let input = Bytes::from("(0\r\nEND");
1528 let (frame, rest) = parse_frame(input.clone()).unwrap();
1529 assert_eq!(frame, Frame::BigNumber(Bytes::from("0")));
1530 assert_eq!(rest, Bytes::from("END"));
1531 }
1532
1533 #[test]
1534 fn test_parse_frame_collection_empty() {
1535 let input_push = Bytes::from(">0\r\nTAIL");
1536 let (f_push, r_push) = parse_frame(input_push.clone()).unwrap();
1537 assert_eq!(f_push, Frame::Push(vec![]));
1538 assert_eq!(r_push, Bytes::from("TAIL"));
1539 let input_attr = Bytes::from("|0\r\nAFTER");
1540 let (f_attr, r_attr) = parse_frame(input_attr.clone()).unwrap();
1541 assert_eq!(f_attr, Frame::Attribute(vec![]));
1542 assert_eq!(r_attr, Bytes::from("AFTER"));
1543 let input_map = Bytes::from("%0\r\nEND");
1544 let (f_map, r_map) = parse_frame(input_map.clone()).unwrap();
1545 assert_eq!(f_map, Frame::Map(vec![]));
1546 assert_eq!(r_map, Bytes::from("END"));
1547 let input_set = Bytes::from("~0\r\nDONE");
1548 let (f_set, r_set) = parse_frame(input_set.clone()).unwrap();
1549 assert_eq!(f_set, Frame::Set(vec![]));
1550 assert_eq!(r_set, Bytes::from("DONE"));
1551 let input_arr = Bytes::from("*-1\r\nFIN");
1552 let (f_arr, r_arr) = parse_frame(input_arr.clone()).unwrap();
1553 assert_eq!(f_arr, Frame::Array(None));
1554 assert_eq!(r_arr, Bytes::from("FIN"));
1555 }
1556
1557 #[test]
1560 fn test_roundtrip_simple_string() {
1561 let original = Bytes::from("+hello\r\n");
1562 let (frame, _) = parse_frame(original.clone()).unwrap();
1563 let serialized = frame_to_bytes(&frame);
1564 assert_eq!(original, serialized);
1565
1566 let (reparsed, _) = parse_frame(serialized).unwrap();
1567 assert_eq!(frame, reparsed);
1568 }
1569
1570 #[test]
1571 fn test_roundtrip_error() {
1572 let original = Bytes::from("-ERR error message\r\n");
1573 let (frame, _) = parse_frame(original.clone()).unwrap();
1574 let serialized = frame_to_bytes(&frame);
1575 assert_eq!(original, serialized);
1576
1577 let (reparsed, _) = parse_frame(serialized).unwrap();
1578 assert_eq!(frame, reparsed);
1579 }
1580
1581 #[test]
1582 fn test_roundtrip_integer() {
1583 let original = Bytes::from(":12345\r\n");
1584 let (frame, _) = parse_frame(original.clone()).unwrap();
1585 let serialized = frame_to_bytes(&frame);
1586 assert_eq!(original, serialized);
1587
1588 let (reparsed, _) = parse_frame(serialized).unwrap();
1589 assert_eq!(frame, reparsed);
1590 }
1591
1592 #[test]
1593 fn test_roundtrip_bulk_string() {
1594 let original = Bytes::from("$5\r\nhello\r\n");
1595 let (frame, _) = parse_frame(original.clone()).unwrap();
1596 let serialized = frame_to_bytes(&frame);
1597 assert_eq!(original, serialized);
1598
1599 let (reparsed, _) = parse_frame(serialized).unwrap();
1600 assert_eq!(frame, reparsed);
1601
1602 let original_null = Bytes::from("$-1\r\n");
1604 let (frame_null, _) = parse_frame(original_null.clone()).unwrap();
1605 let serialized_null = frame_to_bytes(&frame_null);
1606 assert_eq!(original_null, serialized_null);
1607
1608 let (reparsed_null, _) = parse_frame(serialized_null).unwrap();
1609 assert_eq!(frame_null, reparsed_null);
1610 }
1611
1612 #[test]
1613 fn test_roundtrip_blob_error() {
1614 let original = Bytes::from("!5\r\nerror\r\n");
1615 let (frame, _) = parse_frame(original.clone()).unwrap();
1616 let serialized = frame_to_bytes(&frame);
1617 assert_eq!(original, serialized);
1618
1619 let (reparsed, _) = parse_frame(serialized).unwrap();
1620 assert_eq!(frame, reparsed);
1621 }
1622
1623 #[test]
1624 fn test_roundtrip_null() {
1625 let original = Bytes::from("_\r\n");
1626 let (frame, _) = parse_frame(original.clone()).unwrap();
1627 let serialized = frame_to_bytes(&frame);
1628 assert_eq!(original, serialized);
1629
1630 let (reparsed, _) = parse_frame(serialized).unwrap();
1631 assert_eq!(frame, reparsed);
1632 }
1633
1634 #[test]
1635 fn test_roundtrip_double() {
1636 let original = Bytes::from(",3.14159\r\n");
1637 let (frame, _) = parse_frame(original.clone()).unwrap();
1638 let serialized = frame_to_bytes(&frame);
1639
1640 let (reparsed, _) = parse_frame(serialized).unwrap();
1643 assert_eq!(frame, reparsed);
1644 }
1645
1646 #[test]
1647 fn test_roundtrip_special_float() {
1648 let original = Bytes::from(",inf\r\n");
1649 let (frame, _) = parse_frame(original.clone()).unwrap();
1650 let serialized = frame_to_bytes(&frame);
1651 assert_eq!(original, serialized);
1652
1653 let (reparsed, _) = parse_frame(serialized).unwrap();
1654 assert_eq!(frame, reparsed);
1655 }
1656
1657 #[test]
1658 fn test_roundtrip_boolean() {
1659 let original_true = Bytes::from("#t\r\n");
1660 let (frame_true, _) = parse_frame(original_true.clone()).unwrap();
1661 let serialized_true = frame_to_bytes(&frame_true);
1662 assert_eq!(original_true, serialized_true);
1663
1664 let (reparsed_true, _) = parse_frame(serialized_true).unwrap();
1665 assert_eq!(frame_true, reparsed_true);
1666
1667 let original_false = Bytes::from("#f\r\n");
1668 let (frame_false, _) = parse_frame(original_false.clone()).unwrap();
1669 let serialized_false = frame_to_bytes(&frame_false);
1670 assert_eq!(original_false, serialized_false);
1671
1672 let (reparsed_false, _) = parse_frame(serialized_false).unwrap();
1673 assert_eq!(frame_false, reparsed_false);
1674 }
1675
1676 #[test]
1677 fn test_roundtrip_big_number() {
1678 let original = Bytes::from("(12345678901234567890\r\n");
1679 let (frame, _) = parse_frame(original.clone()).unwrap();
1680 let serialized = frame_to_bytes(&frame);
1681 assert_eq!(original, serialized);
1682
1683 let (reparsed, _) = parse_frame(serialized).unwrap();
1684 assert_eq!(frame, reparsed);
1685 }
1686
1687 #[test]
1688 fn test_roundtrip_verbatim_string() {
1689 let original = Bytes::from("=10\r\ntxt:hello!\r\n");
1690 let (frame, _) = parse_frame(original.clone()).unwrap();
1691 let serialized = frame_to_bytes(&frame);
1692 assert_eq!(original, serialized);
1693
1694 let (reparsed, _) = parse_frame(serialized).unwrap();
1695 assert_eq!(frame, reparsed);
1696 }
1697
1698 #[test]
1699 fn test_roundtrip_array() {
1700 let original = Bytes::from("*2\r\n+hello\r\n:123\r\n");
1701 let (frame, _) = parse_frame(original.clone()).unwrap();
1702 let serialized = frame_to_bytes(&frame);
1703 assert_eq!(original, serialized);
1704
1705 let (reparsed, _) = parse_frame(serialized).unwrap();
1706 assert_eq!(frame, reparsed);
1707
1708 let original_null = Bytes::from("*-1\r\n");
1710 let (frame_null, _) = parse_frame(original_null.clone()).unwrap();
1711 let serialized_null = frame_to_bytes(&frame_null);
1712 assert_eq!(original_null, serialized_null);
1713
1714 let (reparsed_null, _) = parse_frame(serialized_null).unwrap();
1715 assert_eq!(frame_null, reparsed_null);
1716 }
1717
1718 #[test]
1719 fn test_roundtrip_set() {
1720 let original = Bytes::from("~2\r\n+one\r\n+two\r\n");
1721 let (frame, _) = parse_frame(original.clone()).unwrap();
1722 let serialized = frame_to_bytes(&frame);
1723 assert_eq!(original, serialized);
1724
1725 let (reparsed, _) = parse_frame(serialized).unwrap();
1726 assert_eq!(frame, reparsed);
1727 }
1728
1729 #[test]
1730 fn test_roundtrip_map() {
1731 let original = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\n");
1732 let (frame, _) = parse_frame(original.clone()).unwrap();
1733 let serialized = frame_to_bytes(&frame);
1734 assert_eq!(original, serialized);
1735
1736 let (reparsed, _) = parse_frame(serialized).unwrap();
1737 assert_eq!(frame, reparsed);
1738 }
1739
1740 #[test]
1741 fn test_roundtrip_attribute() {
1742 let original = Bytes::from("|1\r\n+key\r\n+val\r\n");
1743 let (frame, _) = parse_frame(original.clone()).unwrap();
1744 let serialized = frame_to_bytes(&frame);
1745 assert_eq!(original, serialized);
1746
1747 let (reparsed, _) = parse_frame(serialized).unwrap();
1748 assert_eq!(frame, reparsed);
1749 }
1750
1751 #[test]
1752 fn test_roundtrip_push() {
1753 let original = Bytes::from(">2\r\n+msg\r\n+data\r\n");
1754 let (frame, _) = parse_frame(original.clone()).unwrap();
1755 let serialized = frame_to_bytes(&frame);
1756 assert_eq!(original, serialized);
1757
1758 let (reparsed, _) = parse_frame(serialized).unwrap();
1759 assert_eq!(frame, reparsed);
1760 }
1761
1762 #[test]
1763 fn test_roundtrip_streaming_headers() {
1764 let headers = [
1765 ("$?\r\n", Frame::StreamedStringHeader),
1766 ("!?\r\n", Frame::StreamedBlobErrorHeader),
1767 ("=?\r\n", Frame::StreamedVerbatimStringHeader),
1768 ("*?\r\n", Frame::StreamedArrayHeader),
1769 ("~?\r\n", Frame::StreamedSetHeader),
1770 ("%?\r\n", Frame::StreamedMapHeader),
1771 ("|?\r\n", Frame::StreamedAttributeHeader),
1772 (">?\r\n", Frame::StreamedPushHeader),
1773 (".\r\n", Frame::StreamTerminator),
1774 ];
1775
1776 for (original_str, expected_frame) in headers {
1777 let original = Bytes::from(original_str);
1778 let (frame, _) = parse_frame(original.clone()).unwrap();
1779 assert_eq!(frame, expected_frame);
1780
1781 let serialized = frame_to_bytes(&frame);
1782 assert_eq!(original, serialized);
1783
1784 let (reparsed, _) = parse_frame(serialized).unwrap();
1785 assert_eq!(frame, reparsed);
1786 }
1787 }
1788
1789 #[test]
1790 fn test_roundtrip_streaming_chunks() {
1791 let chunks = [
1792 (
1793 ";4\r\nHell\r\n",
1794 Frame::StreamedStringChunk(Bytes::from("Hell")),
1795 ),
1796 (
1797 ";5\r\no wor\r\n",
1798 Frame::StreamedStringChunk(Bytes::from("o wor")),
1799 ),
1800 (";1\r\nd\r\n", Frame::StreamedStringChunk(Bytes::from("d"))),
1801 (";0\r\n\r\n", Frame::StreamedStringChunk(Bytes::new())),
1802 (
1803 ";11\r\nHello World\r\n",
1804 Frame::StreamedStringChunk(Bytes::from("Hello World")),
1805 ),
1806 ];
1807
1808 for (original_str, expected_frame) in chunks {
1809 let original = Bytes::from(original_str);
1810 let (frame, rest) = parse_frame(original.clone()).unwrap();
1811 assert_eq!(frame, expected_frame);
1812 assert!(rest.is_empty());
1813
1814 let serialized = frame_to_bytes(&frame);
1815 assert_eq!(original, serialized);
1816
1817 let (reparsed, _) = parse_frame(serialized).unwrap();
1818 assert_eq!(frame, reparsed);
1819 }
1820 }
1821
1822 #[test]
1823 fn test_streaming_chunks_edge_cases() {
1824 let data = Bytes::from(";4\r\nHel");
1826 let result = parse_frame(data);
1827 assert!(matches!(result, Err(ParseError::Incomplete)));
1828
1829 let data = Bytes::from(";4\r\nHell");
1831 let result = parse_frame(data);
1832 assert!(matches!(result, Err(ParseError::Incomplete)));
1833
1834 let data = Bytes::from(";abc\r\ndata\r\n");
1836 let result = parse_frame(data);
1837 assert!(matches!(result, Err(ParseError::BadLength)));
1838
1839 let data = Bytes::from(";-1\r\ndata\r\n");
1841 let result = parse_frame(data);
1842 assert!(matches!(result, Err(ParseError::BadLength)));
1843
1844 let data = Bytes::from(";5\r\nHell\r\n");
1846 let result = parse_frame(data);
1847 assert!(matches!(result, Err(ParseError::Incomplete)));
1848
1849 let data = Bytes::from(";0\r\n");
1851 let result = parse_frame(data);
1852 assert!(matches!(result, Err(ParseError::Incomplete)));
1853
1854 let binary_data = b"\x00\x01\x02\x03\xFF";
1856 let mut chunk_data = Vec::new();
1857 chunk_data.extend_from_slice(b";5\r\n");
1858 chunk_data.extend_from_slice(binary_data);
1859 chunk_data.extend_from_slice(b"\r\n");
1860 let data = Bytes::from(chunk_data);
1861 let result = parse_frame(data);
1862 assert!(result.is_ok());
1863 let (frame, _) = result.unwrap();
1864 if let Frame::StreamedStringChunk(chunk) = frame {
1865 assert_eq!(chunk.as_ref(), binary_data);
1866 }
1867 }
1868
1869 #[test]
1870 fn test_roundtrip_streaming_sequences() {
1871 let streaming_string = Frame::StreamedString(vec![
1873 Bytes::from("Hell"),
1874 Bytes::from("o wor"),
1875 Bytes::from("ld"),
1876 ]);
1877 let serialized = frame_to_bytes(&streaming_string);
1878 let expected = "$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;2\r\nld\r\n;0\r\n\r\n";
1879 assert_eq!(serialized, Bytes::from(expected));
1880
1881 let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1882 assert_eq!(parsed, streaming_string);
1883
1884 let streaming_array = Frame::StreamedArray(vec![
1886 Frame::SimpleString(Bytes::from("hello")),
1887 Frame::Integer(42),
1888 Frame::Boolean(true),
1889 ]);
1890 let serialized = frame_to_bytes(&streaming_array);
1891 let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1892 assert_eq!(parsed, streaming_array);
1893
1894 let streaming_map = Frame::StreamedMap(vec![
1896 (
1897 Frame::SimpleString(Bytes::from("key1")),
1898 Frame::SimpleString(Bytes::from("val1")),
1899 ),
1900 (
1901 Frame::SimpleString(Bytes::from("key2")),
1902 Frame::Integer(123),
1903 ),
1904 ]);
1905 let serialized = frame_to_bytes(&streaming_map);
1906 let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1907 assert_eq!(parsed, streaming_map);
1908
1909 let empty_streaming = Frame::StreamedString(vec![]);
1911 let serialized = frame_to_bytes(&empty_streaming);
1912 let expected = "$?\r\n;0\r\n\r\n";
1913 assert_eq!(serialized, Bytes::from(expected));
1914 let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1915 assert_eq!(parsed, empty_streaming);
1916
1917 let streaming_set = Frame::StreamedSet(vec![
1919 Frame::SimpleString(Bytes::from("apple")),
1920 Frame::SimpleString(Bytes::from("banana")),
1921 Frame::Integer(42),
1922 ]);
1923 let serialized = frame_to_bytes(&streaming_set);
1924 let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1925 assert_eq!(parsed, streaming_set);
1926
1927 let streaming_attribute = Frame::StreamedAttribute(vec![
1929 (
1930 Frame::SimpleString(Bytes::from("trace-id")),
1931 Frame::SimpleString(Bytes::from("abc123")),
1932 ),
1933 (
1934 Frame::SimpleString(Bytes::from("span-id")),
1935 Frame::SimpleString(Bytes::from("def456")),
1936 ),
1937 ]);
1938 let serialized = frame_to_bytes(&streaming_attribute);
1939 let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1940 assert_eq!(parsed, streaming_attribute);
1941
1942 let streaming_push = Frame::StreamedPush(vec![
1944 Frame::SimpleString(Bytes::from("pubsub")),
1945 Frame::SimpleString(Bytes::from("channel1")),
1946 Frame::SimpleString(Bytes::from("message data")),
1947 ]);
1948 let serialized = frame_to_bytes(&streaming_push);
1949 let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1950 assert_eq!(parsed, streaming_push);
1951
1952 let empty_array = Frame::StreamedArray(vec![]);
1954 let serialized = frame_to_bytes(&empty_array);
1955 let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1956 assert_eq!(parsed, empty_array);
1957
1958 let empty_set = Frame::StreamedSet(vec![]);
1959 let serialized = frame_to_bytes(&empty_set);
1960 let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1961 assert_eq!(parsed, empty_set);
1962 }
1963
1964 #[test]
1965 fn test_streaming_sequences_edge_cases() {
1966 let data = Bytes::from("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n");
1968 let result = parse_streaming_sequence(data);
1969 assert!(matches!(result, Err(ParseError::Incomplete)));
1970
1971 let data = Bytes::from("$?\r\n;abc\r\nHell\r\n;0\r\n");
1973 let result = parse_streaming_sequence(data);
1974 assert!(matches!(result, Err(ParseError::BadLength)));
1975
1976 let data = Bytes::from("*?\r\n+hello\r\n:42\r\n");
1978 let result = parse_streaming_sequence(data);
1979 assert!(matches!(result, Err(ParseError::Incomplete)));
1980
1981 let data = Bytes::from("*?\r\n+hello\r\n*2\r\n:1\r\n:2\r\n.\r\n");
1983 let result = parse_streaming_sequence(data);
1984 assert!(result.is_ok());
1985 let (frame, _) = result.unwrap();
1986 if let Frame::StreamedArray(items) = frame {
1987 assert_eq!(items.len(), 2);
1988 assert!(matches!(items[0], Frame::SimpleString(_)));
1989 assert!(matches!(items[1], Frame::Array(_)));
1990 }
1991
1992 let data = Bytes::from("*?\r\n.\r\n");
1994 let result = parse_streaming_sequence(data);
1995 assert!(result.is_ok());
1996 let (frame, _) = result.unwrap();
1997 if let Frame::StreamedArray(items) = frame {
1998 assert!(items.is_empty());
1999 }
2000
2001 let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+orphan\r\n.\r\n");
2003 let result = parse_streaming_sequence(data);
2004 assert!(matches!(result, Err(ParseError::InvalidFormat)));
2005
2006 let data = Bytes::from("+simple\r\n");
2008 let result = parse_streaming_sequence(data);
2009 assert!(result.is_ok());
2010 let (frame, _) = result.unwrap();
2011 assert!(matches!(frame, Frame::SimpleString(_)));
2012
2013 let data = Bytes::from(";999999999999999999\r\ndata\r\n");
2015 let result = parse_frame(data);
2016 match &result {
2019 Err(ParseError::BadLength) => {} Err(ParseError::Incomplete) => {} Err(e) => panic!("Got unexpected error type: {e:?}"),
2022 Ok(_) => panic!("Large chunk size should fail"),
2023 }
2024
2025 let data = Bytes::from("$?\r\n+invalid\r\n;0\r\n");
2027 let result = parse_streaming_sequence(data);
2028 assert!(matches!(result, Err(ParseError::InvalidFormat)));
2029
2030 let data = Bytes::from("*?\r\n+hello\r\n.corrupted\r\n");
2032 let result = parse_streaming_sequence(data);
2033 assert!(matches!(result, Err(ParseError::Incomplete)));
2034
2035 let data = Bytes::new();
2037 let result = parse_streaming_sequence(data);
2038 assert!(matches!(result, Err(ParseError::Incomplete)));
2039
2040 let data = Bytes::from("*?\r\n+hello\r\n$5\r\nwo");
2042 let result = parse_streaming_sequence(data);
2043 assert!(matches!(result, Err(ParseError::Incomplete)));
2044 }
2045
2046 #[test]
2047 fn test_roundtrip_nested_structures() {
2048 let original = Bytes::from(
2050 "*3\r\n+hello\r\n%2\r\n+key1\r\n:123\r\n+key2\r\n~1\r\n+item\r\n|1\r\n+meta\r\n+data\r\n",
2051 );
2052 let (frame, _) = parse_frame(original.clone()).unwrap();
2053 let serialized = frame_to_bytes(&frame);
2054
2055 let (reparsed, _) = parse_frame(serialized).unwrap();
2056 assert_eq!(frame, reparsed);
2057 }
2058
2059 #[test]
2060 fn test_zero_length_bulk_string_requires_trailing_crlf() {
2061 let input = Bytes::from("$0\r\n\r\nTAIL");
2063 let (frame, rest) = parse_frame(input).unwrap();
2064 assert_eq!(frame, Frame::BulkString(Some(Bytes::new())));
2065 assert_eq!(rest, Bytes::from("TAIL"));
2066
2067 let input = Bytes::from("$0\r\n");
2069 assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2070
2071 let input = Bytes::from("$0\r\n\r");
2073 assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2074
2075 let input = Bytes::from("$0\r\nXY");
2077 assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
2078 }
2079
2080 #[test]
2081 fn test_zero_length_streamed_chunk_requires_trailing_crlf() {
2082 let input = Bytes::from(";0\r\n\r\nTAIL");
2084 let (frame, rest) = parse_frame(input).unwrap();
2085 assert_eq!(frame, Frame::StreamedStringChunk(Bytes::new()));
2086 assert_eq!(rest, Bytes::from("TAIL"));
2087
2088 let input = Bytes::from(";0\r\n");
2090 assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2091
2092 let input = Bytes::from(";0\r\nXY");
2094 assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
2095 }
2096
2097 #[test]
2098 fn test_integer_overflow_returns_overflow_error() {
2099 let input = Bytes::from(":9223372036854775808\r\n");
2101 assert_eq!(parse_frame(input), Err(ParseError::Overflow));
2102
2103 let input = Bytes::from(":9223372036854775807\r\n");
2105 let (frame, _) = parse_frame(input).unwrap();
2106 assert_eq!(frame, Frame::Integer(i64::MAX));
2107
2108 let input = Bytes::from(":-9223372036854775808\r\n");
2110 let (frame, _) = parse_frame(input).unwrap();
2111 assert_eq!(frame, Frame::Integer(i64::MIN));
2112 }
2113
2114 #[test]
2115 fn test_parser_propagates_errors() {
2116 let mut parser = Parser::new();
2117 parser.feed(Bytes::from("XINVALID\r\n"));
2118 let result = parser.next_frame();
2119 assert!(result.is_err());
2120 assert_eq!(result.unwrap_err(), ParseError::InvalidTag(b'X'));
2121 }
2122
2123 #[test]
2124 fn test_parser_returns_ok_none_for_incomplete() {
2125 let mut parser = Parser::new();
2126 parser.feed(Bytes::from("+HELL"));
2127 assert_eq!(parser.next_frame().unwrap(), None);
2128 }
2129
2130 #[test]
2131 fn test_integer_negative_overflow() {
2132 assert!(parse_frame(Bytes::from(":-9223372036854775809\r\n")).is_err());
2134 }
2135
2136 #[test]
2137 fn test_nonempty_bulk_malformed_terminator() {
2138 assert_eq!(
2140 parse_frame(Bytes::from("$3\r\nfoo")),
2141 Err(ParseError::Incomplete)
2142 );
2143 assert_eq!(
2145 parse_frame(Bytes::from("$3\r\nfooX")),
2146 Err(ParseError::Incomplete)
2147 );
2148 assert_eq!(
2150 parse_frame(Bytes::from("$3\r\nfooXY")),
2151 Err(ParseError::InvalidFormat)
2152 );
2153 }
2154
2155 #[test]
2156 fn test_blob_error_malformed_terminator() {
2157 assert_eq!(
2158 parse_frame(Bytes::from("!3\r\nerr")),
2159 Err(ParseError::Incomplete)
2160 );
2161 assert_eq!(
2162 parse_frame(Bytes::from("!3\r\nerrXY")),
2163 Err(ParseError::InvalidFormat)
2164 );
2165 }
2166
2167 #[test]
2168 fn test_verbatim_string_malformed_terminator() {
2169 assert_eq!(
2170 parse_frame(Bytes::from("=8\r\ntxt:data")),
2171 Err(ParseError::Incomplete)
2172 );
2173 assert_eq!(
2174 parse_frame(Bytes::from("=8\r\ntxt:dataXY")),
2175 Err(ParseError::InvalidFormat)
2176 );
2177 }
2178
2179 #[test]
2180 fn test_streamed_chunk_malformed_terminator() {
2181 assert_eq!(
2182 parse_frame(Bytes::from(";3\r\nabc")),
2183 Err(ParseError::Incomplete)
2184 );
2185 assert_eq!(
2186 parse_frame(Bytes::from(";3\r\nabcXY")),
2187 Err(ParseError::InvalidFormat)
2188 );
2189 }
2190
2191 #[test]
2192 fn test_bulk_string_size_limit() {
2193 assert_eq!(
2195 parse_frame(Bytes::from("$536870913\r\n")),
2196 Err(ParseError::BadLength)
2197 );
2198 }
2199
2200 #[test]
2201 fn test_blob_error_size_limit() {
2202 assert_eq!(
2203 parse_frame(Bytes::from("!536870913\r\n")),
2204 Err(ParseError::BadLength)
2205 );
2206 }
2207
2208 #[test]
2209 fn test_verbatim_string_size_limit() {
2210 assert_eq!(
2211 parse_frame(Bytes::from("=536870913\r\n")),
2212 Err(ParseError::BadLength)
2213 );
2214 }
2215
2216 #[test]
2217 fn test_streamed_chunk_size_limit() {
2218 assert_eq!(
2219 parse_frame(Bytes::from(";536870913\r\n")),
2220 Err(ParseError::BadLength)
2221 );
2222 }
2223
2224 #[test]
2225 fn test_invalid_double() {
2226 assert_eq!(
2227 parse_frame(Bytes::from(",foo\r\n")),
2228 Err(ParseError::InvalidFormat)
2229 );
2230 }
2231
2232 #[test]
2233 fn test_invalid_boolean() {
2234 assert_eq!(
2235 parse_frame(Bytes::from("#\r\n")),
2236 Err(ParseError::InvalidBoolean)
2237 );
2238 assert_eq!(
2239 parse_frame(Bytes::from("#true\r\n")),
2240 Err(ParseError::InvalidBoolean)
2241 );
2242 }
2243
2244 #[test]
2245 fn test_parser_clears_buffer_on_error() {
2246 let mut parser = Parser::new();
2247 parser.feed(Bytes::from("X\r\n"));
2248 assert_eq!(parser.next_frame(), Err(ParseError::InvalidTag(b'X')));
2249 assert_eq!(parser.buffered_bytes(), 0);
2250 }
2251
2252 #[test]
2253 fn test_parser_recovers_after_error() {
2254 let mut parser = Parser::new();
2255 parser.feed(Bytes::from("X\r\n"));
2256 assert!(parser.next_frame().is_err());
2257 assert_eq!(parser.buffered_bytes(), 0);
2258
2259 parser.feed(Bytes::from("+OK\r\n"));
2260 let frame = parser.next_frame().unwrap().unwrap();
2261 assert_eq!(frame, Frame::SimpleString(Bytes::from("OK")));
2262 }
2263
2264 #[test]
2265 fn test_streaming_set_roundtrip() {
2266 let data = Bytes::from("~?\r\n+a\r\n+b\r\n+c\r\n.\r\n");
2267 let (frame, rest) = parse_streaming_sequence(data).unwrap();
2268 assert_eq!(
2269 frame,
2270 Frame::StreamedSet(vec![
2271 Frame::SimpleString(Bytes::from("a")),
2272 Frame::SimpleString(Bytes::from("b")),
2273 Frame::SimpleString(Bytes::from("c")),
2274 ])
2275 );
2276 assert!(rest.is_empty());
2277 }
2278
2279 #[test]
2280 fn test_streaming_attribute_roundtrip() {
2281 let data = Bytes::from("|?\r\n+key\r\n+val\r\n.\r\n");
2282 let (frame, rest) = parse_streaming_sequence(data).unwrap();
2283 assert_eq!(
2284 frame,
2285 Frame::StreamedAttribute(vec![(
2286 Frame::SimpleString(Bytes::from("key")),
2287 Frame::SimpleString(Bytes::from("val")),
2288 )])
2289 );
2290 assert!(rest.is_empty());
2291 }
2292
2293 #[test]
2294 fn test_streaming_push_roundtrip() {
2295 let data = Bytes::from(">?\r\n+pubsub\r\n+channel\r\n+message\r\n.\r\n");
2296 let (frame, rest) = parse_streaming_sequence(data).unwrap();
2297 assert_eq!(
2298 frame,
2299 Frame::StreamedPush(vec![
2300 Frame::SimpleString(Bytes::from("pubsub")),
2301 Frame::SimpleString(Bytes::from("channel")),
2302 Frame::SimpleString(Bytes::from("message")),
2303 ])
2304 );
2305 assert!(rest.is_empty());
2306 }
2307
2308 #[test]
2309 fn test_empty_streaming_containers() {
2310 let data = Bytes::from("$?\r\n;0\r\n\r\n");
2312 let (frame, _) = parse_streaming_sequence(data).unwrap();
2313 assert_eq!(frame, Frame::StreamedString(vec![]));
2314
2315 let data = Bytes::from("*?\r\n.\r\n");
2317 let (frame, _) = parse_streaming_sequence(data).unwrap();
2318 assert_eq!(frame, Frame::StreamedArray(vec![]));
2319
2320 let data = Bytes::from("~?\r\n.\r\n");
2322 let (frame, _) = parse_streaming_sequence(data).unwrap();
2323 assert_eq!(frame, Frame::StreamedSet(vec![]));
2324
2325 let data = Bytes::from("%?\r\n.\r\n");
2327 let (frame, _) = parse_streaming_sequence(data).unwrap();
2328 assert_eq!(frame, Frame::StreamedMap(vec![]));
2329 }
2330
2331 #[test]
2332 fn test_streaming_attribute_odd_elements_errors() {
2333 let data = Bytes::from("|?\r\n+key\r\n+val\r\n+orphan\r\n.\r\n");
2334 let result = parse_streaming_sequence(data);
2335 assert!(matches!(result, Err(ParseError::InvalidFormat)));
2336 }
2337
2338 #[test]
2339 fn test_streaming_blob_error_header_passthrough() {
2340 let data = Bytes::from("!?\r\n!5\r\nERROR\r\n");
2342 let (frame, rest) = parse_streaming_sequence(data).unwrap();
2343 assert_eq!(frame, Frame::StreamedBlobErrorHeader);
2344 assert!(!rest.is_empty());
2346 }
2347
2348 #[test]
2349 fn test_streaming_verbatim_header_passthrough() {
2350 let data = Bytes::from("=?\r\n=9\r\ntxt:hello\r\n");
2352 let (frame, rest) = parse_streaming_sequence(data).unwrap();
2353 assert_eq!(frame, Frame::StreamedVerbatimStringHeader);
2354 assert!(!rest.is_empty());
2355 }
2356}