1use bytes::{BufMut, Bytes, BytesMut};
29
/// Largest element (or key/value pair) count accepted in a collection header.
/// `parse_count` rejects anything above this with `ParseError::BadLength`.
const MAX_COLLECTION_SIZE: usize = 10_000_000;

/// Largest declared payload length, in bytes (512 MiB), accepted for bulk strings,
/// blob errors, verbatim strings and streamed chunks. Larger declared lengths are
/// rejected with `ParseError::BadLength` before any payload is examined.
const MAX_BULK_STRING_SIZE: usize = 512 * 1024 * 1024;
35
/// Incremental frame parser.
///
/// Bytes are appended with `feed` and complete frames are pulled out one at a
/// time with `next_frame`; bytes belonging to a not-yet-complete frame stay
/// buffered between calls.
#[derive(Default, Debug)]
pub struct Parser {
    /// Bytes received so far that have not yet been returned as part of a frame.
    buffer: BytesMut,
}
44
45impl Parser {
46 pub fn new() -> Self {
48 Self {
49 buffer: BytesMut::new(),
50 }
51 }
52
53 pub fn feed(&mut self, data: Bytes) {
57 self.buffer.extend_from_slice(&data);
58 }
59
60 pub fn next_frame(&mut self) -> Result<Option<Frame>, ParseError> {
66 if self.buffer.is_empty() {
67 return Ok(None);
68 }
69
70 let bytes = self.buffer.split().freeze();
71
72 match parse_frame_inner(&bytes, 0) {
73 Ok((frame, consumed)) => {
74 if consumed < bytes.len() {
75 self.buffer.unsplit(BytesMut::from(&bytes[consumed..]));
76 }
77 Ok(Some(frame))
78 }
79 Err(ParseError::Incomplete) => {
80 self.buffer.unsplit(bytes.into());
81 Ok(None)
82 }
83 Err(e) => {
84 Err(e)
87 }
88 }
89 }
90
91 pub fn buffered_bytes(&self) -> usize {
93 self.buffer.len()
94 }
95
96 pub fn clear(&mut self) {
98 self.buffer.clear();
99 }
100}
101
/// One protocol value, or a marker used by the streamed (length-unknown) encodings.
///
/// The wire forms quoted below are the ones produced by `serialize_frame` and
/// accepted by `parse_frame_inner` in this file.
/// NOTE(review): the tag set looks like Redis RESP2/RESP3 - confirm against the crate docs.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    /// `+<text>\r\n`
    SimpleString(Bytes),
    /// `-<text>\r\n`
    Error(Bytes),
    /// `:<integer>\r\n`
    Integer(i64),
    /// `$<len>\r\n<data>\r\n`; `None` is the null bulk string `$-1\r\n`.
    BulkString(Option<Bytes>),
    /// `!<len>\r\n<data>\r\n`
    BlobError(Bytes),
    /// `$?\r\n` - start of a streamed string.
    StreamedStringHeader,
    /// `!?\r\n` - start of a streamed blob error.
    StreamedBlobErrorHeader,
    /// `=?\r\n` - start of a streamed verbatim string.
    StreamedVerbatimStringHeader,
    /// `*?\r\n` - start of a streamed array.
    StreamedArrayHeader,
    /// `~?\r\n` - start of a streamed set.
    StreamedSetHeader,
    /// `%?\r\n` - start of a streamed map.
    StreamedMapHeader,
    /// `|?\r\n` - start of a streamed attribute.
    StreamedAttributeHeader,
    /// `>?\r\n` - start of a streamed push.
    StreamedPushHeader,
    /// `;<len>\r\n<data>\r\n` - one chunk of a streamed string.
    StreamedStringChunk(Bytes),

    /// A whole streamed string as assembled by `parse_streaming_sequence`:
    /// the non-empty chunks in order (the terminating empty chunk is not stored).
    StreamedString(Vec<Bytes>),

    /// A whole streamed array: the items read up to the `.\r\n` terminator.
    StreamedArray(Vec<Frame>),

    /// A whole streamed set: the items read up to the `.\r\n` terminator.
    StreamedSet(Vec<Frame>),

    /// A whole streamed map: key/value pairs read up to the `.\r\n` terminator.
    StreamedMap(Vec<(Frame, Frame)>),

    /// A whole streamed attribute: key/value pairs read up to the `.\r\n` terminator.
    StreamedAttribute(Vec<(Frame, Frame)>),

    /// A whole streamed push: the items read up to the `.\r\n` terminator.
    StreamedPush(Vec<Frame>),
    /// `.\r\n` - ends a streamed collection.
    StreamTerminator,
    /// `_\r\n`
    Null,
    /// `,<number>\r\n` for a finite value.
    Double(f64),
    /// `,inf\r\n`, `,-inf\r\n` or `,nan\r\n`; the payload is the spelling without the tag.
    SpecialFloat(Bytes),
    /// `#t\r\n` or `#f\r\n`
    Boolean(bool),
    /// `(<digits>\r\n`; the payload is kept as raw bytes and is not validated by the parser.
    BigNumber(Bytes),
    /// `=<len>\r\n<format>:<content>\r\n`, stored as `(format, content)`.
    /// The parser only accepts a three-byte format.
    VerbatimString(Bytes, Bytes),
    /// `*<count>\r\n` followed by `count` frames; `None` is the null array `*-1\r\n`.
    Array(Option<Vec<Frame>>),
    /// `~<count>\r\n` followed by `count` frames.
    Set(Vec<Frame>),
    /// `%<count>\r\n` followed by `count` key/value frame pairs.
    Map(Vec<(Frame, Frame)>),
    /// `|<count>\r\n` followed by `count` key/value frame pairs.
    Attribute(Vec<(Frame, Frame)>),
    /// `><count>\r\n` followed by `count` frames.
    Push(Vec<Frame>),
}
264
265pub use crate::ParseError;
266
267pub fn parse_frame(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
271 let (frame, consumed) = parse_frame_inner(&input, 0)?;
272 Ok((frame, input.slice(consumed..)))
273}
274
275#[inline(never)]
278fn parse_blob_bounds(
279 buf: &[u8],
280 after_crlf: usize,
281 len: usize,
282) -> Result<(usize, usize), ParseError> {
283 if len == 0 {
284 if after_crlf + 1 >= buf.len() {
285 return Err(ParseError::Incomplete);
286 }
287 if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
288 return Ok((after_crlf, after_crlf));
289 } else {
290 return Err(ParseError::InvalidFormat);
291 }
292 }
293 let data_start = after_crlf;
294 let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
295 if data_end + 1 >= buf.len() {
296 return Err(ParseError::Incomplete);
297 }
298 if buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
299 return Err(ParseError::InvalidFormat);
300 }
301 Ok((data_start, data_end))
302}
303
304fn parse_bulk_string(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
306 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
307 let len_bytes = &buf[pos + 1..line_end];
308 if len_bytes == b"?" {
309 return Ok((Frame::StreamedStringHeader, after_crlf));
310 }
311 if len_bytes == b"-1" {
312 return Ok((Frame::BulkString(None), after_crlf));
313 }
314 let len = parse_usize(len_bytes)?;
315 if len > MAX_BULK_STRING_SIZE {
316 return Err(ParseError::BadLength);
317 }
318 let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
319 if data_start == data_end {
320 Ok((Frame::BulkString(Some(Bytes::new())), after_crlf + 2))
321 } else {
322 Ok((
323 Frame::BulkString(Some(input.slice(data_start..data_end))),
324 data_end + 2,
325 ))
326 }
327}
328
329#[inline(never)]
331fn parse_double_frame(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
332 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
333 let line_bytes = &buf[pos + 1..line_end];
334 if line_bytes == b"inf" || line_bytes == b"-inf" || line_bytes == b"nan" {
335 return Ok((
336 Frame::SpecialFloat(input.slice(pos + 1..line_end)),
337 after_crlf,
338 ));
339 }
340 let s = std::str::from_utf8(line_bytes).map_err(|_| ParseError::Utf8Error)?;
341 let v = s.parse::<f64>().map_err(|_| ParseError::InvalidFormat)?;
342 if v.is_infinite() || v.is_nan() {
343 let canonical = if v.is_nan() {
344 "nan"
345 } else if v.is_sign_negative() {
346 "-inf"
347 } else {
348 "inf"
349 };
350 return Ok((Frame::SpecialFloat(Bytes::from(canonical)), after_crlf));
351 }
352 Ok((Frame::Double(v), after_crlf))
353}
354
355#[inline(never)]
357fn parse_verbatim(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
358 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
359 let len_bytes = &buf[pos + 1..line_end];
360 if len_bytes == b"?" {
361 return Ok((Frame::StreamedVerbatimStringHeader, after_crlf));
362 }
363 if len_bytes == b"-1" {
364 return Err(ParseError::BadLength);
365 }
366 let len = parse_usize(len_bytes)?;
367 if len > MAX_BULK_STRING_SIZE {
368 return Err(ParseError::BadLength);
369 }
370 let data_start = after_crlf;
371 let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
372 if data_end + 1 >= buf.len() {
373 return Err(ParseError::Incomplete);
374 }
375 if buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
376 return Err(ParseError::InvalidFormat);
377 }
378 let sep = buf[data_start..data_end]
379 .iter()
380 .position(|&b| b == b':')
381 .ok_or(ParseError::InvalidFormat)?;
382 if sep != 3 {
383 return Err(ParseError::InvalidFormat);
384 }
385 let format = input.slice(data_start..data_start + sep);
386 let content = input.slice(data_start + sep + 1..data_end);
387 Ok((Frame::VerbatimString(format, content), data_end + 2))
388}
389
390#[inline(never)]
392fn parse_blob_error(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
393 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
394 let len_bytes = &buf[pos + 1..line_end];
395 if len_bytes == b"?" {
396 return Ok((Frame::StreamedBlobErrorHeader, after_crlf));
397 }
398 if len_bytes == b"-1" {
399 return Err(ParseError::BadLength);
400 }
401 let len = parse_usize(len_bytes)?;
402 if len > MAX_BULK_STRING_SIZE {
403 return Err(ParseError::BadLength);
404 }
405 let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
406 if data_start == data_end {
407 Ok((Frame::BlobError(Bytes::new()), after_crlf + 2))
408 } else {
409 Ok((
410 Frame::BlobError(input.slice(data_start..data_end)),
411 data_end + 2,
412 ))
413 }
414}
415
416#[inline(never)]
418fn parse_collection(
419 input: &Bytes,
420 buf: &[u8],
421 pos: usize,
422 tag: u8,
423) -> Result<(Frame, usize), ParseError> {
424 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
425 let len_bytes = &buf[pos + 1..line_end];
426
427 if len_bytes == b"?" {
429 return match tag {
430 b'*' => Ok((Frame::StreamedArrayHeader, after_crlf)),
431 b'~' => Ok((Frame::StreamedSetHeader, after_crlf)),
432 b'>' => Ok((Frame::StreamedPushHeader, after_crlf)),
433 _ => unreachable!(),
434 };
435 }
436 if tag == b'*' && len_bytes == b"-1" {
438 return Ok((Frame::Array(None), after_crlf));
439 }
440 let count = parse_count(len_bytes)?;
441 if count == 0 {
442 return match tag {
443 b'*' => Ok((Frame::Array(Some(Vec::new())), after_crlf)),
444 b'~' => Ok((Frame::Set(Vec::new()), after_crlf)),
445 b'>' => Ok((Frame::Push(Vec::new()), after_crlf)),
446 _ => unreachable!(),
447 };
448 }
449 let mut cursor = after_crlf;
450 let mut items = Vec::with_capacity(count);
451 for _ in 0..count {
452 let (item, next) = parse_frame_inner(input, cursor)?;
453 items.push(item);
454 cursor = next;
455 }
456 match tag {
457 b'*' => Ok((Frame::Array(Some(items)), cursor)),
458 b'~' => Ok((Frame::Set(items), cursor)),
459 b'>' => Ok((Frame::Push(items), cursor)),
460 _ => unreachable!(),
461 }
462}
463
464#[inline(never)]
466fn parse_pairs(
467 input: &Bytes,
468 buf: &[u8],
469 pos: usize,
470 tag: u8,
471) -> Result<(Frame, usize), ParseError> {
472 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
473 let len_bytes = &buf[pos + 1..line_end];
474 if len_bytes == b"?" {
475 return if tag == b'%' {
476 Ok((Frame::StreamedMapHeader, after_crlf))
477 } else {
478 Ok((Frame::StreamedAttributeHeader, after_crlf))
479 };
480 }
481 let count = parse_count(len_bytes)?;
482 let mut cursor = after_crlf;
483 let mut pairs = Vec::with_capacity(count);
484 for _ in 0..count {
485 let (key, next1) = parse_frame_inner(input, cursor)?;
486 let (val, next2) = parse_frame_inner(input, next1)?;
487 pairs.push((key, val));
488 cursor = next2;
489 }
490 if tag == b'%' {
491 Ok((Frame::Map(pairs), cursor))
492 } else {
493 Ok((Frame::Attribute(pairs), cursor))
494 }
495}
496
497#[inline(never)]
499fn parse_streamed_chunk(
500 input: &Bytes,
501 buf: &[u8],
502 pos: usize,
503) -> Result<(Frame, usize), ParseError> {
504 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
505 let len = parse_usize(&buf[pos + 1..line_end])?;
506 if len > MAX_BULK_STRING_SIZE {
507 return Err(ParseError::BadLength);
508 }
509 let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
510 if data_start == data_end {
511 Ok((Frame::StreamedStringChunk(Bytes::new()), after_crlf + 2))
512 } else {
513 Ok((
514 Frame::StreamedStringChunk(input.slice(data_start..data_end)),
515 data_end + 2,
516 ))
517 }
518}
519
520fn parse_frame_inner(input: &Bytes, pos: usize) -> Result<(Frame, usize), ParseError> {
524 let buf = input.as_ref();
525 if pos >= buf.len() {
526 return Err(ParseError::Incomplete);
527 }
528
529 let tag = buf[pos];
530
531 match tag {
532 b'+' => {
534 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
535 Ok((
536 Frame::SimpleString(input.slice(pos + 1..line_end)),
537 after_crlf,
538 ))
539 }
540 b'-' => {
541 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
542 Ok((Frame::Error(input.slice(pos + 1..line_end)), after_crlf))
543 }
544 b':' => {
545 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
546 let v = parse_i64(&buf[pos + 1..line_end])?;
547 Ok((Frame::Integer(v), after_crlf))
548 }
549 b'#' => {
550 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
551 match &buf[pos + 1..line_end] {
552 b"t" => Ok((Frame::Boolean(true), after_crlf)),
553 b"f" => Ok((Frame::Boolean(false), after_crlf)),
554 _ => Err(ParseError::InvalidBoolean),
555 }
556 }
557 b'(' => {
558 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
559 Ok((Frame::BigNumber(input.slice(pos + 1..line_end)), after_crlf))
560 }
561 b'_' => {
562 if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
563 Ok((Frame::Null, pos + 3))
564 } else {
565 Err(ParseError::Incomplete)
566 }
567 }
568 b'.' => {
569 if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
570 Ok((Frame::StreamTerminator, pos + 3))
571 } else {
572 Err(ParseError::Incomplete)
573 }
574 }
575
576 b'$' => parse_bulk_string(input, buf, pos),
578 b',' => parse_double_frame(input, buf, pos),
579 b'=' => parse_verbatim(input, buf, pos),
580 b'!' => parse_blob_error(input, buf, pos),
581 b';' => parse_streamed_chunk(input, buf, pos),
582
583 b'*' | b'~' | b'>' => parse_collection(input, buf, pos, tag),
585 b'%' | b'|' => parse_pairs(input, buf, pos, tag),
586
587 _ => Err(ParseError::InvalidTag(tag)),
588 }
589}
590
591pub fn parse_streaming_sequence(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
671 if input.is_empty() {
672 return Err(ParseError::Incomplete);
673 }
674
675 let (header, mut rest) = parse_frame(input)?;
676
677 match header {
678 Frame::StreamedStringHeader => {
679 let mut chunks = Vec::new();
681
682 loop {
683 let (frame, new_rest) = parse_frame(rest)?;
684 rest = new_rest;
685
686 match frame {
687 Frame::StreamedStringChunk(chunk) => {
688 if chunk.is_empty() {
689 break;
691 }
692 chunks.push(chunk);
693 }
694 _ => {
695 return Err(ParseError::InvalidFormat);
696 }
697 }
698 }
699
700 Ok((Frame::StreamedString(chunks), rest))
701 }
702 Frame::StreamedBlobErrorHeader | Frame::StreamedVerbatimStringHeader => {
703 Ok((header, rest))
706 }
707 Frame::StreamedArrayHeader => {
708 let mut items = Vec::new();
710
711 loop {
712 let (frame, new_rest) = parse_frame(rest)?;
713 rest = new_rest;
714
715 match frame {
716 Frame::StreamTerminator => {
717 break;
718 }
719 item => {
720 items.push(item);
721 }
722 }
723 }
724
725 Ok((Frame::StreamedArray(items), rest))
726 }
727 Frame::StreamedSetHeader => {
728 let mut items = Vec::new();
730
731 loop {
732 let (frame, new_rest) = parse_frame(rest)?;
733 rest = new_rest;
734
735 match frame {
736 Frame::StreamTerminator => {
737 break;
738 }
739 item => {
740 items.push(item);
741 }
742 }
743 }
744
745 Ok((Frame::StreamedSet(items), rest))
746 }
747 Frame::StreamedMapHeader => {
748 let mut pairs = Vec::new();
750
751 loop {
752 let (frame, new_rest) = parse_frame(rest)?;
753 rest = new_rest;
754
755 match frame {
756 Frame::StreamTerminator => {
757 break;
758 }
759 key => {
760 let (value, newer_rest) = parse_frame(rest)?;
761 if matches!(value, Frame::StreamTerminator) {
762 return Err(ParseError::InvalidFormat);
763 }
764 rest = newer_rest;
765 pairs.push((key, value));
766 }
767 }
768 }
769
770 Ok((Frame::StreamedMap(pairs), rest))
771 }
772 Frame::StreamedAttributeHeader => {
773 let mut pairs = Vec::new();
775
776 loop {
777 let (frame, new_rest) = parse_frame(rest)?;
778 rest = new_rest;
779
780 match frame {
781 Frame::StreamTerminator => {
782 break;
783 }
784 key => {
785 let (value, newer_rest) = parse_frame(rest)?;
786 if matches!(value, Frame::StreamTerminator) {
787 return Err(ParseError::InvalidFormat);
788 }
789 rest = newer_rest;
790 pairs.push((key, value));
791 }
792 }
793 }
794
795 Ok((Frame::StreamedAttribute(pairs), rest))
796 }
797 Frame::StreamedPushHeader => {
798 let mut items = Vec::new();
800
801 loop {
802 let (frame, new_rest) = parse_frame(rest)?;
803 rest = new_rest;
804
805 match frame {
806 Frame::StreamTerminator => {
807 break;
808 }
809 item => {
810 items.push(item);
811 }
812 }
813 }
814
815 Ok((Frame::StreamedPush(items), rest))
816 }
817 _ => {
818 Ok((header, rest))
820 }
821 }
822}
823
824#[inline]
827fn find_crlf(buf: &[u8], from: usize) -> Result<(usize, usize), ParseError> {
828 let mut i = from;
829 let len = buf.len();
830 while i + 1 < len {
831 if buf[i] == b'\r' && buf[i + 1] == b'\n' {
832 return Ok((i, i + 2));
833 }
834 i += 1;
835 }
836 Err(ParseError::Incomplete)
837}
838
839#[inline]
841fn parse_usize(buf: &[u8]) -> Result<usize, ParseError> {
842 if buf.is_empty() {
843 return Err(ParseError::BadLength);
844 }
845 let mut v: usize = 0;
846 for &b in buf {
847 if !b.is_ascii_digit() {
848 return Err(ParseError::BadLength);
849 }
850 v = v.checked_mul(10).ok_or(ParseError::BadLength)?;
851 v = v
852 .checked_add((b - b'0') as usize)
853 .ok_or(ParseError::BadLength)?;
854 }
855 Ok(v)
856}
857
858#[inline]
860fn parse_i64(buf: &[u8]) -> Result<i64, ParseError> {
861 if buf.is_empty() {
862 return Err(ParseError::InvalidFormat);
863 }
864 let (neg, digits) = if buf[0] == b'-' {
865 (true, &buf[1..])
866 } else {
867 (false, buf)
868 };
869 if digits.is_empty() {
870 return Err(ParseError::InvalidFormat);
871 }
872 let mut v: i64 = 0;
873 for (i, &d) in digits.iter().enumerate() {
874 if !d.is_ascii_digit() {
875 return Err(ParseError::InvalidFormat);
876 }
877 let digit = (d - b'0') as i64;
878 if neg && v == i64::MAX / 10 && digit == 8 && i == digits.len() - 1 {
879 return Ok(i64::MIN);
880 }
881 if v > i64::MAX / 10 || (v == i64::MAX / 10 && digit > i64::MAX % 10) {
882 return Err(ParseError::Overflow);
883 }
884 v = v * 10 + digit;
885 }
886 if neg { Ok(-v) } else { Ok(v) }
887}
888
889#[inline]
891fn parse_count(buf: &[u8]) -> Result<usize, ParseError> {
892 let count = parse_usize(buf)?;
893 if count > MAX_COLLECTION_SIZE {
894 return Err(ParseError::BadLength);
895 }
896 Ok(count)
897}
898
899pub fn frame_to_bytes(frame: &Frame) -> Bytes {
903 let mut buf = BytesMut::new();
904 serialize_frame(frame, &mut buf);
905 buf.freeze()
906}
907
908fn serialize_frame(frame: &Frame, buf: &mut BytesMut) {
909 match frame {
910 Frame::SimpleString(s) => {
911 buf.put_u8(b'+');
912 buf.extend_from_slice(s);
913 buf.extend_from_slice(b"\r\n");
914 }
915 Frame::Error(e) => {
916 buf.put_u8(b'-');
917 buf.extend_from_slice(e);
918 buf.extend_from_slice(b"\r\n");
919 }
920 Frame::Integer(i) => {
921 buf.put_u8(b':');
922 let s = i.to_string();
923 buf.extend_from_slice(s.as_bytes());
924 buf.extend_from_slice(b"\r\n");
925 }
926 Frame::BulkString(opt) => {
927 buf.put_u8(b'$');
928 match opt {
929 Some(data) => {
930 let len = data.len().to_string();
931 buf.extend_from_slice(len.as_bytes());
932 buf.extend_from_slice(b"\r\n");
933 buf.extend_from_slice(data);
934 buf.extend_from_slice(b"\r\n");
935 }
936 None => {
937 buf.extend_from_slice(b"-1\r\n");
938 }
939 }
940 }
941 Frame::BlobError(data) => {
942 buf.put_u8(b'!');
943 let len = data.len().to_string();
944 buf.extend_from_slice(len.as_bytes());
945 buf.extend_from_slice(b"\r\n");
946 buf.extend_from_slice(data);
947 buf.extend_from_slice(b"\r\n");
948 }
949 Frame::StreamedStringHeader => {
950 buf.extend_from_slice(b"$?\r\n");
951 }
952 Frame::StreamedBlobErrorHeader => {
953 buf.extend_from_slice(b"!?\r\n");
954 }
955 Frame::StreamedVerbatimStringHeader => {
956 buf.extend_from_slice(b"=?\r\n");
957 }
958 Frame::StreamedArrayHeader => {
959 buf.extend_from_slice(b"*?\r\n");
960 }
961 Frame::StreamedSetHeader => {
962 buf.extend_from_slice(b"~?\r\n");
963 }
964 Frame::StreamedMapHeader => {
965 buf.extend_from_slice(b"%?\r\n");
966 }
967 Frame::StreamedAttributeHeader => {
968 buf.extend_from_slice(b"|?\r\n");
969 }
970 Frame::StreamedPushHeader => {
971 buf.extend_from_slice(b">?\r\n");
972 }
973 Frame::StreamedStringChunk(data) => {
974 buf.put_u8(b';');
975 let len = data.len().to_string();
976 buf.extend_from_slice(len.as_bytes());
977 buf.extend_from_slice(b"\r\n");
978 buf.extend_from_slice(data);
979 buf.extend_from_slice(b"\r\n");
980 }
981 Frame::StreamedString(chunks) => {
982 buf.extend_from_slice(b"$?\r\n");
984 for chunk in chunks {
985 buf.put_u8(b';');
986 let len = chunk.len().to_string();
987 buf.extend_from_slice(len.as_bytes());
988 buf.extend_from_slice(b"\r\n");
989 buf.extend_from_slice(chunk);
990 buf.extend_from_slice(b"\r\n");
991 }
992 buf.extend_from_slice(b";0\r\n\r\n");
993 }
994 Frame::StreamedArray(items) => {
995 buf.extend_from_slice(b"*?\r\n");
996 for item in items {
997 serialize_frame(item, buf);
998 }
999 buf.extend_from_slice(b".\r\n");
1000 }
1001 Frame::StreamedSet(items) => {
1002 buf.extend_from_slice(b"~?\r\n");
1003 for item in items {
1004 serialize_frame(item, buf);
1005 }
1006 buf.extend_from_slice(b".\r\n");
1007 }
1008 Frame::StreamedMap(pairs) => {
1009 buf.extend_from_slice(b"%?\r\n");
1010 for (key, value) in pairs {
1011 serialize_frame(key, buf);
1012 serialize_frame(value, buf);
1013 }
1014 buf.extend_from_slice(b".\r\n");
1015 }
1016 Frame::StreamedAttribute(pairs) => {
1017 buf.extend_from_slice(b"|?\r\n");
1018 for (key, value) in pairs {
1019 serialize_frame(key, buf);
1020 serialize_frame(value, buf);
1021 }
1022 buf.extend_from_slice(b".\r\n");
1023 }
1024 Frame::StreamedPush(items) => {
1025 buf.extend_from_slice(b">?\r\n");
1026 for item in items {
1027 serialize_frame(item, buf);
1028 }
1029 buf.extend_from_slice(b".\r\n");
1030 }
1031 Frame::StreamTerminator => {
1032 buf.extend_from_slice(b".\r\n");
1033 }
1034 Frame::Null => {
1035 buf.extend_from_slice(b"_\r\n");
1036 }
1037 Frame::Double(d) => {
1038 buf.put_u8(b',');
1039 let s = d.to_string();
1040 buf.extend_from_slice(s.as_bytes());
1041 buf.extend_from_slice(b"\r\n");
1042 }
1043 Frame::SpecialFloat(f) => {
1044 buf.put_u8(b',');
1045 buf.extend_from_slice(f);
1046 buf.extend_from_slice(b"\r\n");
1047 }
1048 Frame::Boolean(b) => {
1049 buf.extend_from_slice(if *b { b"#t\r\n" } else { b"#f\r\n" });
1050 }
1051 Frame::BigNumber(n) => {
1052 buf.put_u8(b'(');
1053 buf.extend_from_slice(n);
1054 buf.extend_from_slice(b"\r\n");
1055 }
1056 Frame::VerbatimString(format, content) => {
1057 buf.put_u8(b'=');
1058 let total_len = format.len() + 1 + content.len(); let len = total_len.to_string();
1060 buf.extend_from_slice(len.as_bytes());
1061 buf.extend_from_slice(b"\r\n");
1062 buf.extend_from_slice(format);
1063 buf.put_u8(b':');
1064 buf.extend_from_slice(content);
1065 buf.extend_from_slice(b"\r\n");
1066 }
1067 Frame::Array(opt) => {
1068 buf.put_u8(b'*');
1069 match opt {
1070 Some(items) => {
1071 let len = items.len().to_string();
1072 buf.extend_from_slice(len.as_bytes());
1073 buf.extend_from_slice(b"\r\n");
1074 for item in items {
1075 serialize_frame(item, buf);
1076 }
1077 }
1078 None => {
1079 buf.extend_from_slice(b"-1\r\n");
1080 }
1081 }
1082 }
1083 Frame::Set(items) => {
1084 buf.put_u8(b'~');
1085 let len = items.len().to_string();
1086 buf.extend_from_slice(len.as_bytes());
1087 buf.extend_from_slice(b"\r\n");
1088 for item in items {
1089 serialize_frame(item, buf);
1090 }
1091 }
1092 Frame::Map(pairs) => {
1093 buf.put_u8(b'%');
1094 let len = pairs.len().to_string();
1095 buf.extend_from_slice(len.as_bytes());
1096 buf.extend_from_slice(b"\r\n");
1097 for (key, value) in pairs {
1098 serialize_frame(key, buf);
1099 serialize_frame(value, buf);
1100 }
1101 }
1102 Frame::Attribute(pairs) => {
1103 buf.put_u8(b'|');
1104 let len = pairs.len().to_string();
1105 buf.extend_from_slice(len.as_bytes());
1106 buf.extend_from_slice(b"\r\n");
1107 for (key, value) in pairs {
1108 serialize_frame(key, buf);
1109 serialize_frame(value, buf);
1110 }
1111 }
1112 Frame::Push(items) => {
1113 buf.put_u8(b'>');
1114 let len = items.len().to_string();
1115 buf.extend_from_slice(len.as_bytes());
1116 buf.extend_from_slice(b"\r\n");
1117 for item in items {
1118 serialize_frame(item, buf);
1119 }
1120 }
1121 }
1122}
1123
1124#[cfg(test)]
1125mod tests {
1126 use super::{Frame, ParseError, Parser, frame_to_bytes, parse_frame, parse_streaming_sequence};
1127 use bytes::Bytes;
1128
1129 #[test]
1130 fn test_parse_frame_simple_string() {
1131 let input = Bytes::from("+HELLO\r\nWORLD");
1132 let (frame, rest) = parse_frame(input.clone()).unwrap();
1133 assert_eq!(frame, Frame::SimpleString(Bytes::from("HELLO")));
1134 assert_eq!(rest, Bytes::from("WORLD"));
1135 }
1136
1137 #[test]
1138 fn test_parse_frame_blob_error() {
1139 let input = Bytes::from("!5\r\nERROR\r\nREST");
1140 let (frame, rest) = parse_frame(input.clone()).unwrap();
1141 assert_eq!(frame, Frame::BlobError(Bytes::from("ERROR")));
1142 assert_eq!(rest, Bytes::from("REST"));
1143 }
1144
1145 #[test]
1146 fn test_parse_frame_error() {
1147 let input = Bytes::from("-ERR fail\r\nLEFT");
1148 let (frame, rest) = parse_frame(input.clone()).unwrap();
1149 assert_eq!(frame, Frame::Error(Bytes::from("ERR fail")));
1150 assert_eq!(rest, Bytes::from("LEFT"));
1151 }
1152
1153 #[test]
1154 fn test_parse_frame_integer() {
1155 let input = Bytes::from(":42\r\nTAIL");
1156 let (frame, rest) = parse_frame(input.clone()).unwrap();
1157 assert_eq!(frame, Frame::Integer(42));
1158 assert_eq!(rest, Bytes::from("TAIL"));
1159 }
1160
1161 #[test]
1162 fn test_parse_frame_bulk_string() {
1163 let input = Bytes::from("$3\r\nfoo\r\nREST");
1164 let (frame, rest) = parse_frame(input.clone()).unwrap();
1165 assert_eq!(frame, Frame::BulkString(Some(Bytes::from("foo"))));
1166 assert_eq!(rest, Bytes::from("REST"));
1167 let null_input = Bytes::from("$-1\r\nAFTER");
1168 let (frame, rest) = parse_frame(null_input.clone()).unwrap();
1169 assert_eq!(frame, Frame::BulkString(None));
1170 assert_eq!(rest, Bytes::from("AFTER"));
1171 }
1172
1173 #[test]
1174 fn test_parse_frame_null() {
1175 let input = Bytes::from("_\r\nLEFT");
1176 let (frame, rest) = parse_frame(input.clone()).unwrap();
1177 assert_eq!(frame, Frame::Null);
1178 assert_eq!(rest, Bytes::from("LEFT"));
1179 }
1180
1181 #[test]
1182 fn test_parse_frame_double_and_special_float() {
1183 let input = Bytes::from(",3.5\r\nNEXT");
1184 let (frame, rest) = parse_frame(input.clone()).unwrap();
1185 assert_eq!(frame, Frame::Double(3.5));
1186 assert_eq!(rest, Bytes::from("NEXT"));
1187 let input_inf = Bytes::from(",inf\r\nTAIL");
1188 let (frame, rest) = parse_frame(input_inf.clone()).unwrap();
1189 assert_eq!(frame, Frame::SpecialFloat(Bytes::from("inf")));
1190 assert_eq!(rest, Bytes::from("TAIL"));
1191 }
1192
1193 #[test]
1194 fn test_parse_frame_boolean() {
1195 let input_true = Bytes::from("#t\r\nXYZ");
1196 let (frame, rest) = parse_frame(input_true.clone()).unwrap();
1197 assert_eq!(frame, Frame::Boolean(true));
1198 assert_eq!(rest, Bytes::from("XYZ"));
1199 let input_false = Bytes::from("#f\r\nDONE");
1200 let (frame, rest) = parse_frame(input_false.clone()).unwrap();
1201 assert_eq!(frame, Frame::Boolean(false));
1202 assert_eq!(rest, Bytes::from("DONE"));
1203 }
1204
1205 #[test]
1206 fn test_parse_frame_big_number() {
1207 let input = Bytes::from("(123456789\r\nEND");
1208 let (frame, rest) = parse_frame(input.clone()).unwrap();
1209 assert_eq!(frame, Frame::BigNumber(Bytes::from("123456789")));
1210 assert_eq!(rest, Bytes::from("END"));
1211 }
1212
1213 #[test]
1214 fn test_parse_frame_verbatim_string() {
1215 let input = Bytes::from("=12\r\ntxt:hi there\r\nAFTER");
1216 let (frame, rest) = parse_frame(input.clone()).unwrap();
1217 assert_eq!(
1218 frame,
1219 Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hi there")) );
1224 assert_eq!(rest, Bytes::from("AFTER"));
1225 }
1226
1227 #[test]
1228 fn test_parse_frame_array_set_push_map_attribute() {
1229 let input = Bytes::from("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\nTAIL");
1231 let (frame, rest) = parse_frame(input.clone()).unwrap();
1232 assert_eq!(
1233 frame,
1234 Frame::Array(Some(vec![
1235 Frame::BulkString(Some(Bytes::from("foo"))),
1236 Frame::BulkString(Some(Bytes::from("bar")))
1237 ]))
1238 );
1239 assert_eq!(rest, Bytes::from("TAIL"));
1240 let input_null = Bytes::from("*-1\r\nEND");
1242 let (frame, rest) = parse_frame(input_null.clone()).unwrap();
1243 assert_eq!(frame, Frame::Array(None));
1244 assert_eq!(rest, Bytes::from("END"));
1245 let input_set = Bytes::from("~2\r\n+foo\r\n+bar\r\nTAIL");
1247 let (frame, rest) = parse_frame(input_set.clone()).unwrap();
1248 assert_eq!(
1249 frame,
1250 Frame::Set(vec![
1251 Frame::SimpleString(Bytes::from("foo")),
1252 Frame::SimpleString(Bytes::from("bar")),
1253 ])
1254 );
1255 assert_eq!(rest, Bytes::from("TAIL"));
1256 let input_map = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\nTRAIL");
1258 let (frame, rest) = parse_frame(input_map.clone()).unwrap();
1259 assert_eq!(
1260 frame,
1261 Frame::Map(vec![
1262 (
1263 Frame::SimpleString(Bytes::from("key1")),
1264 Frame::SimpleString(Bytes::from("val1"))
1265 ),
1266 (
1267 Frame::SimpleString(Bytes::from("key2")),
1268 Frame::SimpleString(Bytes::from("val2"))
1269 ),
1270 ])
1271 );
1272 assert_eq!(rest, Bytes::from("TRAIL"));
1273 let input_attr = Bytes::from("|1\r\n+meta\r\n+data\r\nAFTER");
1275 let (frame, rest) = parse_frame(input_attr.clone()).unwrap();
1276 assert_eq!(
1277 frame,
1278 Frame::Attribute(vec![(
1279 Frame::SimpleString(Bytes::from("meta")),
1280 Frame::SimpleString(Bytes::from("data"))
1281 ),])
1282 );
1283 assert_eq!(rest, Bytes::from("AFTER"));
1284 let input_push = Bytes::from(">2\r\n+type\r\n:1\r\nNEXT");
1286 let (frame, rest) = parse_frame(input_push.clone()).unwrap();
1287 assert_eq!(
1288 frame,
1289 Frame::Push(vec![
1290 Frame::SimpleString(Bytes::from("type")),
1291 Frame::Integer(1),
1292 ])
1293 );
1294 assert_eq!(rest, Bytes::from("NEXT"));
1295 }
1296
1297 #[test]
1298 fn test_parse_frame_empty_input() {
1299 assert!(parse_frame(Bytes::new()).is_err());
1300 }
1301
1302 #[test]
1303 fn test_parse_frame_invalid_tag() {
1304 let input = Bytes::from("X123\r\n");
1305 assert!(parse_frame(input).is_err());
1306 }
1307
1308 #[test]
1309 fn test_parse_frame_malformed_bulk_length() {
1310 let input = Bytes::from("$x\r\nfoo\r\n");
1311 assert!(parse_frame(input).is_err());
1312 }
1313
1314 #[test]
1315 fn test_parse_frame_zero_length_bulk() {
1316 let input = Bytes::from("$0\r\n\r\nTAIL");
1317 let (frame, rest) = parse_frame(input.clone()).unwrap();
1318 assert_eq!(frame, Frame::BulkString(Some(Bytes::from(""))));
1319 assert_eq!(rest, Bytes::from("TAIL"));
1320 }
1321
1322 #[test]
1323 fn test_parse_frame_zero_length_blob_error() {
1324 let input = Bytes::from("!0\r\n\r\nREST");
1325 let (frame, rest) = parse_frame(input.clone()).unwrap();
1326 assert_eq!(frame, Frame::BlobError(Bytes::new()));
1327 assert_eq!(rest, Bytes::from("REST"));
1328 }
1329
1330 #[test]
1331 fn test_parse_frame_missing_crlf() {
1332 let input = Bytes::from(":42\nTAIL");
1333 assert!(parse_frame(input).is_err());
1334 }
1335
1336 #[test]
1337 fn test_parse_frame_unicode_simple_string() {
1338 let input = Bytes::from("+こんにちは\r\nEND");
1339 let (frame, rest) = parse_frame(input.clone()).unwrap();
1340 assert_eq!(frame, Frame::SimpleString(Bytes::from("こんにちは")));
1341 assert_eq!(rest, Bytes::from("END"));
1342 }
1343
1344 #[test]
1345 fn test_parse_frame_chained_frames() {
1346 let combined = Bytes::from("+OK\r\n:1\r\nfoo");
1347 let (f1, rem) = parse_frame(combined.clone()).unwrap();
1348 assert_eq!(f1, Frame::SimpleString(Bytes::from("OK")));
1349 let (f2, rem2) = parse_frame(rem).unwrap();
1350 assert_eq!(f2, Frame::Integer(1));
1351 assert_eq!(rem2, Bytes::from("foo"));
1352 }
1353
1354 #[test]
1355 fn test_parse_frame_empty_array() {
1356 let input = Bytes::from("*0\r\nTAIL");
1357 let (frame, rest) = parse_frame(input.clone()).unwrap();
1358 assert_eq!(frame, Frame::Array(Some(vec![])));
1359 assert_eq!(rest, Bytes::from("TAIL"));
1360 }
1361
1362 #[test]
1363 fn test_parse_frame_partial_array_data() {
1364 let input = Bytes::from("*2\r\n+OK\r\n");
1365 assert!(parse_frame(input).is_err());
1366 }
1367
1368 #[test]
1369 fn test_parse_frame_streamed_string() {
1370 let input = Bytes::from("$?\r\n$5\r\nhello\r\n$0\r\n\r\nREST");
1371 let (frame, rem) = parse_frame(input.clone()).unwrap();
1372 assert_eq!(frame, Frame::StreamedStringHeader);
1373 let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1374 assert_eq!(chunk, Frame::BulkString(Some(Bytes::from("hello"))));
1375 let (terminator, rest) = parse_frame(rem2.clone()).unwrap();
1376 assert_eq!(terminator, Frame::BulkString(Some(Bytes::from(""))));
1377 assert_eq!(rest, Bytes::from("REST"));
1378 }
1379
1380 #[test]
1381 fn test_parse_frame_streamed_blob_error() {
1382 let input = Bytes::from("!?\r\n!5\r\nERROR\r\nREST");
1383 let (frame, rem) = parse_frame(input.clone()).unwrap();
1384 assert_eq!(frame, Frame::StreamedBlobErrorHeader);
1385 let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1386 assert_eq!(chunk, Frame::BlobError(Bytes::from("ERROR")));
1387 assert_eq!(rem2, Bytes::from("REST"));
1388 }
1389
1390 #[test]
1391 fn test_parse_frame_streamed_verbatim_string() {
1392 let input = Bytes::from("=?\r\n=9\r\ntxt:hello\r\nTAIL");
1393 let (frame, rem) = parse_frame(input.clone()).unwrap();
1394 assert_eq!(frame, Frame::StreamedVerbatimStringHeader);
1395 let (chunk, rest) = parse_frame(rem.clone()).unwrap();
1396 assert_eq!(
1397 chunk,
1398 Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hello")) );
1403 assert_eq!(rest, Bytes::from("TAIL"));
1404 }
1405
/// A `*?` header followed by two simple strings and a `*0` frame is parsed
/// frame by frame; `*0` comes back as an empty (non-null) array.
#[test]
fn test_parse_frame_streamed_array() {
    let expected_frames = [
        Frame::StreamedArrayHeader,
        Frame::SimpleString(Bytes::from("one")),
        Frame::SimpleString(Bytes::from("two")),
        Frame::Array(Some(vec![])),
    ];

    let mut remaining = Bytes::from("*?\r\n+one\r\n+two\r\n*0\r\nEND");
    for want in expected_frames {
        let (got, tail) = parse_frame(remaining).unwrap();
        assert_eq!(got, want);
        remaining = tail;
    }
    assert_eq!(remaining, Bytes::from("END"));
}
1419
/// Streamed set, map, attribute and push headers each parse to their header
/// variant, the following items parse as ordinary frames, and a zero-count
/// frame of the same type parses as the empty collection.
#[test]
fn test_parse_frame_streamed_set_map_attr_push() {
    // Parses `wire` one frame at a time, expecting exactly `frames` in order
    // and `leftover` as the bytes that remain afterwards.
    let expect_sequence = |wire: &'static str, frames: Vec<Frame>, leftover: &'static str| {
        let mut remaining = Bytes::from(wire);
        for want in frames {
            let (got, tail) = parse_frame(remaining).unwrap();
            assert_eq!(got, want);
            remaining = tail;
        }
        assert_eq!(remaining, Bytes::from(leftover));
    };

    // Set
    expect_sequence(
        "~?\r\n+foo\r\n+bar\r\n~0\r\nTAIL",
        vec![
            Frame::StreamedSetHeader,
            Frame::SimpleString(Bytes::from("foo")),
            Frame::SimpleString(Bytes::from("bar")),
            Frame::Set(vec![]),
        ],
        "TAIL",
    );

    // Map
    expect_sequence(
        "%?\r\n+key\r\n+val\r\n%0\r\nNEXT",
        vec![
            Frame::StreamedMapHeader,
            Frame::SimpleString(Bytes::from("key")),
            Frame::SimpleString(Bytes::from("val")),
            Frame::Map(vec![]),
        ],
        "NEXT",
    );

    // Attribute
    expect_sequence(
        "|?\r\n+meta\r\n+info\r\n|0\r\nMORE",
        vec![
            Frame::StreamedAttributeHeader,
            Frame::SimpleString(Bytes::from("meta")),
            Frame::SimpleString(Bytes::from("info")),
            Frame::Attribute(vec![]),
        ],
        "MORE",
    );

    // Push
    expect_sequence(
        ">?\r\n:1\r\n:2\r\n>0\r\nEND",
        vec![
            Frame::StreamedPushHeader,
            Frame::Integer(1),
            Frame::Integer(2),
            Frame::Push(vec![]),
        ],
        "END",
    );
}
1467
/// `.\r\n` parses to `StreamTerminator` and leaves the following bytes alone.
#[test]
fn test_parse_frame_stream_terminator() {
    let outcome = parse_frame(Bytes::from(".\r\nREST"));
    assert_eq!(
        outcome,
        Ok((Frame::StreamTerminator, Bytes::from("REST")))
    );
}
1475
/// A blob error with length `-1` is rejected with `BadLength`.
#[test]
fn test_parse_frame_null_blob_error_rejected() {
    let rejected = parse_frame(Bytes::from("!-1\r\nTAIL"));
    assert_eq!(rejected, Err(ParseError::BadLength));
}
1481
/// A verbatim string with length `-1` is rejected with `BadLength`.
#[test]
fn test_parse_frame_null_verbatim_rejected() {
    let rejected = parse_frame(Bytes::from("=-1\r\nTAIL"));
    assert_eq!(rejected, Err(ParseError::BadLength));
}
1487
/// The format prefix of a verbatim string must be exactly three bytes before
/// the `:` separator; shorter, longer or missing prefixes are `InvalidFormat`.
#[test]
fn test_verbatim_string_format_must_be_3_bytes() {
    let malformed_inputs = [
        // One-byte format.
        "=6\r\nx:data\r\n",
        // Four-byte format.
        "=9\r\ntxtx:data\r\n",
        // Empty format.
        "=5\r\n:data\r\n",
    ];
    for wire in malformed_inputs {
        assert_eq!(
            parse_frame(Bytes::from(wire)),
            Err(ParseError::InvalidFormat)
        );
    }

    // A well-formed three-byte format is accepted.
    let (accepted, _) = parse_frame(Bytes::from("=8\r\ntxt:data\r\n")).unwrap();
    let expected = Frame::VerbatimString(Bytes::from("txt"), Bytes::from("data"));
    assert_eq!(accepted, expected);
}
1510
/// `,nan` parses to `SpecialFloat` carrying the literal text `nan`.
#[test]
fn test_parse_frame_special_float_nan() {
    let outcome = parse_frame(Bytes::from(",nan\r\nTAIL"));
    assert_eq!(
        outcome,
        Ok((Frame::SpecialFloat(Bytes::from("nan")), Bytes::from("TAIL")))
    );
}
1518
/// `(0` parses to a `BigNumber` holding the digit string `0`.
#[test]
fn test_parse_frame_big_number_zero() {
    let outcome = parse_frame(Bytes::from("(0\r\nEND"));
    assert_eq!(
        outcome,
        Ok((Frame::BigNumber(Bytes::from("0")), Bytes::from("END")))
    );
}
1526
/// Zero-count push, attribute, map and set frames parse to empty collections,
/// and `*-1` parses to the null array (`Array(None)`); trailing bytes are kept.
#[test]
fn test_parse_frame_collection_empty() {
    let cases = [
        (">0\r\nTAIL", Frame::Push(vec![]), "TAIL"),
        ("|0\r\nAFTER", Frame::Attribute(vec![]), "AFTER"),
        ("%0\r\nEND", Frame::Map(vec![]), "END"),
        ("~0\r\nDONE", Frame::Set(vec![]), "DONE"),
        ("*-1\r\nFIN", Frame::Array(None), "FIN"),
    ];

    for (wire, expected_frame, expected_rest) in cases {
        let (frame, rest) = parse_frame(Bytes::from(wire)).unwrap();
        assert_eq!(frame, expected_frame);
        assert_eq!(rest, Bytes::from(expected_rest));
    }
}
1550
/// Simple string: parse -> serialize reproduces the input bytes, and parsing
/// the serialized bytes yields an equal frame.
#[test]
fn test_roundtrip_simple_string() {
    let wire = Bytes::from("+hello\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let redecoded = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, redecoded);
}
1563
/// Simple error: parse -> serialize reproduces the input bytes, and parsing
/// the serialized bytes yields an equal frame.
#[test]
fn test_roundtrip_error() {
    let wire = Bytes::from("-ERR error message\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let redecoded = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, redecoded);
}
1574
/// Integer: parse -> serialize reproduces the input bytes, and parsing the
/// serialized bytes yields an equal frame.
#[test]
fn test_roundtrip_integer() {
    let wire = Bytes::from(":12345\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let redecoded = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, redecoded);
}
1585
/// Bulk string round-trip for both a regular payload and the null bulk string
/// (`$-1`): serialization reproduces the input and re-parsing is stable.
#[test]
fn test_roundtrip_bulk_string() {
    // First a regular payload, then the null bulk string.
    for input in ["$5\r\nhello\r\n", "$-1\r\n"] {
        let wire = Bytes::from(input);

        let decoded = parse_frame(wire.clone()).unwrap().0;
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let redecoded = parse_frame(encoded).unwrap().0;
        assert_eq!(decoded, redecoded);
    }
}
1605
/// Blob error: parse -> serialize reproduces the input bytes, and parsing the
/// serialized bytes yields an equal frame.
#[test]
fn test_roundtrip_blob_error() {
    let wire = Bytes::from("!5\r\nerror\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let redecoded = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, redecoded);
}
1616
/// Null (`_`): parse -> serialize reproduces the input bytes, and parsing the
/// serialized bytes yields an equal frame.
#[test]
fn test_roundtrip_null() {
    let wire = Bytes::from("_\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let redecoded = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, redecoded);
}
1627
/// Double: a parsed frame survives serialize -> parse unchanged.
///
/// Only frame equality is checked here; the serialized bytes are not compared
/// against the original input.
#[test]
fn test_roundtrip_double() {
    let wire = Bytes::from(",3.14159\r\n");

    let decoded = parse_frame(wire).unwrap().0;
    let encoded = frame_to_bytes(&decoded);

    let redecoded = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, redecoded);
}
1639
/// Special float (`,inf`): parse -> serialize reproduces the input bytes, and
/// parsing the serialized bytes yields an equal frame.
#[test]
fn test_roundtrip_special_float() {
    let wire = Bytes::from(",inf\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let redecoded = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, redecoded);
}
1650
/// Boolean round-trip for both `#t` and `#f`: serialization reproduces the
/// input and re-parsing yields an equal frame.
#[test]
fn test_roundtrip_boolean() {
    for input in ["#t\r\n", "#f\r\n"] {
        let wire = Bytes::from(input);

        let decoded = parse_frame(wire.clone()).unwrap().0;
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let redecoded = parse_frame(encoded).unwrap().0;
        assert_eq!(decoded, redecoded);
    }
}
1669
/// Big number: parse -> serialize reproduces the input bytes, and parsing the
/// serialized bytes yields an equal frame.
#[test]
fn test_roundtrip_big_number() {
    let wire = Bytes::from("(12345678901234567890\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let redecoded = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, redecoded);
}
1680
/// Verbatim string: parse -> serialize reproduces the input bytes, and parsing
/// the serialized bytes yields an equal frame.
#[test]
fn test_roundtrip_verbatim_string() {
    let wire = Bytes::from("=10\r\ntxt:hello!\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let redecoded = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, redecoded);
}
1691
/// Array round-trip for both a two-element array and the null array (`*-1`):
/// serialization reproduces the input and re-parsing yields an equal frame.
#[test]
fn test_roundtrip_array() {
    // First a populated array, then the null array.
    for input in ["*2\r\n+hello\r\n:123\r\n", "*-1\r\n"] {
        let wire = Bytes::from(input);

        let decoded = parse_frame(wire.clone()).unwrap().0;
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let redecoded = parse_frame(encoded).unwrap().0;
        assert_eq!(decoded, redecoded);
    }
}
1711
/// Set: parse -> serialize reproduces the input bytes, and parsing the
/// serialized bytes yields an equal frame.
#[test]
fn test_roundtrip_set() {
    let wire = Bytes::from("~2\r\n+one\r\n+two\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let redecoded = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, redecoded);
}
1722
/// Map with two entries: parse -> serialize reproduces the input bytes, and
/// parsing the serialized bytes yields an equal frame.
#[test]
fn test_roundtrip_map() {
    let wire = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let redecoded = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, redecoded);
}
1733
/// Attribute with one entry: parse -> serialize reproduces the input bytes,
/// and parsing the serialized bytes yields an equal frame.
#[test]
fn test_roundtrip_attribute() {
    let wire = Bytes::from("|1\r\n+key\r\n+val\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let redecoded = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, redecoded);
}
1744
/// Push with two elements: parse -> serialize reproduces the input bytes, and
/// parsing the serialized bytes yields an equal frame.
#[test]
fn test_roundtrip_push() {
    let wire = Bytes::from(">2\r\n+msg\r\n+data\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let redecoded = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, redecoded);
}
1755
/// Every streaming header, plus the stream terminator, parses to its dedicated
/// frame variant and serializes back to exactly the same bytes.
#[test]
fn test_roundtrip_streaming_headers() {
    let table = [
        (Frame::StreamedStringHeader, "$?\r\n"),
        (Frame::StreamedBlobErrorHeader, "!?\r\n"),
        (Frame::StreamedVerbatimStringHeader, "=?\r\n"),
        (Frame::StreamedArrayHeader, "*?\r\n"),
        (Frame::StreamedSetHeader, "~?\r\n"),
        (Frame::StreamedMapHeader, "%?\r\n"),
        (Frame::StreamedAttributeHeader, "|?\r\n"),
        (Frame::StreamedPushHeader, ">?\r\n"),
        (Frame::StreamTerminator, ".\r\n"),
    ];

    for (expected_frame, wire_text) in table {
        let wire = Bytes::from(wire_text);

        // wire -> frame
        let decoded = parse_frame(wire.clone()).unwrap().0;
        assert_eq!(decoded, expected_frame);

        // frame -> wire
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        // wire -> frame again
        let redecoded = parse_frame(encoded).unwrap().0;
        assert_eq!(decoded, redecoded);
    }
}
1782
/// `;`-prefixed streamed-string chunks (including the zero-length chunk) parse
/// to `StreamedStringChunk`, consume the whole input, and serialize back to
/// the same bytes.
#[test]
fn test_roundtrip_streaming_chunks() {
    // (wire encoding, chunk payload)
    let table = [
        (";4\r\nHell\r\n", "Hell"),
        (";5\r\no wor\r\n", "o wor"),
        (";1\r\nd\r\n", "d"),
        (";0\r\n\r\n", ""),
        (";11\r\nHello World\r\n", "Hello World"),
    ];

    for (wire_text, payload) in table {
        let wire = Bytes::from(wire_text);
        let expected_frame = Frame::StreamedStringChunk(Bytes::from(payload));

        let (decoded, leftover) = parse_frame(wire.clone()).unwrap();
        assert_eq!(decoded, expected_frame);
        assert!(leftover.is_empty());

        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let redecoded = parse_frame(encoded).unwrap().0;
        assert_eq!(decoded, redecoded);
    }
}
1815
/// Edge cases for parsing a single `;`-prefixed streamed-string chunk:
/// truncated input, malformed or negative lengths, and a binary payload.
#[test]
fn test_streaming_chunks_edge_cases() {
    let failing_inputs = [
        // Payload is shorter than the declared length.
        (";4\r\nHel", ParseError::Incomplete),
        // Payload is complete but the trailing CRLF has not arrived yet.
        (";4\r\nHell", ParseError::Incomplete),
        // Length field is not a number.
        (";abc\r\ndata\r\n", ParseError::BadLength),
        // Negative length.
        (";-1\r\ndata\r\n", ParseError::BadLength),
        // Declared length needs more bytes than the input provides.
        (";5\r\nHell\r\n", ParseError::Incomplete),
        // Zero-length chunk without its trailing CRLF.
        (";0\r\n", ParseError::Incomplete),
    ];
    for (wire, expected) in failing_inputs {
        assert_eq!(
            parse_frame(Bytes::from(wire)),
            Err(expected),
            "unexpected result for input {wire:?}"
        );
    }

    // Binary payload: bytes that are not valid UTF-8 must come back untouched.
    let binary_data = b"\x00\x01\x02\x03\xFF";
    let mut chunk_data = Vec::new();
    chunk_data.extend_from_slice(b";5\r\n");
    chunk_data.extend_from_slice(binary_data);
    chunk_data.extend_from_slice(b"\r\n");

    let (frame, _) = parse_frame(Bytes::from(chunk_data)).unwrap();
    // Fail loudly on any other variant instead of silently skipping the
    // payload comparison.
    match frame {
        Frame::StreamedStringChunk(chunk) => assert_eq!(chunk.as_ref(), binary_data),
        other => panic!("expected StreamedStringChunk, got {other:?}"),
    }
}
1862
/// Accumulated streaming frames (`Streamed*`) serialize to a wire form that
/// `parse_streaming_sequence` turns back into an equal frame. For streamed
/// strings the exact serialized bytes are pinned as well.
#[test]
fn test_roundtrip_streaming_sequences() {
    // Shorthand for a simple-string element.
    let simple = |text: &'static str| Frame::SimpleString(Bytes::from(text));

    // Serializes `frame`, optionally pins the exact wire bytes, then parses
    // the bytes back as a streaming sequence and expects the same frame.
    let assert_roundtrip = |frame: Frame, exact_wire: Option<&'static str>| {
        let serialized = frame_to_bytes(&frame);
        if let Some(expected) = exact_wire {
            assert_eq!(serialized, Bytes::from(expected));
        }
        let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
        assert_eq!(parsed, frame);
    };

    // Streamed string made of three chunks.
    assert_roundtrip(
        Frame::StreamedString(vec![
            Bytes::from("Hell"),
            Bytes::from("o wor"),
            Bytes::from("ld"),
        ]),
        Some("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;2\r\nld\r\n;0\r\n\r\n"),
    );

    // Streamed array with mixed element types.
    assert_roundtrip(
        Frame::StreamedArray(vec![
            simple("hello"),
            Frame::Integer(42),
            Frame::Boolean(true),
        ]),
        None,
    );

    // Streamed map.
    assert_roundtrip(
        Frame::StreamedMap(vec![
            (simple("key1"), simple("val1")),
            (simple("key2"), Frame::Integer(123)),
        ]),
        None,
    );

    // Streamed string with no chunks at all.
    assert_roundtrip(Frame::StreamedString(vec![]), Some("$?\r\n;0\r\n\r\n"));

    // Streamed set.
    assert_roundtrip(
        Frame::StreamedSet(vec![
            simple("apple"),
            simple("banana"),
            Frame::Integer(42),
        ]),
        None,
    );

    // Streamed attribute.
    assert_roundtrip(
        Frame::StreamedAttribute(vec![
            (simple("trace-id"), simple("abc123")),
            (simple("span-id"), simple("def456")),
        ]),
        None,
    );

    // Streamed push.
    assert_roundtrip(
        Frame::StreamedPush(vec![
            simple("pubsub"),
            simple("channel1"),
            simple("message data"),
        ]),
        None,
    );

    // Empty aggregates.
    assert_roundtrip(Frame::StreamedArray(vec![]), None);
    assert_roundtrip(Frame::StreamedSet(vec![]), None);
}
1957
/// Edge cases for `parse_streaming_sequence`: unterminated streams, malformed
/// chunks, nested aggregates, odd map element counts, non-streaming input and
/// truncated input.
#[test]
fn test_streaming_sequences_edge_cases() {
    // Streamed string whose terminating zero-length chunk never arrives.
    let data = Bytes::from("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Chunk with a non-numeric length.
    let data = Bytes::from("$?\r\n;abc\r\nHell\r\n;0\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::BadLength)));

    // Streamed array without a terminator.
    let data = Bytes::from("*?\r\n+hello\r\n:42\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Streamed array containing a nested fixed-length array.
    let data = Bytes::from("*?\r\n+hello\r\n*2\r\n:1\r\n:2\r\n.\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    // Fail on any other variant instead of silently skipping the checks.
    match frame {
        Frame::StreamedArray(items) => {
            assert_eq!(items.len(), 2);
            assert!(matches!(items[0], Frame::SimpleString(_)));
            assert!(matches!(items[1], Frame::Array(_)));
        }
        other => panic!("expected StreamedArray, got {other:?}"),
    }

    // Streamed array that is terminated immediately.
    let data = Bytes::from("*?\r\n.\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    match frame {
        Frame::StreamedArray(items) => assert!(items.is_empty()),
        other => panic!("expected StreamedArray, got {other:?}"),
    }

    // Streamed map with a key that has no value.
    let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+orphan\r\n.\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::InvalidFormat)));

    // A non-streaming frame is returned as-is.
    let data = Bytes::from("+simple\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    assert!(matches!(frame, Frame::SimpleString(_)));

    // An absurdly large chunk length must not succeed; either error is fine.
    let data = Bytes::from(";999999999999999999\r\ndata\r\n");
    match parse_frame(data) {
        Err(ParseError::BadLength) | Err(ParseError::Incomplete) => {}
        Err(e) => panic!("Got unexpected error type: {e:?}"),
        Ok(_) => panic!("Large chunk size should fail"),
    }

    // A non-chunk frame inside a streamed string.
    let data = Bytes::from("$?\r\n+invalid\r\n;0\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::InvalidFormat)));

    // Terminator line carrying extra bytes is reported as Incomplete.
    let data = Bytes::from("*?\r\n+hello\r\n.corrupted\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Empty input.
    let data = Bytes::new();
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Input cut off in the middle of an element.
    let data = Bytes::from("*?\r\n+hello\r\n$5\r\nwo");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));
}
2039
/// An array nesting a map, a set and an attribute survives
/// parse -> serialize -> parse with an equal frame.
///
/// Only frame equality is checked; the serialized bytes are not compared with
/// the original input.
#[test]
fn test_roundtrip_nested_structures() {
    let wire = Bytes::from(
        "*3\r\n+hello\r\n%2\r\n+key1\r\n:123\r\n+key2\r\n~1\r\n+item\r\n|1\r\n+meta\r\n+data\r\n",
    );

    let decoded = parse_frame(wire).unwrap().0;
    let encoded = frame_to_bytes(&decoded);

    let redecoded = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, redecoded);
}
2052
/// A zero-length bulk string still needs its trailing CRLF: missing bytes are
/// `Incomplete`, wrong bytes are `InvalidFormat`.
#[test]
fn test_zero_length_bulk_string_requires_trailing_crlf() {
    // Well-formed: header CRLF followed by the (empty) payload's CRLF.
    let accepted = parse_frame(Bytes::from("$0\r\n\r\nTAIL"));
    assert_eq!(
        accepted,
        Ok((Frame::BulkString(Some(Bytes::new())), Bytes::from("TAIL")))
    );

    let rejected = [
        // Trailing CRLF entirely missing.
        ("$0\r\n", ParseError::Incomplete),
        // Only half of the trailing CRLF present.
        ("$0\r\n\r", ParseError::Incomplete),
        // Two bytes present, but they are not CRLF.
        ("$0\r\nXY", ParseError::InvalidFormat),
    ];
    for (wire, expected) in rejected {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
2073
/// A zero-length streamed chunk still needs its trailing CRLF: missing bytes
/// are `Incomplete`, wrong bytes are `InvalidFormat`.
#[test]
fn test_zero_length_streamed_chunk_requires_trailing_crlf() {
    // Well-formed zero-length chunk.
    let accepted = parse_frame(Bytes::from(";0\r\n\r\nTAIL"));
    assert_eq!(
        accepted,
        Ok((Frame::StreamedStringChunk(Bytes::new()), Bytes::from("TAIL")))
    );

    let rejected = [
        // Trailing CRLF missing.
        (";0\r\n", ParseError::Incomplete),
        // Two bytes present, but they are not CRLF.
        (";0\r\nXY", ParseError::InvalidFormat),
    ];
    for (wire, expected) in rejected {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
2090
/// `i64::MAX + 1` is reported as `Overflow`, while the exact `i64` bounds
/// parse successfully.
#[test]
fn test_integer_overflow_returns_overflow_error() {
    // One past i64::MAX.
    let too_big = parse_frame(Bytes::from(":9223372036854775808\r\n"));
    assert_eq!(too_big, Err(ParseError::Overflow));

    // The bounds themselves are representable.
    let bounds = [
        (":9223372036854775807\r\n", i64::MAX),
        (":-9223372036854775808\r\n", i64::MIN),
    ];
    for (wire, value) in bounds {
        let (frame, _) = parse_frame(Bytes::from(wire)).unwrap();
        assert_eq!(frame, Frame::Integer(value));
    }
}
2107
/// `Parser::next_frame` surfaces a hard parse error (here an unknown type tag)
/// to the caller.
#[test]
fn test_parser_propagates_errors() {
    let mut parser = Parser::new();
    parser.feed(Bytes::from("XINVALID\r\n"));

    assert_eq!(parser.next_frame(), Err(ParseError::InvalidTag(b'X')));
}
2116
/// With only a partial frame buffered, `next_frame` reports `Ok(None)` rather
/// than an error.
#[test]
fn test_parser_returns_ok_none_for_incomplete() {
    let mut parser = Parser::new();
    parser.feed(Bytes::from("+HELL"));

    let outcome = parser.next_frame();
    assert_eq!(outcome, Ok(None));
}
2123
/// One below `i64::MIN` must fail to parse (the specific error is not pinned).
#[test]
fn test_integer_negative_overflow() {
    let outcome = parse_frame(Bytes::from(":-9223372036854775809\r\n"));
    assert!(outcome.is_err());
}
2129
/// Non-empty bulk string terminators: fewer than two trailing bytes is
/// `Incomplete`; two bytes that are not CRLF is `InvalidFormat`.
#[test]
fn test_nonempty_bulk_malformed_terminator() {
    let cases = [
        // No terminator bytes yet.
        ("$3\r\nfoo", ParseError::Incomplete),
        // Only one byte after the payload.
        ("$3\r\nfooX", ParseError::Incomplete),
        // Two bytes after the payload, but not CRLF.
        ("$3\r\nfooXY", ParseError::InvalidFormat),
    ];

    for (wire, expected) in cases {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
2148
/// Blob error terminators: a missing terminator is `Incomplete`; two bytes
/// that are not CRLF is `InvalidFormat`.
#[test]
fn test_blob_error_malformed_terminator() {
    let cases = [
        ("!3\r\nerr", ParseError::Incomplete),
        ("!3\r\nerrXY", ParseError::InvalidFormat),
    ];

    for (wire, expected) in cases {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
2160
/// Verbatim string terminators: a missing terminator is `Incomplete`; two
/// bytes that are not CRLF is `InvalidFormat`.
#[test]
fn test_verbatim_string_malformed_terminator() {
    let cases = [
        ("=8\r\ntxt:data", ParseError::Incomplete),
        ("=8\r\ntxt:dataXY", ParseError::InvalidFormat),
    ];

    for (wire, expected) in cases {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
2172
/// Streamed chunk terminators: a missing terminator is `Incomplete`; two bytes
/// that are not CRLF is `InvalidFormat`.
#[test]
fn test_streamed_chunk_malformed_terminator() {
    let cases = [
        (";3\r\nabc", ParseError::Incomplete),
        (";3\r\nabcXY", ParseError::InvalidFormat),
    ];

    for (wire, expected) in cases {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
2184
/// A bulk string declaring 536870913 bytes (512 MiB + 1) is rejected with
/// `BadLength` from the header alone.
#[test]
fn test_bulk_string_size_limit() {
    let oversized_header = Bytes::from("$536870913\r\n");
    let outcome = parse_frame(oversized_header);
    assert_eq!(outcome, Err(ParseError::BadLength));
}
2193
/// A blob error declaring 536870913 bytes (512 MiB + 1) is rejected with
/// `BadLength` from the header alone.
#[test]
fn test_blob_error_size_limit() {
    let oversized_header = Bytes::from("!536870913\r\n");
    let outcome = parse_frame(oversized_header);
    assert_eq!(outcome, Err(ParseError::BadLength));
}
2201
/// A verbatim string declaring 536870913 bytes (512 MiB + 1) is rejected with
/// `BadLength` from the header alone.
#[test]
fn test_verbatim_string_size_limit() {
    let oversized_header = Bytes::from("=536870913\r\n");
    let outcome = parse_frame(oversized_header);
    assert_eq!(outcome, Err(ParseError::BadLength));
}
2209
/// A streamed chunk declaring 536870913 bytes (512 MiB + 1) is rejected with
/// `BadLength` from the header alone.
#[test]
fn test_streamed_chunk_size_limit() {
    let oversized_header = Bytes::from(";536870913\r\n");
    let outcome = parse_frame(oversized_header);
    assert_eq!(outcome, Err(ParseError::BadLength));
}
2217
/// A double frame whose body is not a number is `InvalidFormat`.
#[test]
fn test_invalid_double() {
    let outcome = parse_frame(Bytes::from(",foo\r\n"));
    assert_eq!(outcome, Err(ParseError::InvalidFormat));
}
2225
/// A boolean frame with an empty body or a multi-character body is
/// `InvalidBoolean`.
#[test]
fn test_invalid_boolean() {
    for wire in ["#\r\n", "#true\r\n"] {
        assert_eq!(
            parse_frame(Bytes::from(wire)),
            Err(ParseError::InvalidBoolean)
        );
    }
}
2237
/// After a hard parse error the parser holds no buffered bytes.
#[test]
fn test_parser_clears_buffer_on_error() {
    let mut parser = Parser::new();
    parser.feed(Bytes::from("X\r\n"));

    let outcome = parser.next_frame();
    assert_eq!(outcome, Err(ParseError::InvalidTag(b'X')));

    let leftover = parser.buffered_bytes();
    assert_eq!(leftover, 0);
}
2245
/// A parser that hit a hard error can be fed fresh data and parse it normally.
#[test]
fn test_parser_recovers_after_error() {
    let mut parser = Parser::new();

    // Poison the parser with an unknown tag; the buffer ends up empty.
    parser.feed(Bytes::from("X\r\n"));
    let failed = parser.next_frame();
    assert!(failed.is_err());
    assert_eq!(parser.buffered_bytes(), 0);

    // Fresh, valid input parses as usual.
    parser.feed(Bytes::from("+OK\r\n"));
    assert_eq!(
        parser.next_frame(),
        Ok(Some(Frame::SimpleString(Bytes::from("OK"))))
    );
}
2257
/// A `~?` stream of three simple strings ended by `.` is accumulated into a
/// `StreamedSet`, consuming the whole input.
#[test]
fn test_streaming_set_roundtrip() {
    let members: Vec<Frame> = ["a", "b", "c"]
        .iter()
        .map(|name| Frame::SimpleString(Bytes::from(*name)))
        .collect();

    let wire = Bytes::from("~?\r\n+a\r\n+b\r\n+c\r\n.\r\n");
    let (accumulated, leftover) = parse_streaming_sequence(wire).unwrap();

    assert_eq!(accumulated, Frame::StreamedSet(members));
    assert!(leftover.is_empty());
}
2272
/// A `|?` stream with one key/value pair ended by `.` is accumulated into a
/// `StreamedAttribute`, consuming the whole input.
#[test]
fn test_streaming_attribute_roundtrip() {
    let key = Frame::SimpleString(Bytes::from("key"));
    let value = Frame::SimpleString(Bytes::from("val"));

    let wire = Bytes::from("|?\r\n+key\r\n+val\r\n.\r\n");
    let (accumulated, leftover) = parse_streaming_sequence(wire).unwrap();

    assert_eq!(accumulated, Frame::StreamedAttribute(vec![(key, value)]));
    assert!(leftover.is_empty());
}
2286
/// A `>?` stream of three simple strings ended by `.` is accumulated into a
/// `StreamedPush`, consuming the whole input.
#[test]
fn test_streaming_push_roundtrip() {
    let elements: Vec<Frame> = ["pubsub", "channel", "message"]
        .iter()
        .map(|text| Frame::SimpleString(Bytes::from(*text)))
        .collect();

    let wire = Bytes::from(">?\r\n+pubsub\r\n+channel\r\n+message\r\n.\r\n");
    let (accumulated, leftover) = parse_streaming_sequence(wire).unwrap();

    assert_eq!(accumulated, Frame::StreamedPush(elements));
    assert!(leftover.is_empty());
}
2301
/// Streams that end immediately produce empty accumulated containers: a
/// streamed string ends with a zero-length chunk, aggregates end with `.`.
#[test]
fn test_empty_streaming_containers() {
    let cases = [
        ("$?\r\n;0\r\n\r\n", Frame::StreamedString(vec![])),
        ("*?\r\n.\r\n", Frame::StreamedArray(vec![])),
        ("~?\r\n.\r\n", Frame::StreamedSet(vec![])),
        ("%?\r\n.\r\n", Frame::StreamedMap(vec![])),
    ];

    for (wire, empty_container) in cases {
        let (accumulated, _) = parse_streaming_sequence(Bytes::from(wire)).unwrap();
        assert_eq!(accumulated, empty_container);
    }
}
2324
/// A streamed attribute with a dangling key (odd element count) is rejected
/// with `InvalidFormat`.
#[test]
fn test_streaming_attribute_odd_elements_errors() {
    let wire = Bytes::from("|?\r\n+key\r\n+val\r\n+orphan\r\n.\r\n");
    let failure = parse_streaming_sequence(wire).err();
    assert_eq!(failure, Some(ParseError::InvalidFormat));
}
2331
/// `parse_streaming_sequence` does not accumulate a `!?` stream: it returns
/// just the header frame and leaves the following bytes unconsumed.
#[test]
fn test_streaming_blob_error_header_passthrough() {
    let wire = Bytes::from("!?\r\n!5\r\nERROR\r\n");

    let (header, leftover) = parse_streaming_sequence(wire).unwrap();

    assert_eq!(header, Frame::StreamedBlobErrorHeader);
    assert!(!leftover.is_empty());
}
2341
/// `parse_streaming_sequence` does not accumulate a `=?` stream: it returns
/// just the header frame and leaves the following bytes unconsumed.
#[test]
fn test_streaming_verbatim_header_passthrough() {
    let wire = Bytes::from("=?\r\n=9\r\ntxt:hello\r\n");

    let (header, leftover) = parse_streaming_sequence(wire).unwrap();

    assert_eq!(header, Frame::StreamedVerbatimStringHeader);
    assert!(!leftover.is_empty());
}
2350}