1use alloc::string::ToString;
29use alloc::vec::Vec;
30
31use bytes::{BufMut, Bytes, BytesMut};
32
/// Largest element count accepted in a collection header.
///
/// `parse_count` rejects any larger count with `ParseError::BadLength`.
const MAX_COLLECTION_SIZE: usize = 10_000_000;

/// Largest declared payload length (512 MiB) accepted for bulk strings,
/// blob errors, verbatim strings and streamed string chunks. Longer
/// declared lengths are rejected with `ParseError::BadLength`.
const MAX_BULK_STRING_SIZE: usize = 512 * 1024 * 1024;
38
/// Incremental frame parser backed by an internal byte buffer.
///
/// Input is appended with `feed`; complete frames are pulled out with
/// `next_frame`.
#[derive(Default, Debug)]
pub struct Parser {
    /// Bytes received so far that have not yet been returned as a frame.
    buffer: BytesMut,
}
47
48impl Parser {
49 pub fn new() -> Self {
51 Self {
52 buffer: BytesMut::new(),
53 }
54 }
55
56 pub fn feed(&mut self, data: Bytes) {
60 self.buffer.extend_from_slice(&data);
61 }
62
63 pub fn next_frame(&mut self) -> Result<Option<Frame>, ParseError> {
69 if self.buffer.is_empty() {
70 return Ok(None);
71 }
72
73 let bytes = self.buffer.split().freeze();
74
75 match parse_frame_inner(&bytes, 0) {
76 Ok((frame, consumed)) => {
77 if consumed < bytes.len() {
78 self.buffer.unsplit(BytesMut::from(&bytes[consumed..]));
79 }
80 Ok(Some(frame))
81 }
82 Err(ParseError::Incomplete) => {
83 self.buffer.unsplit(bytes.into());
84 Ok(None)
85 }
86 Err(e) => {
87 Err(e)
90 }
91 }
92 }
93
94 pub fn buffered_bytes(&self) -> usize {
96 self.buffer.len()
97 }
98
99 pub fn clear(&mut self) {
101 self.buffer.clear();
102 }
103}
104
/// A single protocol value, or a streaming marker, as produced by the
/// parser and consumed by the serializer in this file.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    /// `+<text>\r\n` — the raw bytes of the line.
    SimpleString(Bytes),
    /// `-<text>\r\n` — the raw bytes of the line.
    Error(Bytes),
    /// `:<number>\r\n`.
    Integer(i64),
    /// `$<len>\r\n<data>\r\n`; `None` represents `$-1\r\n`.
    BulkString(Option<Bytes>),
    /// `!<len>\r\n<data>\r\n`.
    BlobError(Bytes),
    /// `$?\r\n` — header announcing a streamed string.
    StreamedStringHeader,
    /// `!?\r\n` — header announcing a streamed blob error.
    StreamedBlobErrorHeader,
    /// `=?\r\n` — header announcing a streamed verbatim string.
    StreamedVerbatimStringHeader,
    /// `*?\r\n` — header announcing a streamed array.
    StreamedArrayHeader,
    /// `~?\r\n` — header announcing a streamed set.
    StreamedSetHeader,
    /// `%?\r\n` — header announcing a streamed map.
    StreamedMapHeader,
    /// `|?\r\n` — header announcing a streamed attribute.
    StreamedAttributeHeader,
    /// `>?\r\n` — header announcing a streamed push.
    StreamedPushHeader,
    /// `;<len>\r\n<data>\r\n` — one chunk of a streamed string.
    StreamedStringChunk(Bytes),

    /// Non-empty chunks of a streamed string, collected by
    /// `parse_streaming_sequence`.
    StreamedString(Vec<Bytes>),

    /// Items of a streamed array, collected by `parse_streaming_sequence`.
    StreamedArray(Vec<Frame>),

    /// Items of a streamed set, collected by `parse_streaming_sequence`.
    StreamedSet(Vec<Frame>),

    /// Key/value pairs of a streamed map, collected by
    /// `parse_streaming_sequence`.
    StreamedMap(Vec<(Frame, Frame)>),

    /// Key/value pairs of a streamed attribute, collected by
    /// `parse_streaming_sequence`.
    StreamedAttribute(Vec<(Frame, Frame)>),

    /// Items of a streamed push, collected by `parse_streaming_sequence`.
    StreamedPush(Vec<Frame>),
    /// `.\r\n` — marks the end of a streamed aggregate.
    StreamTerminator,
    /// `_\r\n`.
    Null,
    /// `,<number>\r\n` holding a finite value.
    Double(f64),
    /// `,inf`, `,-inf` or `,nan`, kept as text.
    SpecialFloat(Bytes),
    /// `#t\r\n` or `#f\r\n`.
    Boolean(bool),
    /// `(<digits>\r\n` — the raw bytes of the line; the parser does not
    /// validate them.
    BigNumber(Bytes),
    /// `=<len>\r\n<fmt>:<content>\r\n` as `(format, content)`.
    VerbatimString(Bytes, Bytes),
    /// `*<count>\r\n` followed by the items; `None` represents `*-1\r\n`.
    Array(Option<Vec<Frame>>),
    /// `~<count>\r\n` followed by the items.
    Set(Vec<Frame>),
    /// `%<count>\r\n` followed by `count` key/value pairs.
    Map(Vec<(Frame, Frame)>),
    /// `|<count>\r\n` followed by `count` key/value pairs.
    Attribute(Vec<(Frame, Frame)>),
    /// `><count>\r\n` followed by the items.
    Push(Vec<Frame>),
}
267
268impl Frame {
269 pub fn as_bytes(&self) -> Option<&Bytes> {
274 match self {
275 Frame::SimpleString(b)
276 | Frame::Error(b)
277 | Frame::BlobError(b)
278 | Frame::BigNumber(b) => Some(b),
279 Frame::BulkString(opt) => opt.as_ref(),
280 _ => None,
281 }
282 }
283
284 pub fn as_str(&self) -> Option<&str> {
287 self.as_bytes().and_then(|b| core::str::from_utf8(b).ok())
288 }
289
290 pub fn as_integer(&self) -> Option<i64> {
292 match self {
293 Frame::Integer(v) => Some(*v),
294 _ => None,
295 }
296 }
297
298 pub fn as_double(&self) -> Option<f64> {
300 match self {
301 Frame::Double(v) => Some(*v),
302 _ => None,
303 }
304 }
305
306 pub fn as_boolean(&self) -> Option<bool> {
308 match self {
309 Frame::Boolean(v) => Some(*v),
310 _ => None,
311 }
312 }
313
314 pub fn as_array(&self) -> Option<&[Frame]> {
318 match self {
319 Frame::Array(Some(items)) => Some(items),
320 _ => None,
321 }
322 }
323
324 pub fn as_set(&self) -> Option<&[Frame]> {
326 match self {
327 Frame::Set(items) => Some(items),
328 _ => None,
329 }
330 }
331
332 pub fn as_map(&self) -> Option<&[(Frame, Frame)]> {
334 match self {
335 Frame::Map(pairs) => Some(pairs),
336 _ => None,
337 }
338 }
339
340 pub fn as_push(&self) -> Option<&[Frame]> {
342 match self {
343 Frame::Push(items) => Some(items),
344 _ => None,
345 }
346 }
347
348 pub fn as_verbatim_string(&self) -> Option<(&Bytes, &Bytes)> {
350 match self {
351 Frame::VerbatimString(format, content) => Some((format, content)),
352 _ => None,
353 }
354 }
355
356 pub fn into_array(self) -> Result<Vec<Frame>, Frame> {
360 match self {
361 Frame::Array(Some(items)) => Ok(items),
362 other => Err(other),
363 }
364 }
365
366 pub fn into_bulk_string(self) -> Result<Bytes, Frame> {
370 match self {
371 Frame::BulkString(Some(b)) => Ok(b),
372 other => Err(other),
373 }
374 }
375
376 pub fn into_map(self) -> Result<Vec<(Frame, Frame)>, Frame> {
380 match self {
381 Frame::Map(pairs) => Ok(pairs),
382 other => Err(other),
383 }
384 }
385
386 pub fn into_set(self) -> Result<Vec<Frame>, Frame> {
390 match self {
391 Frame::Set(items) => Ok(items),
392 other => Err(other),
393 }
394 }
395
396 pub fn is_null(&self) -> bool {
398 matches!(
399 self,
400 Frame::Null | Frame::BulkString(None) | Frame::Array(None)
401 )
402 }
403
404 pub fn is_error(&self) -> bool {
406 matches!(self, Frame::Error(_) | Frame::BlobError(_))
407 }
408}
409
410pub use crate::ParseError;
411
412pub fn parse_frame(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
416 let (frame, consumed) = parse_frame_inner(&input, 0)?;
417 Ok((frame, input.slice(consumed..)))
418}
419
420#[inline(never)]
423fn parse_blob_bounds(
424 buf: &[u8],
425 after_crlf: usize,
426 len: usize,
427) -> Result<(usize, usize), ParseError> {
428 if len == 0 {
429 if after_crlf + 1 >= buf.len() {
430 return Err(ParseError::Incomplete);
431 }
432 if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
433 return Ok((after_crlf, after_crlf));
434 } else {
435 return Err(ParseError::InvalidFormat);
436 }
437 }
438 let data_start = after_crlf;
439 let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
440 if data_end + 1 >= buf.len() {
441 return Err(ParseError::Incomplete);
442 }
443 if buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
444 return Err(ParseError::InvalidFormat);
445 }
446 Ok((data_start, data_end))
447}
448
449fn parse_bulk_string(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
451 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
452 let len_bytes = &buf[pos + 1..line_end];
453 if len_bytes == b"?" {
454 return Ok((Frame::StreamedStringHeader, after_crlf));
455 }
456 if len_bytes == b"-1" {
457 return Ok((Frame::BulkString(None), after_crlf));
458 }
459 let len = parse_usize(len_bytes)?;
460 if len > MAX_BULK_STRING_SIZE {
461 return Err(ParseError::BadLength);
462 }
463 let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
464 if data_start == data_end {
465 Ok((Frame::BulkString(Some(Bytes::new())), after_crlf + 2))
466 } else {
467 Ok((
468 Frame::BulkString(Some(input.slice(data_start..data_end))),
469 data_end + 2,
470 ))
471 }
472}
473
474#[inline(never)]
476fn parse_double_frame(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
477 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
478 let line_bytes = &buf[pos + 1..line_end];
479 if line_bytes == b"inf" || line_bytes == b"-inf" || line_bytes == b"nan" {
480 return Ok((
481 Frame::SpecialFloat(input.slice(pos + 1..line_end)),
482 after_crlf,
483 ));
484 }
485 let s = core::str::from_utf8(line_bytes).map_err(|_| ParseError::Utf8Error)?;
486 let v = s.parse::<f64>().map_err(|_| ParseError::InvalidFormat)?;
487 if v.is_infinite() || v.is_nan() {
488 let canonical = if v.is_nan() {
489 "nan"
490 } else if v.is_sign_negative() {
491 "-inf"
492 } else {
493 "inf"
494 };
495 return Ok((Frame::SpecialFloat(Bytes::from(canonical)), after_crlf));
496 }
497 Ok((Frame::Double(v), after_crlf))
498}
499
500#[inline(never)]
502fn parse_verbatim(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
503 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
504 let len_bytes = &buf[pos + 1..line_end];
505 if len_bytes == b"?" {
506 return Ok((Frame::StreamedVerbatimStringHeader, after_crlf));
507 }
508 if len_bytes == b"-1" {
509 return Err(ParseError::BadLength);
510 }
511 let len = parse_usize(len_bytes)?;
512 if len > MAX_BULK_STRING_SIZE {
513 return Err(ParseError::BadLength);
514 }
515 let data_start = after_crlf;
516 let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
517 if data_end + 1 >= buf.len() {
518 return Err(ParseError::Incomplete);
519 }
520 if buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
521 return Err(ParseError::InvalidFormat);
522 }
523 let sep = buf[data_start..data_end]
524 .iter()
525 .position(|&b| b == b':')
526 .ok_or(ParseError::InvalidFormat)?;
527 if sep != 3 {
528 return Err(ParseError::InvalidFormat);
529 }
530 let format = input.slice(data_start..data_start + sep);
531 let content = input.slice(data_start + sep + 1..data_end);
532 Ok((Frame::VerbatimString(format, content), data_end + 2))
533}
534
535#[inline(never)]
537fn parse_blob_error(input: &Bytes, buf: &[u8], pos: usize) -> Result<(Frame, usize), ParseError> {
538 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
539 let len_bytes = &buf[pos + 1..line_end];
540 if len_bytes == b"?" {
541 return Ok((Frame::StreamedBlobErrorHeader, after_crlf));
542 }
543 if len_bytes == b"-1" {
544 return Err(ParseError::BadLength);
545 }
546 let len = parse_usize(len_bytes)?;
547 if len > MAX_BULK_STRING_SIZE {
548 return Err(ParseError::BadLength);
549 }
550 let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
551 if data_start == data_end {
552 Ok((Frame::BlobError(Bytes::new()), after_crlf + 2))
553 } else {
554 Ok((
555 Frame::BlobError(input.slice(data_start..data_end)),
556 data_end + 2,
557 ))
558 }
559}
560
561#[inline(never)]
563fn parse_collection(
564 input: &Bytes,
565 buf: &[u8],
566 pos: usize,
567 tag: u8,
568) -> Result<(Frame, usize), ParseError> {
569 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
570 let len_bytes = &buf[pos + 1..line_end];
571
572 if len_bytes == b"?" {
574 return match tag {
575 b'*' => Ok((Frame::StreamedArrayHeader, after_crlf)),
576 b'~' => Ok((Frame::StreamedSetHeader, after_crlf)),
577 b'>' => Ok((Frame::StreamedPushHeader, after_crlf)),
578 _ => unreachable!(),
579 };
580 }
581 if tag == b'*' && len_bytes == b"-1" {
583 return Ok((Frame::Array(None), after_crlf));
584 }
585 let count = parse_count(len_bytes)?;
586 if count == 0 {
587 return match tag {
588 b'*' => Ok((Frame::Array(Some(Vec::new())), after_crlf)),
589 b'~' => Ok((Frame::Set(Vec::new()), after_crlf)),
590 b'>' => Ok((Frame::Push(Vec::new()), after_crlf)),
591 _ => unreachable!(),
592 };
593 }
594 let mut cursor = after_crlf;
595 let mut items = Vec::with_capacity(count);
596 for _ in 0..count {
597 let (item, next) = parse_frame_inner(input, cursor)?;
598 items.push(item);
599 cursor = next;
600 }
601 match tag {
602 b'*' => Ok((Frame::Array(Some(items)), cursor)),
603 b'~' => Ok((Frame::Set(items), cursor)),
604 b'>' => Ok((Frame::Push(items), cursor)),
605 _ => unreachable!(),
606 }
607}
608
609#[inline(never)]
611fn parse_pairs(
612 input: &Bytes,
613 buf: &[u8],
614 pos: usize,
615 tag: u8,
616) -> Result<(Frame, usize), ParseError> {
617 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
618 let len_bytes = &buf[pos + 1..line_end];
619 if len_bytes == b"?" {
620 return if tag == b'%' {
621 Ok((Frame::StreamedMapHeader, after_crlf))
622 } else {
623 Ok((Frame::StreamedAttributeHeader, after_crlf))
624 };
625 }
626 let count = parse_count(len_bytes)?;
627 let mut cursor = after_crlf;
628 let mut pairs = Vec::with_capacity(count);
629 for _ in 0..count {
630 let (key, next1) = parse_frame_inner(input, cursor)?;
631 let (val, next2) = parse_frame_inner(input, next1)?;
632 pairs.push((key, val));
633 cursor = next2;
634 }
635 if tag == b'%' {
636 Ok((Frame::Map(pairs), cursor))
637 } else {
638 Ok((Frame::Attribute(pairs), cursor))
639 }
640}
641
642#[inline(never)]
644fn parse_streamed_chunk(
645 input: &Bytes,
646 buf: &[u8],
647 pos: usize,
648) -> Result<(Frame, usize), ParseError> {
649 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
650 let len = parse_usize(&buf[pos + 1..line_end])?;
651 if len > MAX_BULK_STRING_SIZE {
652 return Err(ParseError::BadLength);
653 }
654 let (data_start, data_end) = parse_blob_bounds(buf, after_crlf, len)?;
655 if data_start == data_end {
656 Ok((Frame::StreamedStringChunk(Bytes::new()), after_crlf + 2))
657 } else {
658 Ok((
659 Frame::StreamedStringChunk(input.slice(data_start..data_end)),
660 data_end + 2,
661 ))
662 }
663}
664
665pub(crate) fn parse_frame_inner(input: &Bytes, pos: usize) -> Result<(Frame, usize), ParseError> {
669 let buf = input.as_ref();
670 if pos >= buf.len() {
671 return Err(ParseError::Incomplete);
672 }
673
674 let tag = buf[pos];
675
676 match tag {
677 b'+' => {
679 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
680 Ok((
681 Frame::SimpleString(input.slice(pos + 1..line_end)),
682 after_crlf,
683 ))
684 }
685 b'-' => {
686 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
687 Ok((Frame::Error(input.slice(pos + 1..line_end)), after_crlf))
688 }
689 b':' => {
690 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
691 let v = parse_i64(&buf[pos + 1..line_end])?;
692 Ok((Frame::Integer(v), after_crlf))
693 }
694 b'#' => {
695 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
696 match &buf[pos + 1..line_end] {
697 b"t" => Ok((Frame::Boolean(true), after_crlf)),
698 b"f" => Ok((Frame::Boolean(false), after_crlf)),
699 _ => Err(ParseError::InvalidBoolean),
700 }
701 }
702 b'(' => {
703 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
704 Ok((Frame::BigNumber(input.slice(pos + 1..line_end)), after_crlf))
705 }
706 b'_' => {
707 if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
708 Ok((Frame::Null, pos + 3))
709 } else {
710 Err(ParseError::Incomplete)
711 }
712 }
713 b'.' => {
714 if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
715 Ok((Frame::StreamTerminator, pos + 3))
716 } else {
717 Err(ParseError::Incomplete)
718 }
719 }
720
721 b'$' => parse_bulk_string(input, buf, pos),
723 b',' => parse_double_frame(input, buf, pos),
724 b'=' => parse_verbatim(input, buf, pos),
725 b'!' => parse_blob_error(input, buf, pos),
726 b';' => parse_streamed_chunk(input, buf, pos),
727
728 b'*' | b'~' | b'>' => parse_collection(input, buf, pos, tag),
730 b'%' | b'|' => parse_pairs(input, buf, pos, tag),
731
732 _ => Err(ParseError::InvalidTag(tag)),
733 }
734}
735
736#[cfg(feature = "unsafe-internals")]
737#[path = "resp3_unchecked.rs"]
738mod unchecked;
739#[cfg(feature = "unsafe-internals")]
740pub use unchecked::parse_frame_unchecked;
741
742#[cfg(feature = "codec")]
743#[path = "resp3_codec.rs"]
744mod codec_impl;
745#[cfg(feature = "codec")]
746pub use codec_impl::Codec;
747
748pub fn parse_streaming_sequence(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
828 if input.is_empty() {
829 return Err(ParseError::Incomplete);
830 }
831
832 let (header, mut rest) = parse_frame(input)?;
833
834 match header {
835 Frame::StreamedStringHeader => {
836 let mut chunks = Vec::new();
838
839 loop {
840 let (frame, new_rest) = parse_frame(rest)?;
841 rest = new_rest;
842
843 match frame {
844 Frame::StreamedStringChunk(chunk) => {
845 if chunk.is_empty() {
846 break;
848 }
849 chunks.push(chunk);
850 }
851 _ => {
852 return Err(ParseError::InvalidFormat);
853 }
854 }
855 }
856
857 Ok((Frame::StreamedString(chunks), rest))
858 }
859 Frame::StreamedBlobErrorHeader | Frame::StreamedVerbatimStringHeader => {
860 Ok((header, rest))
863 }
864 Frame::StreamedArrayHeader => {
865 let mut items = Vec::new();
867
868 loop {
869 let (frame, new_rest) = parse_frame(rest)?;
870 rest = new_rest;
871
872 match frame {
873 Frame::StreamTerminator => {
874 break;
875 }
876 item => {
877 items.push(item);
878 }
879 }
880 }
881
882 Ok((Frame::StreamedArray(items), rest))
883 }
884 Frame::StreamedSetHeader => {
885 let mut items = Vec::new();
887
888 loop {
889 let (frame, new_rest) = parse_frame(rest)?;
890 rest = new_rest;
891
892 match frame {
893 Frame::StreamTerminator => {
894 break;
895 }
896 item => {
897 items.push(item);
898 }
899 }
900 }
901
902 Ok((Frame::StreamedSet(items), rest))
903 }
904 Frame::StreamedMapHeader => {
905 let mut pairs = Vec::new();
907
908 loop {
909 let (frame, new_rest) = parse_frame(rest)?;
910 rest = new_rest;
911
912 match frame {
913 Frame::StreamTerminator => {
914 break;
915 }
916 key => {
917 let (value, newer_rest) = parse_frame(rest)?;
918 if matches!(value, Frame::StreamTerminator) {
919 return Err(ParseError::InvalidFormat);
920 }
921 rest = newer_rest;
922 pairs.push((key, value));
923 }
924 }
925 }
926
927 Ok((Frame::StreamedMap(pairs), rest))
928 }
929 Frame::StreamedAttributeHeader => {
930 let mut pairs = Vec::new();
932
933 loop {
934 let (frame, new_rest) = parse_frame(rest)?;
935 rest = new_rest;
936
937 match frame {
938 Frame::StreamTerminator => {
939 break;
940 }
941 key => {
942 let (value, newer_rest) = parse_frame(rest)?;
943 if matches!(value, Frame::StreamTerminator) {
944 return Err(ParseError::InvalidFormat);
945 }
946 rest = newer_rest;
947 pairs.push((key, value));
948 }
949 }
950 }
951
952 Ok((Frame::StreamedAttribute(pairs), rest))
953 }
954 Frame::StreamedPushHeader => {
955 let mut items = Vec::new();
957
958 loop {
959 let (frame, new_rest) = parse_frame(rest)?;
960 rest = new_rest;
961
962 match frame {
963 Frame::StreamTerminator => {
964 break;
965 }
966 item => {
967 items.push(item);
968 }
969 }
970 }
971
972 Ok((Frame::StreamedPush(items), rest))
973 }
974 _ => {
975 Ok((header, rest))
977 }
978 }
979}
980
981#[inline]
984fn find_crlf(buf: &[u8], from: usize) -> Result<(usize, usize), ParseError> {
985 let mut i = from;
986 let len = buf.len();
987 while i + 1 < len {
988 if buf[i] == b'\r' && buf[i + 1] == b'\n' {
989 return Ok((i, i + 2));
990 }
991 i += 1;
992 }
993 Err(ParseError::Incomplete)
994}
995
996#[inline]
998fn parse_usize(buf: &[u8]) -> Result<usize, ParseError> {
999 if buf.is_empty() {
1000 return Err(ParseError::BadLength);
1001 }
1002 let mut v: usize = 0;
1003 for &b in buf {
1004 if !b.is_ascii_digit() {
1005 return Err(ParseError::BadLength);
1006 }
1007 v = v.checked_mul(10).ok_or(ParseError::BadLength)?;
1008 v = v
1009 .checked_add((b - b'0') as usize)
1010 .ok_or(ParseError::BadLength)?;
1011 }
1012 Ok(v)
1013}
1014
1015#[inline]
1017fn parse_i64(buf: &[u8]) -> Result<i64, ParseError> {
1018 if buf.is_empty() {
1019 return Err(ParseError::InvalidFormat);
1020 }
1021 let (neg, digits) = if buf[0] == b'-' {
1022 (true, &buf[1..])
1023 } else {
1024 (false, buf)
1025 };
1026 if digits.is_empty() {
1027 return Err(ParseError::InvalidFormat);
1028 }
1029 let mut v: i64 = 0;
1030 for (i, &d) in digits.iter().enumerate() {
1031 if !d.is_ascii_digit() {
1032 return Err(ParseError::InvalidFormat);
1033 }
1034 let digit = (d - b'0') as i64;
1035 if neg && v == i64::MAX / 10 && digit == 8 && i == digits.len() - 1 {
1036 return Ok(i64::MIN);
1037 }
1038 if v > i64::MAX / 10 || (v == i64::MAX / 10 && digit > i64::MAX % 10) {
1039 return Err(ParseError::Overflow);
1040 }
1041 v = v * 10 + digit;
1042 }
1043 if neg { Ok(-v) } else { Ok(v) }
1044}
1045
1046#[inline]
1048fn parse_count(buf: &[u8]) -> Result<usize, ParseError> {
1049 let count = parse_usize(buf)?;
1050 if count > MAX_COLLECTION_SIZE {
1051 return Err(ParseError::BadLength);
1052 }
1053 Ok(count)
1054}
1055
1056pub fn frame_to_bytes(frame: &Frame) -> Bytes {
1060 let mut buf = BytesMut::new();
1061 serialize_frame(frame, &mut buf);
1062 buf.freeze()
1063}
1064
1065fn serialize_frame(frame: &Frame, buf: &mut BytesMut) {
1066 match frame {
1067 Frame::SimpleString(s) => {
1068 buf.put_u8(b'+');
1069 buf.extend_from_slice(s);
1070 buf.extend_from_slice(b"\r\n");
1071 }
1072 Frame::Error(e) => {
1073 buf.put_u8(b'-');
1074 buf.extend_from_slice(e);
1075 buf.extend_from_slice(b"\r\n");
1076 }
1077 Frame::Integer(i) => {
1078 buf.put_u8(b':');
1079 let s = i.to_string();
1080 buf.extend_from_slice(s.as_bytes());
1081 buf.extend_from_slice(b"\r\n");
1082 }
1083 Frame::BulkString(opt) => {
1084 buf.put_u8(b'$');
1085 match opt {
1086 Some(data) => {
1087 let len = data.len().to_string();
1088 buf.extend_from_slice(len.as_bytes());
1089 buf.extend_from_slice(b"\r\n");
1090 buf.extend_from_slice(data);
1091 buf.extend_from_slice(b"\r\n");
1092 }
1093 None => {
1094 buf.extend_from_slice(b"-1\r\n");
1095 }
1096 }
1097 }
1098 Frame::BlobError(data) => {
1099 buf.put_u8(b'!');
1100 let len = data.len().to_string();
1101 buf.extend_from_slice(len.as_bytes());
1102 buf.extend_from_slice(b"\r\n");
1103 buf.extend_from_slice(data);
1104 buf.extend_from_slice(b"\r\n");
1105 }
1106 Frame::StreamedStringHeader => {
1107 buf.extend_from_slice(b"$?\r\n");
1108 }
1109 Frame::StreamedBlobErrorHeader => {
1110 buf.extend_from_slice(b"!?\r\n");
1111 }
1112 Frame::StreamedVerbatimStringHeader => {
1113 buf.extend_from_slice(b"=?\r\n");
1114 }
1115 Frame::StreamedArrayHeader => {
1116 buf.extend_from_slice(b"*?\r\n");
1117 }
1118 Frame::StreamedSetHeader => {
1119 buf.extend_from_slice(b"~?\r\n");
1120 }
1121 Frame::StreamedMapHeader => {
1122 buf.extend_from_slice(b"%?\r\n");
1123 }
1124 Frame::StreamedAttributeHeader => {
1125 buf.extend_from_slice(b"|?\r\n");
1126 }
1127 Frame::StreamedPushHeader => {
1128 buf.extend_from_slice(b">?\r\n");
1129 }
1130 Frame::StreamedStringChunk(data) => {
1131 buf.put_u8(b';');
1132 let len = data.len().to_string();
1133 buf.extend_from_slice(len.as_bytes());
1134 buf.extend_from_slice(b"\r\n");
1135 buf.extend_from_slice(data);
1136 buf.extend_from_slice(b"\r\n");
1137 }
1138 Frame::StreamedString(chunks) => {
1139 buf.extend_from_slice(b"$?\r\n");
1141 for chunk in chunks {
1142 buf.put_u8(b';');
1143 let len = chunk.len().to_string();
1144 buf.extend_from_slice(len.as_bytes());
1145 buf.extend_from_slice(b"\r\n");
1146 buf.extend_from_slice(chunk);
1147 buf.extend_from_slice(b"\r\n");
1148 }
1149 buf.extend_from_slice(b";0\r\n\r\n");
1150 }
1151 Frame::StreamedArray(items) => {
1152 buf.extend_from_slice(b"*?\r\n");
1153 for item in items {
1154 serialize_frame(item, buf);
1155 }
1156 buf.extend_from_slice(b".\r\n");
1157 }
1158 Frame::StreamedSet(items) => {
1159 buf.extend_from_slice(b"~?\r\n");
1160 for item in items {
1161 serialize_frame(item, buf);
1162 }
1163 buf.extend_from_slice(b".\r\n");
1164 }
1165 Frame::StreamedMap(pairs) => {
1166 buf.extend_from_slice(b"%?\r\n");
1167 for (key, value) in pairs {
1168 serialize_frame(key, buf);
1169 serialize_frame(value, buf);
1170 }
1171 buf.extend_from_slice(b".\r\n");
1172 }
1173 Frame::StreamedAttribute(pairs) => {
1174 buf.extend_from_slice(b"|?\r\n");
1175 for (key, value) in pairs {
1176 serialize_frame(key, buf);
1177 serialize_frame(value, buf);
1178 }
1179 buf.extend_from_slice(b".\r\n");
1180 }
1181 Frame::StreamedPush(items) => {
1182 buf.extend_from_slice(b">?\r\n");
1183 for item in items {
1184 serialize_frame(item, buf);
1185 }
1186 buf.extend_from_slice(b".\r\n");
1187 }
1188 Frame::StreamTerminator => {
1189 buf.extend_from_slice(b".\r\n");
1190 }
1191 Frame::Null => {
1192 buf.extend_from_slice(b"_\r\n");
1193 }
1194 Frame::Double(d) => {
1195 buf.put_u8(b',');
1196 let s = d.to_string();
1197 buf.extend_from_slice(s.as_bytes());
1198 buf.extend_from_slice(b"\r\n");
1199 }
1200 Frame::SpecialFloat(f) => {
1201 buf.put_u8(b',');
1202 buf.extend_from_slice(f);
1203 buf.extend_from_slice(b"\r\n");
1204 }
1205 Frame::Boolean(b) => {
1206 buf.extend_from_slice(if *b { b"#t\r\n" } else { b"#f\r\n" });
1207 }
1208 Frame::BigNumber(n) => {
1209 buf.put_u8(b'(');
1210 buf.extend_from_slice(n);
1211 buf.extend_from_slice(b"\r\n");
1212 }
1213 Frame::VerbatimString(format, content) => {
1214 buf.put_u8(b'=');
1215 let total_len = format.len() + 1 + content.len(); let len = total_len.to_string();
1217 buf.extend_from_slice(len.as_bytes());
1218 buf.extend_from_slice(b"\r\n");
1219 buf.extend_from_slice(format);
1220 buf.put_u8(b':');
1221 buf.extend_from_slice(content);
1222 buf.extend_from_slice(b"\r\n");
1223 }
1224 Frame::Array(opt) => {
1225 buf.put_u8(b'*');
1226 match opt {
1227 Some(items) => {
1228 let len = items.len().to_string();
1229 buf.extend_from_slice(len.as_bytes());
1230 buf.extend_from_slice(b"\r\n");
1231 for item in items {
1232 serialize_frame(item, buf);
1233 }
1234 }
1235 None => {
1236 buf.extend_from_slice(b"-1\r\n");
1237 }
1238 }
1239 }
1240 Frame::Set(items) => {
1241 buf.put_u8(b'~');
1242 let len = items.len().to_string();
1243 buf.extend_from_slice(len.as_bytes());
1244 buf.extend_from_slice(b"\r\n");
1245 for item in items {
1246 serialize_frame(item, buf);
1247 }
1248 }
1249 Frame::Map(pairs) => {
1250 buf.put_u8(b'%');
1251 let len = pairs.len().to_string();
1252 buf.extend_from_slice(len.as_bytes());
1253 buf.extend_from_slice(b"\r\n");
1254 for (key, value) in pairs {
1255 serialize_frame(key, buf);
1256 serialize_frame(value, buf);
1257 }
1258 }
1259 Frame::Attribute(pairs) => {
1260 buf.put_u8(b'|');
1261 let len = pairs.len().to_string();
1262 buf.extend_from_slice(len.as_bytes());
1263 buf.extend_from_slice(b"\r\n");
1264 for (key, value) in pairs {
1265 serialize_frame(key, buf);
1266 serialize_frame(value, buf);
1267 }
1268 }
1269 Frame::Push(items) => {
1270 buf.put_u8(b'>');
1271 let len = items.len().to_string();
1272 buf.extend_from_slice(len.as_bytes());
1273 buf.extend_from_slice(b"\r\n");
1274 for item in items {
1275 serialize_frame(item, buf);
1276 }
1277 }
1278 }
1279}
1280
1281#[cfg(test)]
1282mod tests {
1283 use super::{Frame, ParseError, Parser, frame_to_bytes, parse_frame, parse_streaming_sequence};
1284 use bytes::Bytes;
1285
1286 #[test]
1287 fn test_parse_frame_simple_string() {
1288 let input = Bytes::from("+HELLO\r\nWORLD");
1289 let (frame, rest) = parse_frame(input.clone()).unwrap();
1290 assert_eq!(frame, Frame::SimpleString(Bytes::from("HELLO")));
1291 assert_eq!(rest, Bytes::from("WORLD"));
1292 }
1293
1294 #[test]
1295 fn test_parse_frame_blob_error() {
1296 let input = Bytes::from("!5\r\nERROR\r\nREST");
1297 let (frame, rest) = parse_frame(input.clone()).unwrap();
1298 assert_eq!(frame, Frame::BlobError(Bytes::from("ERROR")));
1299 assert_eq!(rest, Bytes::from("REST"));
1300 }
1301
1302 #[test]
1303 fn test_parse_frame_error() {
1304 let input = Bytes::from("-ERR fail\r\nLEFT");
1305 let (frame, rest) = parse_frame(input.clone()).unwrap();
1306 assert_eq!(frame, Frame::Error(Bytes::from("ERR fail")));
1307 assert_eq!(rest, Bytes::from("LEFT"));
1308 }
1309
1310 #[test]
1311 fn test_parse_frame_integer() {
1312 let input = Bytes::from(":42\r\nTAIL");
1313 let (frame, rest) = parse_frame(input.clone()).unwrap();
1314 assert_eq!(frame, Frame::Integer(42));
1315 assert_eq!(rest, Bytes::from("TAIL"));
1316 }
1317
1318 #[test]
1319 fn test_parse_frame_bulk_string() {
1320 let input = Bytes::from("$3\r\nfoo\r\nREST");
1321 let (frame, rest) = parse_frame(input.clone()).unwrap();
1322 assert_eq!(frame, Frame::BulkString(Some(Bytes::from("foo"))));
1323 assert_eq!(rest, Bytes::from("REST"));
1324 let null_input = Bytes::from("$-1\r\nAFTER");
1325 let (frame, rest) = parse_frame(null_input.clone()).unwrap();
1326 assert_eq!(frame, Frame::BulkString(None));
1327 assert_eq!(rest, Bytes::from("AFTER"));
1328 }
1329
1330 #[test]
1331 fn test_parse_frame_null() {
1332 let input = Bytes::from("_\r\nLEFT");
1333 let (frame, rest) = parse_frame(input.clone()).unwrap();
1334 assert_eq!(frame, Frame::Null);
1335 assert_eq!(rest, Bytes::from("LEFT"));
1336 }
1337
1338 #[test]
1339 fn test_parse_frame_double_and_special_float() {
1340 let input = Bytes::from(",3.5\r\nNEXT");
1341 let (frame, rest) = parse_frame(input.clone()).unwrap();
1342 assert_eq!(frame, Frame::Double(3.5));
1343 assert_eq!(rest, Bytes::from("NEXT"));
1344 let input_inf = Bytes::from(",inf\r\nTAIL");
1345 let (frame, rest) = parse_frame(input_inf.clone()).unwrap();
1346 assert_eq!(frame, Frame::SpecialFloat(Bytes::from("inf")));
1347 assert_eq!(rest, Bytes::from("TAIL"));
1348 }
1349
1350 #[test]
1351 fn test_parse_frame_boolean() {
1352 let input_true = Bytes::from("#t\r\nXYZ");
1353 let (frame, rest) = parse_frame(input_true.clone()).unwrap();
1354 assert_eq!(frame, Frame::Boolean(true));
1355 assert_eq!(rest, Bytes::from("XYZ"));
1356 let input_false = Bytes::from("#f\r\nDONE");
1357 let (frame, rest) = parse_frame(input_false.clone()).unwrap();
1358 assert_eq!(frame, Frame::Boolean(false));
1359 assert_eq!(rest, Bytes::from("DONE"));
1360 }
1361
1362 #[test]
1363 fn test_parse_frame_big_number() {
1364 let input = Bytes::from("(123456789\r\nEND");
1365 let (frame, rest) = parse_frame(input.clone()).unwrap();
1366 assert_eq!(frame, Frame::BigNumber(Bytes::from("123456789")));
1367 assert_eq!(rest, Bytes::from("END"));
1368 }
1369
1370 #[test]
1371 fn test_parse_frame_verbatim_string() {
1372 let input = Bytes::from("=12\r\ntxt:hi there\r\nAFTER");
1373 let (frame, rest) = parse_frame(input.clone()).unwrap();
1374 assert_eq!(
1375 frame,
1376 Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hi there")) );
1381 assert_eq!(rest, Bytes::from("AFTER"));
1382 }
1383
1384 #[test]
1385 fn test_parse_frame_array_set_push_map_attribute() {
1386 let input = Bytes::from("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\nTAIL");
1388 let (frame, rest) = parse_frame(input.clone()).unwrap();
1389 assert_eq!(
1390 frame,
1391 Frame::Array(Some(vec![
1392 Frame::BulkString(Some(Bytes::from("foo"))),
1393 Frame::BulkString(Some(Bytes::from("bar")))
1394 ]))
1395 );
1396 assert_eq!(rest, Bytes::from("TAIL"));
1397 let input_null = Bytes::from("*-1\r\nEND");
1399 let (frame, rest) = parse_frame(input_null.clone()).unwrap();
1400 assert_eq!(frame, Frame::Array(None));
1401 assert_eq!(rest, Bytes::from("END"));
1402 let input_set = Bytes::from("~2\r\n+foo\r\n+bar\r\nTAIL");
1404 let (frame, rest) = parse_frame(input_set.clone()).unwrap();
1405 assert_eq!(
1406 frame,
1407 Frame::Set(vec![
1408 Frame::SimpleString(Bytes::from("foo")),
1409 Frame::SimpleString(Bytes::from("bar")),
1410 ])
1411 );
1412 assert_eq!(rest, Bytes::from("TAIL"));
1413 let input_map = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\nTRAIL");
1415 let (frame, rest) = parse_frame(input_map.clone()).unwrap();
1416 assert_eq!(
1417 frame,
1418 Frame::Map(vec![
1419 (
1420 Frame::SimpleString(Bytes::from("key1")),
1421 Frame::SimpleString(Bytes::from("val1"))
1422 ),
1423 (
1424 Frame::SimpleString(Bytes::from("key2")),
1425 Frame::SimpleString(Bytes::from("val2"))
1426 ),
1427 ])
1428 );
1429 assert_eq!(rest, Bytes::from("TRAIL"));
1430 let input_attr = Bytes::from("|1\r\n+meta\r\n+data\r\nAFTER");
1432 let (frame, rest) = parse_frame(input_attr.clone()).unwrap();
1433 assert_eq!(
1434 frame,
1435 Frame::Attribute(vec![(
1436 Frame::SimpleString(Bytes::from("meta")),
1437 Frame::SimpleString(Bytes::from("data"))
1438 ),])
1439 );
1440 assert_eq!(rest, Bytes::from("AFTER"));
1441 let input_push = Bytes::from(">2\r\n+type\r\n:1\r\nNEXT");
1443 let (frame, rest) = parse_frame(input_push.clone()).unwrap();
1444 assert_eq!(
1445 frame,
1446 Frame::Push(vec![
1447 Frame::SimpleString(Bytes::from("type")),
1448 Frame::Integer(1),
1449 ])
1450 );
1451 assert_eq!(rest, Bytes::from("NEXT"));
1452 }
1453
1454 #[test]
1455 fn test_parse_frame_empty_input() {
1456 assert!(parse_frame(Bytes::new()).is_err());
1457 }
1458
1459 #[test]
1460 fn test_parse_frame_invalid_tag() {
1461 let input = Bytes::from("X123\r\n");
1462 assert!(parse_frame(input).is_err());
1463 }
1464
1465 #[test]
1466 fn test_parse_frame_malformed_bulk_length() {
1467 let input = Bytes::from("$x\r\nfoo\r\n");
1468 assert!(parse_frame(input).is_err());
1469 }
1470
1471 #[test]
1472 fn test_parse_frame_zero_length_bulk() {
1473 let input = Bytes::from("$0\r\n\r\nTAIL");
1474 let (frame, rest) = parse_frame(input.clone()).unwrap();
1475 assert_eq!(frame, Frame::BulkString(Some(Bytes::from(""))));
1476 assert_eq!(rest, Bytes::from("TAIL"));
1477 }
1478
1479 #[test]
1480 fn test_parse_frame_zero_length_blob_error() {
1481 let input = Bytes::from("!0\r\n\r\nREST");
1482 let (frame, rest) = parse_frame(input.clone()).unwrap();
1483 assert_eq!(frame, Frame::BlobError(Bytes::new()));
1484 assert_eq!(rest, Bytes::from("REST"));
1485 }
1486
1487 #[test]
1488 fn test_parse_frame_missing_crlf() {
1489 let input = Bytes::from(":42\nTAIL");
1490 assert!(parse_frame(input).is_err());
1491 }
1492
#[test]
fn test_parse_frame_unicode_simple_string() {
    // Multi-byte UTF-8 content in a simple string is returned unchanged.
    let wire = Bytes::from("+こんにちは\r\nEND");
    let (parsed, leftover) = parse_frame(wire).unwrap();
    assert_eq!(parsed, Frame::SimpleString(Bytes::from("こんにちは")));
    assert_eq!(leftover, Bytes::from_static(b"END"));
}
1500
#[test]
fn test_parse_frame_chained_frames() {
    // Feeding the remainder of one parse into the next walks through back-to-back frames.
    let stream = Bytes::from_static(b"+OK\r\n:1\r\nfoo");

    let (first, after_first) = parse_frame(stream).unwrap();
    assert_eq!(first, Frame::SimpleString(Bytes::from_static(b"OK")));

    let (second, leftover) = parse_frame(after_first).unwrap();
    assert_eq!(second, Frame::Integer(1));
    assert_eq!(leftover, Bytes::from_static(b"foo"));
}
1510
#[test]
fn test_parse_frame_empty_array() {
    // `*0` is an empty (but present) array, distinct from the null array.
    let (parsed, leftover) = parse_frame(Bytes::from_static(b"*0\r\nTAIL")).unwrap();
    assert_eq!(parsed, Frame::Array(Some(Vec::new())));
    assert_eq!(leftover, Bytes::from_static(b"TAIL"));
}
1518
#[test]
fn test_parse_frame_partial_array_data() {
    // The header announces two elements but only one is present.
    let outcome = parse_frame(Bytes::from_static(b"*2\r\n+OK\r\n"));
    assert!(outcome.is_err());
}
1524
#[test]
fn test_parse_frame_streamed_string() {
    // `parse_frame` handles one frame per call: the `$?` header first, then each
    // following bulk string as an ordinary frame.
    let wire = Bytes::from_static(b"$?\r\n$5\r\nhello\r\n$0\r\n\r\nREST");

    let (header, after_header) = parse_frame(wire).unwrap();
    assert_eq!(header, Frame::StreamedStringHeader);

    let (body, after_body) = parse_frame(after_header).unwrap();
    assert_eq!(body, Frame::BulkString(Some(Bytes::from_static(b"hello"))));

    let (last, leftover) = parse_frame(after_body).unwrap();
    assert_eq!(last, Frame::BulkString(Some(Bytes::new())));
    assert_eq!(leftover, Bytes::from_static(b"REST"));
}
1536
#[test]
fn test_parse_frame_streamed_blob_error() {
    // The `!?` header is its own frame; the blob error after it parses normally.
    let wire = Bytes::from_static(b"!?\r\n!5\r\nERROR\r\nREST");

    let (header, after_header) = parse_frame(wire).unwrap();
    assert_eq!(header, Frame::StreamedBlobErrorHeader);

    let (body, leftover) = parse_frame(after_header).unwrap();
    assert_eq!(body, Frame::BlobError(Bytes::from_static(b"ERROR")));
    assert_eq!(leftover, Bytes::from_static(b"REST"));
}
1546
#[test]
fn test_parse_frame_streamed_verbatim_string() {
    // The `=?` header is its own frame; the verbatim string after it is split
    // into its format (`txt`) and its content (`hello`).
    let wire = Bytes::from_static(b"=?\r\n=9\r\ntxt:hello\r\nTAIL");

    let (header, after_header) = parse_frame(wire).unwrap();
    assert_eq!(header, Frame::StreamedVerbatimStringHeader);

    let (body, leftover) = parse_frame(after_header).unwrap();
    let expected = Frame::VerbatimString(Bytes::from_static(b"txt"), Bytes::from_static(b"hello"));
    assert_eq!(body, expected);
    assert_eq!(leftover, Bytes::from_static(b"TAIL"));
}
1562
#[test]
fn test_parse_frame_streamed_array() {
    // Frame-by-frame walk over a streamed array: header, two items, then a `*0`
    // frame that parses as an empty array.
    let wire = Bytes::from_static(b"*?\r\n+one\r\n+two\r\n*0\r\nEND");

    let (header, cursor) = parse_frame(wire).unwrap();
    assert_eq!(header, Frame::StreamedArrayHeader);

    let (first, cursor) = parse_frame(cursor).unwrap();
    assert_eq!(first, Frame::SimpleString(Bytes::from_static(b"one")));

    let (second, cursor) = parse_frame(cursor).unwrap();
    assert_eq!(second, Frame::SimpleString(Bytes::from_static(b"two")));

    let (closing, leftover) = parse_frame(cursor).unwrap();
    assert_eq!(closing, Frame::Array(Some(Vec::new())));
    assert_eq!(leftover, Bytes::from_static(b"END"));
}
1576
#[test]
fn test_parse_frame_streamed_set_map_attr_push() {
    // Each streamed aggregate is read one frame at a time: the `?` header, two
    // element frames, then a zero-length aggregate of the same kind.
    let step = |buf: Bytes| parse_frame(buf).unwrap();

    // Set
    let (header, cursor) = step(Bytes::from("~?\r\n+foo\r\n+bar\r\n~0\r\nTAIL"));
    assert_eq!(header, Frame::StreamedSetHeader);
    let (first, cursor) = step(cursor);
    assert_eq!(first, Frame::SimpleString(Bytes::from("foo")));
    let (second, cursor) = step(cursor);
    assert_eq!(second, Frame::SimpleString(Bytes::from("bar")));
    let (closing, leftover) = step(cursor);
    assert_eq!(closing, Frame::Set(vec![]));
    assert_eq!(leftover, Bytes::from("TAIL"));

    // Map
    let (header, cursor) = step(Bytes::from("%?\r\n+key\r\n+val\r\n%0\r\nNEXT"));
    assert_eq!(header, Frame::StreamedMapHeader);
    let (key, cursor) = step(cursor);
    assert_eq!(key, Frame::SimpleString(Bytes::from("key")));
    let (value, cursor) = step(cursor);
    assert_eq!(value, Frame::SimpleString(Bytes::from("val")));
    let (closing, leftover) = step(cursor);
    assert_eq!(closing, Frame::Map(vec![]));
    assert_eq!(leftover, Bytes::from("NEXT"));

    // Attribute
    let (header, cursor) = step(Bytes::from("|?\r\n+meta\r\n+info\r\n|0\r\nMORE"));
    assert_eq!(header, Frame::StreamedAttributeHeader);
    let (key, cursor) = step(cursor);
    assert_eq!(key, Frame::SimpleString(Bytes::from("meta")));
    let (value, cursor) = step(cursor);
    assert_eq!(value, Frame::SimpleString(Bytes::from("info")));
    let (closing, leftover) = step(cursor);
    assert_eq!(closing, Frame::Attribute(vec![]));
    assert_eq!(leftover, Bytes::from("MORE"));

    // Push
    let (header, cursor) = step(Bytes::from(">?\r\n:1\r\n:2\r\n>0\r\nEND"));
    assert_eq!(header, Frame::StreamedPushHeader);
    let (first, cursor) = step(cursor);
    assert_eq!(first, Frame::Integer(1));
    let (second, cursor) = step(cursor);
    assert_eq!(second, Frame::Integer(2));
    let (closing, leftover) = step(cursor);
    assert_eq!(closing, Frame::Push(vec![]));
    assert_eq!(leftover, Bytes::from("END"));
}
1624
#[test]
fn test_parse_frame_stream_terminator() {
    // A lone `.` line is the stream terminator frame.
    let (parsed, leftover) = parse_frame(Bytes::from_static(b".\r\nREST")).unwrap();
    assert_eq!(parsed, Frame::StreamTerminator);
    assert_eq!(leftover, Bytes::from_static(b"REST"));
}
1632
#[test]
fn test_parse_frame_null_blob_error_rejected() {
    // A blob error with length -1 is reported as a bad length.
    let outcome = parse_frame(Bytes::from_static(b"!-1\r\nTAIL"));
    assert_eq!(outcome, Err(ParseError::BadLength));
}
1638
#[test]
fn test_parse_frame_null_verbatim_rejected() {
    // A verbatim string with length -1 is reported as a bad length.
    let outcome = parse_frame(Bytes::from_static(b"=-1\r\nTAIL"));
    assert_eq!(outcome, Err(ParseError::BadLength));
}
1644
#[test]
fn test_verbatim_string_format_must_be_3_bytes() {
    // Format prefixes of one, four and zero bytes before the `:` are all rejected.
    for malformed in ["=6\r\nx:data\r\n", "=9\r\ntxtx:data\r\n", "=5\r\n:data\r\n"] {
        assert_eq!(
            parse_frame(Bytes::from(malformed)),
            Err(ParseError::InvalidFormat)
        );
    }

    // A three-byte format prefix is accepted and separated from the content.
    let (parsed, _) = parse_frame(Bytes::from("=8\r\ntxt:data\r\n")).unwrap();
    let expected = Frame::VerbatimString(Bytes::from("txt"), Bytes::from("data"));
    assert_eq!(parsed, expected);
}
1667
#[test]
fn test_parse_frame_special_float_nan() {
    // `,nan` is surfaced as a `SpecialFloat` carrying the raw text, not as a `Double`.
    let (parsed, leftover) = parse_frame(Bytes::from_static(b",nan\r\nTAIL")).unwrap();
    assert_eq!(parsed, Frame::SpecialFloat(Bytes::from_static(b"nan")));
    assert_eq!(leftover, Bytes::from_static(b"TAIL"));
}
1675
#[test]
fn test_parse_frame_big_number_zero() {
    // A big number keeps its digits as raw bytes.
    let (parsed, leftover) = parse_frame(Bytes::from_static(b"(0\r\nEND")).unwrap();
    assert_eq!(parsed, Frame::BigNumber(Bytes::from_static(b"0")));
    assert_eq!(leftover, Bytes::from_static(b"END"));
}
1683
#[test]
fn test_parse_frame_collection_empty() {
    // (wire bytes, expected frame, expected unconsumed tail) for every
    // zero-length aggregate plus the null array.
    let cases = [
        (">0\r\nTAIL", Frame::Push(vec![]), "TAIL"),
        ("|0\r\nAFTER", Frame::Attribute(vec![]), "AFTER"),
        ("%0\r\nEND", Frame::Map(vec![]), "END"),
        ("~0\r\nDONE", Frame::Set(vec![]), "DONE"),
        ("*-1\r\nFIN", Frame::Array(None), "FIN"),
    ];

    for (wire, expected, tail) in cases {
        let (parsed, leftover) = parse_frame(Bytes::from(wire)).unwrap();
        assert_eq!(parsed, expected);
        assert_eq!(leftover, Bytes::from(tail));
    }
}
1707
#[test]
fn test_roundtrip_simple_string() {
    // parse -> serialize reproduces the wire bytes; serialize -> parse reproduces the frame.
    let wire = Bytes::from_static(b"+hello\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();

    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1720
#[test]
fn test_roundtrip_error() {
    // A simple error line survives parse -> serialize -> parse unchanged.
    let wire = Bytes::from_static(b"-ERR error message\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();

    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1731
#[test]
fn test_roundtrip_integer() {
    // An integer frame survives parse -> serialize -> parse unchanged.
    let wire = Bytes::from_static(b":12345\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();

    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1742
#[test]
fn test_roundtrip_bulk_string() {
    // Covers both a populated bulk string and the null bulk string (`$-1`).
    for text in ["$5\r\nhello\r\n", "$-1\r\n"] {
        let wire = Bytes::from(text);
        let (decoded, _) = parse_frame(wire.clone()).unwrap();

        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
1762
#[test]
fn test_roundtrip_blob_error() {
    // A blob error survives parse -> serialize -> parse unchanged.
    let wire = Bytes::from_static(b"!5\r\nerror\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();

    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1773
#[test]
fn test_roundtrip_null() {
    // The `_` null frame survives parse -> serialize -> parse unchanged.
    let wire = Bytes::from_static(b"_\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();

    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1784
#[test]
fn test_roundtrip_double() {
    // Only frame equality is checked here; the serialized bytes are deliberately
    // not compared against the input text.
    let (decoded, _) = parse_frame(Bytes::from_static(b",3.14159\r\n")).unwrap();
    let encoded = frame_to_bytes(&decoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1796
#[test]
fn test_roundtrip_special_float() {
    // `,inf` survives parse -> serialize -> parse unchanged.
    let wire = Bytes::from_static(b",inf\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();

    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1807
#[test]
fn test_roundtrip_boolean() {
    // Both boolean encodings (`#t`, `#f`) survive parse -> serialize -> parse.
    for text in ["#t\r\n", "#f\r\n"] {
        let wire = Bytes::from(text);
        let (decoded, _) = parse_frame(wire.clone()).unwrap();

        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
1826
#[test]
fn test_roundtrip_big_number() {
    // A 20-digit big number survives parse -> serialize -> parse unchanged.
    let wire = Bytes::from_static(b"(12345678901234567890\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();

    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1837
#[test]
fn test_roundtrip_verbatim_string() {
    // A verbatim string (format prefix included) survives parse -> serialize -> parse.
    let wire = Bytes::from_static(b"=10\r\ntxt:hello!\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();

    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1848
#[test]
fn test_roundtrip_array() {
    // Covers both a two-element array and the null array (`*-1`).
    for text in ["*2\r\n+hello\r\n:123\r\n", "*-1\r\n"] {
        let wire = Bytes::from(text);
        let (decoded, _) = parse_frame(wire.clone()).unwrap();

        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
1868
#[test]
fn test_roundtrip_set() {
    // A two-member set survives parse -> serialize -> parse unchanged.
    let wire = Bytes::from_static(b"~2\r\n+one\r\n+two\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();

    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1879
#[test]
fn test_roundtrip_map() {
    // A two-entry map survives parse -> serialize -> parse unchanged.
    let wire = Bytes::from_static(b"%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();

    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1890
#[test]
fn test_roundtrip_attribute() {
    // A single-entry attribute frame survives parse -> serialize -> parse unchanged.
    let wire = Bytes::from_static(b"|1\r\n+key\r\n+val\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();

    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1901
#[test]
fn test_roundtrip_push() {
    // A two-element push frame survives parse -> serialize -> parse unchanged.
    let wire = Bytes::from_static(b">2\r\n+msg\r\n+data\r\n");
    let (decoded, _) = parse_frame(wire.clone()).unwrap();

    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let (decoded_again, _) = parse_frame(encoded).unwrap();
    assert_eq!(decoded, decoded_again);
}
1912
#[test]
fn test_roundtrip_streaming_headers() {
    // Every streaming header, plus the terminator, paired with its wire form.
    let table = [
        ("$?\r\n", Frame::StreamedStringHeader),
        ("!?\r\n", Frame::StreamedBlobErrorHeader),
        ("=?\r\n", Frame::StreamedVerbatimStringHeader),
        ("*?\r\n", Frame::StreamedArrayHeader),
        ("~?\r\n", Frame::StreamedSetHeader),
        ("%?\r\n", Frame::StreamedMapHeader),
        ("|?\r\n", Frame::StreamedAttributeHeader),
        (">?\r\n", Frame::StreamedPushHeader),
        (".\r\n", Frame::StreamTerminator),
    ];

    for (text, expected) in table {
        let wire = Bytes::from(text);

        // Wire bytes decode to the expected variant...
        let (decoded, _) = parse_frame(wire.clone()).unwrap();
        assert_eq!(decoded, expected);

        // ...which encodes back to the same bytes...
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        // ...which decode to an equal frame again.
        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
1939
#[test]
fn test_roundtrip_streaming_chunks() {
    // `;N` chunk frames of several sizes, including the empty chunk, paired with
    // the payload they are expected to carry.
    let table = [
        (";4\r\nHell\r\n", "Hell"),
        (";5\r\no wor\r\n", "o wor"),
        (";1\r\nd\r\n", "d"),
        (";0\r\n\r\n", ""),
        (";11\r\nHello World\r\n", "Hello World"),
    ];

    for (text, payload) in table {
        let wire = Bytes::from(text);
        let expected = Frame::StreamedStringChunk(Bytes::from(payload));

        // The whole input is consumed and decodes to the expected chunk.
        let (decoded, leftover) = parse_frame(wire.clone()).unwrap();
        assert_eq!(decoded, expected);
        assert!(leftover.is_empty());

        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
1972
#[test]
fn test_streaming_chunks_edge_cases() {
    // Payload shorter than the declared length.
    let result = parse_frame(Bytes::from(";4\r\nHel"));
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Full payload but no trailing CRLF yet.
    let result = parse_frame(Bytes::from(";4\r\nHell"));
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Non-numeric length.
    let result = parse_frame(Bytes::from(";abc\r\ndata\r\n"));
    assert!(matches!(result, Err(ParseError::BadLength)));

    // Negative length.
    let result = parse_frame(Bytes::from(";-1\r\ndata\r\n"));
    assert!(matches!(result, Err(ParseError::BadLength)));

    // Declared length (5) exceeds the four payload bytes that precede the CRLF.
    let result = parse_frame(Bytes::from(";5\r\nHell\r\n"));
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Zero-length chunk header without its trailing CRLF.
    let result = parse_frame(Bytes::from(";0\r\n"));
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Arbitrary binary payload, including NUL and 0xFF, must come back byte-for-byte.
    let binary_data = b"\x00\x01\x02\x03\xFF";
    let mut chunk_data = Vec::new();
    chunk_data.extend_from_slice(b";5\r\n");
    chunk_data.extend_from_slice(binary_data);
    chunk_data.extend_from_slice(b"\r\n");

    let (frame, _) = parse_frame(Bytes::from(chunk_data)).unwrap();
    // Compare the whole frame: a mere `if let` on the variant would let the test
    // pass silently if the parser produced a different frame type.
    assert_eq!(
        frame,
        Frame::StreamedStringChunk(Bytes::from_static(binary_data))
    );
}
2019
#[test]
fn test_roundtrip_streaming_sequences() {
    // Serialize an accumulated streaming frame, parse the bytes back as a
    // streaming sequence, and require an equal frame.
    let assert_roundtrip = |original: &Frame| {
        let wire = frame_to_bytes(original);
        let (decoded, _) = parse_streaming_sequence(wire).unwrap();
        assert_eq!(decoded, *original);
    };

    // Streamed string: the exact wire layout is pinned as well.
    let chunked_string = Frame::StreamedString(vec![
        Bytes::from("Hell"),
        Bytes::from("o wor"),
        Bytes::from("ld"),
    ]);
    let wire = frame_to_bytes(&chunked_string);
    assert_eq!(
        wire,
        Bytes::from("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;2\r\nld\r\n;0\r\n\r\n")
    );
    let (decoded, _) = parse_streaming_sequence(wire).unwrap();
    assert_eq!(decoded, chunked_string);

    assert_roundtrip(&Frame::StreamedArray(vec![
        Frame::SimpleString(Bytes::from("hello")),
        Frame::Integer(42),
        Frame::Boolean(true),
    ]));

    assert_roundtrip(&Frame::StreamedMap(vec![
        (
            Frame::SimpleString(Bytes::from("key1")),
            Frame::SimpleString(Bytes::from("val1")),
        ),
        (
            Frame::SimpleString(Bytes::from("key2")),
            Frame::Integer(123),
        ),
    ]));

    // Empty streamed string: header followed directly by the zero-length chunk.
    let no_chunks = Frame::StreamedString(vec![]);
    let wire = frame_to_bytes(&no_chunks);
    assert_eq!(wire, Bytes::from("$?\r\n;0\r\n\r\n"));
    let (decoded, _) = parse_streaming_sequence(wire).unwrap();
    assert_eq!(decoded, no_chunks);

    assert_roundtrip(&Frame::StreamedSet(vec![
        Frame::SimpleString(Bytes::from("apple")),
        Frame::SimpleString(Bytes::from("banana")),
        Frame::Integer(42),
    ]));

    assert_roundtrip(&Frame::StreamedAttribute(vec![
        (
            Frame::SimpleString(Bytes::from("trace-id")),
            Frame::SimpleString(Bytes::from("abc123")),
        ),
        (
            Frame::SimpleString(Bytes::from("span-id")),
            Frame::SimpleString(Bytes::from("def456")),
        ),
    ]));

    assert_roundtrip(&Frame::StreamedPush(vec![
        Frame::SimpleString(Bytes::from("pubsub")),
        Frame::SimpleString(Bytes::from("channel1")),
        Frame::SimpleString(Bytes::from("message data")),
    ]));

    // Empty aggregates.
    assert_roundtrip(&Frame::StreamedArray(vec![]));
    assert_roundtrip(&Frame::StreamedSet(vec![]));
}
2114
#[test]
fn test_streaming_sequences_edge_cases() {
    // Streamed string whose chunks are not yet followed by the `;0` terminator.
    let data = Bytes::from("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Chunk with a non-numeric length.
    let data = Bytes::from("$?\r\n;abc\r\nHell\r\n;0\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::BadLength)));

    // Streamed array with no `.` terminator.
    let data = Bytes::from("*?\r\n+hello\r\n:42\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // A fixed-length array nested inside a streamed array counts as one item.
    let data = Bytes::from("*?\r\n+hello\r\n*2\r\n:1\r\n:2\r\n.\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    // Fail loudly on an unexpected variant; an `if let` here would skip the
    // assertions and let the test pass vacuously.
    match frame {
        Frame::StreamedArray(items) => {
            assert_eq!(items.len(), 2);
            assert!(matches!(items[0], Frame::SimpleString(_)));
            assert!(matches!(items[1], Frame::Array(_)));
        }
        other => panic!("expected StreamedArray, got {other:?}"),
    }

    // Header immediately followed by the terminator: an empty streamed array.
    let data = Bytes::from("*?\r\n.\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    assert_eq!(frame, Frame::StreamedArray(vec![]));

    // Streamed map with an odd number of elements (a key without a value).
    let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+orphan\r\n.\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::InvalidFormat)));

    // Input that is not a streaming header comes back as the plain frame.
    let data = Bytes::from("+simple\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    assert!(matches!(frame, Frame::SimpleString(_)));

    // An absurdly large chunk length must fail; either error kind is accepted.
    let data = Bytes::from(";999999999999999999\r\ndata\r\n");
    match parse_frame(data) {
        Err(ParseError::BadLength) | Err(ParseError::Incomplete) => {}
        Err(e) => panic!("Got unexpected error type: {e:?}"),
        Ok(_) => panic!("Large chunk size should fail"),
    }

    // A non-chunk frame inside a streamed string.
    let data = Bytes::from("$?\r\n+invalid\r\n;0\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::InvalidFormat)));

    // `.` followed by extra bytes before the CRLF is expected to read as Incomplete.
    let data = Bytes::from("*?\r\n+hello\r\n.corrupted\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Empty input.
    let data = Bytes::new();
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Stream cut off in the middle of an element.
    let data = Bytes::from("*?\r\n+hello\r\n$5\r\nwo");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));
}
2196
#[test]
fn test_roundtrip_nested_structures() {
    // An array containing a simple string, a map (whose second value is a set)
    // and an attribute. Only frame equality after re-parsing is checked.
    let wire = Bytes::from(
        "*3\r\n+hello\r\n%2\r\n+key1\r\n:123\r\n+key2\r\n~1\r\n+item\r\n|1\r\n+meta\r\n+data\r\n",
    );
    let (decoded, _) = parse_frame(wire).unwrap();

    let (decoded_again, _) = parse_frame(frame_to_bytes(&decoded)).unwrap();
    assert_eq!(decoded, decoded_again);
}
2209
#[test]
fn test_zero_length_bulk_string_requires_trailing_crlf() {
    // Well-formed: the empty payload is followed by its CRLF.
    let (parsed, leftover) = parse_frame(Bytes::from_static(b"$0\r\n\r\nTAIL")).unwrap();
    assert_eq!(parsed, Frame::BulkString(Some(Bytes::new())));
    assert_eq!(leftover, Bytes::from_static(b"TAIL"));

    // Malformed or truncated variants, each with the error it must produce.
    let failures = [
        // Trailing CRLF not received at all.
        ("$0\r\n", ParseError::Incomplete),
        // Only the `\r` half of the trailing CRLF received.
        ("$0\r\n\r", ParseError::Incomplete),
        // Two bytes present where the CRLF should be, but they are not CRLF.
        ("$0\r\nXY", ParseError::InvalidFormat),
    ];
    for (wire, expected) in failures {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
2230
#[test]
fn test_zero_length_streamed_chunk_requires_trailing_crlf() {
    // Well-formed: the empty chunk is followed by its CRLF.
    let (parsed, leftover) = parse_frame(Bytes::from_static(b";0\r\n\r\nTAIL")).unwrap();
    assert_eq!(parsed, Frame::StreamedStringChunk(Bytes::new()));
    assert_eq!(leftover, Bytes::from_static(b"TAIL"));

    // Trailing CRLF missing entirely.
    let truncated = parse_frame(Bytes::from_static(b";0\r\n"));
    assert_eq!(truncated, Err(ParseError::Incomplete));

    // Two bytes present where the CRLF should be, but they are not CRLF.
    let garbage = parse_frame(Bytes::from_static(b";0\r\nXY"));
    assert_eq!(garbage, Err(ParseError::InvalidFormat));
}
2247
#[test]
fn test_integer_overflow_returns_overflow_error() {
    // One past i64::MAX is reported as an overflow.
    let too_big = parse_frame(Bytes::from_static(b":9223372036854775808\r\n"));
    assert_eq!(too_big, Err(ParseError::Overflow));

    // Both extremes of the i64 range are accepted.
    let (max, _) = parse_frame(Bytes::from_static(b":9223372036854775807\r\n")).unwrap();
    assert_eq!(max, Frame::Integer(i64::MAX));

    let (min, _) = parse_frame(Bytes::from_static(b":-9223372036854775808\r\n")).unwrap();
    assert_eq!(min, Frame::Integer(i64::MIN));
}
2264
#[test]
fn test_parser_propagates_errors() {
    // A hard parse error must surface from `next_frame` with the offending tag byte.
    let mut parser = Parser::new();
    parser.feed(Bytes::from_static(b"XINVALID\r\n"));

    assert_eq!(parser.next_frame(), Err(ParseError::InvalidTag(b'X')));
}
2273
#[test]
fn test_parser_returns_ok_none_for_incomplete() {
    // An unterminated line is not an error for the stateful parser: it reports
    // that no frame is available yet.
    let mut parser = Parser::new();
    parser.feed(Bytes::from_static(b"+HELL"));

    assert_eq!(parser.next_frame(), Ok(None));
}
2280
#[test]
fn test_integer_negative_overflow() {
    // One below i64::MIN must be rejected.
    let outcome = parse_frame(Bytes::from_static(b":-9223372036854775809\r\n"));
    assert!(outcome.is_err());
}
2286
#[test]
fn test_nonempty_bulk_malformed_terminator() {
    // Zero or one byte after the payload: more input is needed. Two bytes that
    // are not CRLF: the frame is malformed.
    let cases = [
        ("$3\r\nfoo", ParseError::Incomplete),
        ("$3\r\nfooX", ParseError::Incomplete),
        ("$3\r\nfooXY", ParseError::InvalidFormat),
    ];

    for (wire, expected) in cases {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
2305
#[test]
fn test_blob_error_malformed_terminator() {
    // Missing terminator -> Incomplete; two non-CRLF bytes -> InvalidFormat.
    let cases = [
        ("!3\r\nerr", ParseError::Incomplete),
        ("!3\r\nerrXY", ParseError::InvalidFormat),
    ];

    for (wire, expected) in cases {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
2317
#[test]
fn test_verbatim_string_malformed_terminator() {
    // Missing terminator -> Incomplete; two non-CRLF bytes -> InvalidFormat.
    let cases = [
        ("=8\r\ntxt:data", ParseError::Incomplete),
        ("=8\r\ntxt:dataXY", ParseError::InvalidFormat),
    ];

    for (wire, expected) in cases {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
2329
#[test]
fn test_streamed_chunk_malformed_terminator() {
    // Missing terminator -> Incomplete; two non-CRLF bytes -> InvalidFormat.
    let cases = [
        (";3\r\nabc", ParseError::Incomplete),
        (";3\r\nabcXY", ParseError::InvalidFormat),
    ];

    for (wire, expected) in cases {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected));
    }
}
2341
#[test]
fn test_bulk_string_size_limit() {
    // 536870913 is one more than 512 * 1024 * 1024; the header alone must be rejected.
    let outcome = parse_frame(Bytes::from_static(b"$536870913\r\n"));
    assert_eq!(outcome, Err(ParseError::BadLength));
}
2350
#[test]
fn test_blob_error_size_limit() {
    // 536870913 is one more than 512 * 1024 * 1024; the header alone must be rejected.
    let outcome = parse_frame(Bytes::from_static(b"!536870913\r\n"));
    assert_eq!(outcome, Err(ParseError::BadLength));
}
2358
#[test]
fn test_verbatim_string_size_limit() {
    // 536870913 is one more than 512 * 1024 * 1024; the header alone must be rejected.
    let outcome = parse_frame(Bytes::from_static(b"=536870913\r\n"));
    assert_eq!(outcome, Err(ParseError::BadLength));
}
2366
#[test]
fn test_streamed_chunk_size_limit() {
    // 536870913 is one more than 512 * 1024 * 1024; the header alone must be rejected.
    let outcome = parse_frame(Bytes::from_static(b";536870913\r\n"));
    assert_eq!(outcome, Err(ParseError::BadLength));
}
2374
#[test]
fn test_invalid_double() {
    // Text that is not a number after the `,` tag is a format error.
    let outcome = parse_frame(Bytes::from_static(b",foo\r\n"));
    assert_eq!(outcome, Err(ParseError::InvalidFormat));
}
2382
#[test]
fn test_invalid_boolean() {
    // Neither an empty body nor the spelled-out word `true` is accepted after `#`.
    for wire in ["#\r\n", "#true\r\n"] {
        assert_eq!(
            parse_frame(Bytes::from(wire)),
            Err(ParseError::InvalidBoolean)
        );
    }
}
2394
#[test]
fn test_parser_clears_buffer_on_error() {
    // After a hard error the parser must not keep the offending bytes buffered.
    let mut parser = Parser::new();
    parser.feed(Bytes::from_static(b"X\r\n"));

    let outcome = parser.next_frame();
    assert_eq!(outcome, Err(ParseError::InvalidTag(b'X')));
    assert_eq!(parser.buffered_bytes(), 0);
}
2402
#[test]
fn test_parser_recovers_after_error() {
    let mut parser = Parser::new();

    // First feed is garbage: it errors and leaves nothing buffered.
    parser.feed(Bytes::from_static(b"X\r\n"));
    assert!(parser.next_frame().is_err());
    assert_eq!(parser.buffered_bytes(), 0);

    // The same parser then handles valid input fed afterwards.
    parser.feed(Bytes::from_static(b"+OK\r\n"));
    let expected = Frame::SimpleString(Bytes::from_static(b"OK"));
    assert_eq!(parser.next_frame(), Ok(Some(expected)));
}
2414
#[test]
fn test_streaming_set_roundtrip() {
    // A `.`-terminated streamed set is accumulated into a single `StreamedSet`,
    // consuming the whole input.
    let wire = Bytes::from_static(b"~?\r\n+a\r\n+b\r\n+c\r\n.\r\n");
    let (parsed, leftover) = parse_streaming_sequence(wire).unwrap();

    let members = vec![
        Frame::SimpleString(Bytes::from_static(b"a")),
        Frame::SimpleString(Bytes::from_static(b"b")),
        Frame::SimpleString(Bytes::from_static(b"c")),
    ];
    assert_eq!(parsed, Frame::StreamedSet(members));
    assert!(leftover.is_empty());
}
2429
#[test]
fn test_streaming_attribute_roundtrip() {
    // A `.`-terminated streamed attribute is accumulated into key/value pairs,
    // consuming the whole input.
    let wire = Bytes::from_static(b"|?\r\n+key\r\n+val\r\n.\r\n");
    let (parsed, leftover) = parse_streaming_sequence(wire).unwrap();

    let pairs = vec![(
        Frame::SimpleString(Bytes::from_static(b"key")),
        Frame::SimpleString(Bytes::from_static(b"val")),
    )];
    assert_eq!(parsed, Frame::StreamedAttribute(pairs));
    assert!(leftover.is_empty());
}
2443
#[test]
fn test_streaming_push_roundtrip() {
    // A `.`-terminated streamed push is accumulated into a single `StreamedPush`,
    // consuming the whole input.
    let wire = Bytes::from_static(b">?\r\n+pubsub\r\n+channel\r\n+message\r\n.\r\n");
    let (parsed, leftover) = parse_streaming_sequence(wire).unwrap();

    let elements = vec![
        Frame::SimpleString(Bytes::from_static(b"pubsub")),
        Frame::SimpleString(Bytes::from_static(b"channel")),
        Frame::SimpleString(Bytes::from_static(b"message")),
    ];
    assert_eq!(parsed, Frame::StreamedPush(elements));
    assert!(leftover.is_empty());
}
2458
#[test]
fn test_empty_streaming_containers() {
    // A header followed immediately by its terminator yields an empty container.
    // The string is closed by the `;0` chunk, the aggregates by `.`.
    let cases = [
        ("$?\r\n;0\r\n\r\n", Frame::StreamedString(vec![])),
        ("*?\r\n.\r\n", Frame::StreamedArray(vec![])),
        ("~?\r\n.\r\n", Frame::StreamedSet(vec![])),
        ("%?\r\n.\r\n", Frame::StreamedMap(vec![])),
    ];

    for (wire, expected) in cases {
        let (parsed, _) = parse_streaming_sequence(Bytes::from(wire)).unwrap();
        assert_eq!(parsed, expected);
    }
}
2481
/// A streamed attribute with three elements leaves one key without a value;
/// the test expects that to be reported as `ParseError::InvalidFormat`.
#[test]
fn test_streaming_attribute_odd_elements_errors() {
    let outcome = parse_streaming_sequence(Bytes::from(
        "|?\r\n+key\r\n+val\r\n+orphan\r\n.\r\n",
    ));
    assert!(matches!(outcome, Err(ParseError::InvalidFormat)));
}
2488
/// A `!?` header is expected to come back as a bare
/// `Frame::StreamedBlobErrorHeader`, with the bytes after it left unconsumed.
#[test]
fn test_streaming_blob_error_header_passthrough() {
    let wire = Bytes::from("!?\r\n!5\r\nERROR\r\n");
    let (header, tail) = parse_streaming_sequence(wire).unwrap();

    assert_eq!(header, Frame::StreamedBlobErrorHeader);
    // Whatever follows the header must still be available to the caller.
    assert!(!tail.is_empty());
}
2498
/// A `=?` header is expected to come back as a bare
/// `Frame::StreamedVerbatimStringHeader`, with the bytes after it left unconsumed.
#[test]
fn test_streaming_verbatim_header_passthrough() {
    let wire = Bytes::from("=?\r\n=9\r\ntxt:hello\r\n");
    let (header, tail) = parse_streaming_sequence(wire).unwrap();

    assert_eq!(header, Frame::StreamedVerbatimStringHeader);
    // Whatever follows the header must still be available to the caller.
    assert!(!tail.is_empty());
}
2507
/// `as_bytes` exposes the payload of byte-carrying frames and yields `None`
/// for the null and boolean frames checked here.
#[test]
fn frame_as_bytes() {
    let ok = Bytes::from("OK");
    let err = Bytes::from("ERR");
    let digits = Bytes::from("123");

    // Variants that wrap a `Bytes` payload hand back a reference to it.
    assert_eq!(Frame::SimpleString(ok.clone()).as_bytes(), Some(&ok));
    assert_eq!(Frame::BlobError(err.clone()).as_bytes(), Some(&err));
    assert_eq!(Frame::BigNumber(digits.clone()).as_bytes(), Some(&digits));

    // Frames without a payload give nothing back.
    assert!(Frame::BulkString(None).as_bytes().is_none());
    assert!(Frame::Null.as_bytes().is_none());
    assert!(Frame::Boolean(true).as_bytes().is_none());
}
2526
/// `as_str` returns the text of a simple string and `None` for a bulk string
/// whose payload is the single byte `0xFF`.
#[test]
fn frame_as_str() {
    let textual = Frame::SimpleString(Bytes::from("OK"));
    assert_eq!(textual.as_str(), Some("OK"));

    // A lone 0xFF byte is not valid UTF-8, and the test expects `None` for it.
    let non_utf8 = Frame::BulkString(Some(Bytes::from_static(&[0xFF])));
    assert_eq!(non_utf8.as_str(), None);
}
2535
/// Scalar accessors return the wrapped value for their own variant and
/// `None` when asked for a different scalar type.
#[test]
fn frame_scalar_accessors() {
    let int = Frame::Integer(42);
    let dbl = Frame::Double(3.5);
    let flag = Frame::Boolean(true);

    // Matching variant: the inner value comes back.
    assert_eq!(int.as_integer(), Some(42));
    assert_eq!(dbl.as_double(), Some(3.5));
    assert_eq!(flag.as_boolean(), Some(true));

    // Mismatched variant: the test expects no conversion between integer and double.
    assert_eq!(int.as_double(), None);
    assert_eq!(dbl.as_integer(), None);
}
2545
/// Collection accessors borrow the elements of array, set, map and push
/// frames; a null array yields `None`.
#[test]
fn frame_collection_accessors() {
    // Shared expectation for every single-element sequence below.
    let single = [Frame::Integer(1)];

    let arr = Frame::Array(Some(vec![Frame::Integer(1)]));
    assert_eq!(arr.as_array(), Some(&single[..]));
    assert!(Frame::Array(None).as_array().is_none());

    let set = Frame::Set(vec![Frame::Integer(1)]);
    assert_eq!(set.as_set(), Some(&single[..]));

    let entry = (Frame::SimpleString(Bytes::from("k")), Frame::Integer(1));
    let map = Frame::Map(vec![entry]);
    assert!(map.as_map().is_some());

    let push = Frame::Push(vec![Frame::Integer(1)]);
    assert_eq!(push.as_push(), Some(&single[..]));
}
2564
/// `as_verbatim_string` returns the format tag and the content of a verbatim
/// string, and `None` for `Frame::Null`.
#[test]
fn frame_verbatim_string_accessor() {
    let expected_format = Bytes::from("txt");
    let expected_content = Bytes::from("hello");

    let verbatim = Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hello"));
    let (format, content) = verbatim.as_verbatim_string().unwrap();
    assert_eq!(format, &expected_format);
    assert_eq!(content, &expected_content);

    assert!(Frame::Null.as_verbatim_string().is_none());
}
2573
/// The consuming `into_*` conversions succeed for populated frames and fail
/// for the null array and null bulk string.
#[test]
fn frame_into_conversions() {
    // Arrays: a populated array unwraps to its elements, a null array is an error.
    let populated = Frame::Array(Some(vec![Frame::Integer(1)]));
    assert_eq!(populated.into_array(), Ok(vec![Frame::Integer(1)]));
    let null_array = Frame::Array(None);
    assert!(null_array.into_array().is_err());

    // Bulk strings: same split between a present and an absent payload.
    let bulk = Frame::BulkString(Some(Bytes::from("x")));
    assert_eq!(bulk.into_bulk_string(), Ok(Bytes::from("x")));
    let null_bulk = Frame::BulkString(None);
    assert!(null_bulk.into_bulk_string().is_err());

    // Maps and sets only need to convert successfully here.
    let pairs = Frame::Map(vec![(Frame::Integer(1), Frame::Integer(2))]);
    assert!(pairs.into_map().is_ok());

    let members = Frame::Set(vec![Frame::Integer(1)]);
    assert!(members.into_set().is_ok());
}
2594
/// `is_null` is true for `Null`, the null bulk string and the null array,
/// and false for an empty bulk string and for integer zero.
#[test]
fn frame_is_null() {
    for null_like in &[Frame::Null, Frame::BulkString(None), Frame::Array(None)] {
        assert!(null_like.is_null());
    }

    // Empty or zero values are still values, not nulls.
    for present in &[Frame::BulkString(Some(Bytes::new())), Frame::Integer(0)] {
        assert!(!present.is_null());
    }
}
2603
/// `is_error` is true for both error variants and false for a simple string.
#[test]
fn frame_is_error() {
    let failures = [
        Frame::Error(Bytes::from("ERR")),
        Frame::BlobError(Bytes::from("ERR")),
    ];
    for failure in &failures {
        assert!(failure.is_error());
    }

    let success = Frame::SimpleString(Bytes::from("OK"));
    assert!(!success.is_error());
}
2610}