1use bytes::{BufMut, Bytes, BytesMut};
7
8const MAX_COLLECTION_SIZE: usize = 10_000_000;
11
12#[derive(Default, Debug)]
17pub struct Parser {
18 buffer: BytesMut,
19}
20
21impl Parser {
22 pub fn new() -> Self {
24 Self {
25 buffer: BytesMut::new(),
26 }
27 }
28
29 pub fn feed(&mut self, data: Bytes) {
33 self.buffer.extend_from_slice(&data);
34 }
35
36 pub fn next_frame(&mut self) -> Result<Option<Frame>, ParseError> {
42 if self.buffer.is_empty() {
43 return Ok(None);
44 }
45
46 let bytes = self.buffer.split().freeze();
47
48 match parse_frame_inner(&bytes, 0) {
49 Ok((frame, consumed)) => {
50 if consumed < bytes.len() {
51 self.buffer.unsplit(BytesMut::from(&bytes[consumed..]));
52 }
53 Ok(Some(frame))
54 }
55 Err(ParseError::Incomplete) => {
56 self.buffer.unsplit(bytes.into());
57 Ok(None)
58 }
59 Err(e) => Err(e),
60 }
61 }
62
63 pub fn buffered_bytes(&self) -> usize {
65 self.buffer.len()
66 }
67
68 pub fn clear(&mut self) {
70 self.buffer.clear();
71 }
72}
73
74#[derive(Debug, Clone, PartialEq)]
80pub enum Frame {
81 SimpleString(Bytes),
83 Error(Bytes),
85 Integer(i64),
87 BulkString(Option<Bytes>),
90 BlobError(Bytes),
92 StreamedStringHeader,
94 StreamedBlobErrorHeader,
96 StreamedVerbatimStringHeader,
98 StreamedArrayHeader,
100 StreamedSetHeader,
102 StreamedMapHeader,
104 StreamedAttributeHeader,
106 StreamedPushHeader,
108 StreamedStringChunk(Bytes),
124
125 StreamedString(Vec<Bytes>),
143
144 StreamedArray(Vec<Frame>),
162
163 StreamedSet(Vec<Frame>),
168
169 StreamedMap(Vec<(Frame, Frame)>),
185
186 StreamedAttribute(Vec<(Frame, Frame)>),
191
192 StreamedPush(Vec<Frame>),
210 StreamTerminator,
212 Null,
214 Double(f64),
216 SpecialFloat(Bytes),
218 Boolean(bool),
220 BigNumber(Bytes),
222 VerbatimString(Bytes, Bytes),
225 Array(Option<Vec<Frame>>),
227 Set(Vec<Frame>),
229 Map(Vec<(Frame, Frame)>),
231 Attribute(Vec<(Frame, Frame)>),
233 Push(Vec<Frame>),
235}
236
237pub use crate::ParseError;
238
239pub fn parse_frame(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
243 let (frame, consumed) = parse_frame_inner(&input, 0)?;
244 Ok((frame, input.slice(consumed..)))
245}
246
247fn parse_frame_inner(input: &Bytes, pos: usize) -> Result<(Frame, usize), ParseError> {
250 let buf = input.as_ref();
251 if pos >= buf.len() {
252 return Err(ParseError::Incomplete);
253 }
254
255 let tag = buf[pos];
256
257 match tag {
258 b'+' => {
259 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
260 Ok((
261 Frame::SimpleString(input.slice(pos + 1..line_end)),
262 after_crlf,
263 ))
264 }
265 b'-' => {
266 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
267 Ok((Frame::Error(input.slice(pos + 1..line_end)), after_crlf))
268 }
269 b':' => {
270 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
271 let v = parse_i64(&buf[pos + 1..line_end])?;
272 Ok((Frame::Integer(v), after_crlf))
273 }
274 b'$' => {
275 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
276 let len_bytes = &buf[pos + 1..line_end];
277 if len_bytes == b"?" {
279 return Ok((Frame::StreamedStringHeader, after_crlf));
280 }
281 if len_bytes == b"-1" {
283 return Ok((Frame::BulkString(None), after_crlf));
284 }
285 let len = parse_usize(len_bytes)?;
286 if len == 0 {
287 if after_crlf + 1 >= buf.len() {
288 return Err(ParseError::Incomplete);
289 }
290 if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
291 return Ok((Frame::BulkString(Some(Bytes::new())), after_crlf + 2));
292 } else {
293 return Err(ParseError::InvalidFormat);
294 }
295 }
296 let data_start = after_crlf;
297 let data_end = data_start + len;
298 if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
299 return Err(ParseError::Incomplete);
300 }
301 Ok((
302 Frame::BulkString(Some(input.slice(data_start..data_end))),
303 data_end + 2,
304 ))
305 }
306 b'_' => {
307 if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
308 Ok((Frame::Null, pos + 3))
309 } else {
310 Err(ParseError::Incomplete)
311 }
312 }
313 b',' => {
314 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
315 let line_bytes = &buf[pos + 1..line_end];
316 if line_bytes == b"inf" || line_bytes == b"-inf" || line_bytes == b"nan" {
318 return Ok((
319 Frame::SpecialFloat(input.slice(pos + 1..line_end)),
320 after_crlf,
321 ));
322 }
323 let s = std::str::from_utf8(line_bytes).map_err(|_| ParseError::Utf8Error)?;
325 let v = s.parse::<f64>().map_err(|_| ParseError::InvalidFormat)?;
326 Ok((Frame::Double(v), after_crlf))
327 }
328 b'#' => {
329 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
330 match &buf[pos + 1..line_end] {
331 b"t" => Ok((Frame::Boolean(true), after_crlf)),
332 b"f" => Ok((Frame::Boolean(false), after_crlf)),
333 _ => Err(ParseError::InvalidBoolean),
334 }
335 }
336 b'(' => {
337 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
338 Ok((Frame::BigNumber(input.slice(pos + 1..line_end)), after_crlf))
339 }
340 b'=' => {
341 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
342 let len_bytes = &buf[pos + 1..line_end];
343 if len_bytes == b"?" {
345 return Ok((Frame::StreamedVerbatimStringHeader, after_crlf));
346 }
347 if len_bytes == b"-1" {
349 return Ok((
350 Frame::VerbatimString(Bytes::new(), Bytes::new()),
351 after_crlf,
352 ));
353 }
354 let len = parse_usize(len_bytes)?;
355 let data_start = after_crlf;
356 let data_end = data_start + len;
357 if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
358 return Err(ParseError::Incomplete);
359 }
360 let sep = buf[data_start..data_end]
362 .iter()
363 .position(|&b| b == b':')
364 .ok_or(ParseError::InvalidFormat)?;
365 let format = input.slice(data_start..data_start + sep);
366 let content = input.slice(data_start + sep + 1..data_end);
367 Ok((Frame::VerbatimString(format, content), data_end + 2))
368 }
369 b'!' => {
370 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
371 let len_bytes = &buf[pos + 1..line_end];
372 if len_bytes == b"?" {
374 return Ok((Frame::StreamedBlobErrorHeader, after_crlf));
375 }
376 if len_bytes == b"-1" {
377 return Ok((Frame::BlobError(Bytes::new()), after_crlf));
378 }
379 let len = parse_usize(len_bytes)?;
380 let data_start = after_crlf;
381 let data_end = data_start + len;
382 if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
383 return Err(ParseError::Incomplete);
384 }
385 Ok((
386 Frame::BlobError(input.slice(data_start..data_end)),
387 data_end + 2,
388 ))
389 }
390 b'*' => {
391 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
392 let len_bytes = &buf[pos + 1..line_end];
393 if len_bytes == b"?" {
394 return Ok((Frame::StreamedArrayHeader, after_crlf));
395 }
396 if len_bytes == b"-1" {
397 return Ok((Frame::Array(None), after_crlf));
398 }
399 let count = parse_count(len_bytes)?;
400 if count == 0 {
401 return Ok((Frame::Array(Some(Vec::new())), after_crlf));
402 }
403 let mut cursor = after_crlf;
404 let mut items = Vec::with_capacity(count);
405 for _ in 0..count {
406 let (item, next) = parse_frame_inner(input, cursor)?;
407 items.push(item);
408 cursor = next;
409 }
410 Ok((Frame::Array(Some(items)), cursor))
411 }
412 b'~' => {
413 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
414 let len_bytes = &buf[pos + 1..line_end];
415 if len_bytes == b"?" {
416 return Ok((Frame::StreamedSetHeader, after_crlf));
417 }
418 let count = parse_count(len_bytes)?;
419 let mut cursor = after_crlf;
420 let mut items = Vec::with_capacity(count);
421 for _ in 0..count {
422 let (item, next) = parse_frame_inner(input, cursor)?;
423 items.push(item);
424 cursor = next;
425 }
426 Ok((Frame::Set(items), cursor))
427 }
428 b'%' | b'|' => {
429 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
430 let len_bytes = &buf[pos + 1..line_end];
431 if len_bytes == b"?" {
432 return if tag == b'%' {
433 Ok((Frame::StreamedMapHeader, after_crlf))
434 } else {
435 Ok((Frame::StreamedAttributeHeader, after_crlf))
436 };
437 }
438 let count = parse_count(len_bytes)?;
439 let mut cursor = after_crlf;
440 let mut pairs = Vec::with_capacity(count);
441 for _ in 0..count {
442 let (key, next1) = parse_frame_inner(input, cursor)?;
443 let (val, next2) = parse_frame_inner(input, next1)?;
444 pairs.push((key, val));
445 cursor = next2;
446 }
447 if tag == b'%' {
448 Ok((Frame::Map(pairs), cursor))
449 } else {
450 Ok((Frame::Attribute(pairs), cursor))
451 }
452 }
453 b'>' => {
454 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
455 let len_bytes = &buf[pos + 1..line_end];
456 if len_bytes == b"?" {
457 return Ok((Frame::StreamedPushHeader, after_crlf));
458 }
459 let count = parse_count(len_bytes)?;
460 let mut cursor = after_crlf;
461 let mut items = Vec::with_capacity(count);
462 for _ in 0..count {
463 let (item, next) = parse_frame_inner(input, cursor)?;
464 items.push(item);
465 cursor = next;
466 }
467 Ok((Frame::Push(items), cursor))
468 }
469 b';' => {
470 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
471 let len = parse_usize(&buf[pos + 1..line_end])?;
472 if len == 0 {
473 if after_crlf + 1 >= buf.len() {
474 return Err(ParseError::Incomplete);
475 }
476 if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
477 return Ok((Frame::StreamedStringChunk(Bytes::new()), after_crlf + 2));
478 } else {
479 return Err(ParseError::InvalidFormat);
480 }
481 }
482 let data_start = after_crlf;
483 let data_end = data_start + len;
484 if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
485 return Err(ParseError::Incomplete);
486 }
487 Ok((
488 Frame::StreamedStringChunk(input.slice(data_start..data_end)),
489 data_end + 2,
490 ))
491 }
492 b'.' => {
493 if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
494 Ok((Frame::StreamTerminator, pos + 3))
495 } else {
496 Err(ParseError::Incomplete)
497 }
498 }
499 _ => Err(ParseError::InvalidTag(tag)),
500 }
501}
502
503pub fn parse_streaming_sequence(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
583 if input.is_empty() {
584 return Err(ParseError::Incomplete);
585 }
586
587 let (header, mut rest) = parse_frame(input)?;
588
589 match header {
590 Frame::StreamedStringHeader => {
591 let mut chunks = Vec::new();
593
594 loop {
595 let (frame, new_rest) = parse_frame(rest)?;
596 rest = new_rest;
597
598 match frame {
599 Frame::StreamedStringChunk(chunk) => {
600 if chunk.is_empty() {
601 break;
603 }
604 chunks.push(chunk);
605 }
606 _ => {
607 return Err(ParseError::InvalidFormat);
608 }
609 }
610 }
611
612 Ok((Frame::StreamedString(chunks), rest))
613 }
614 Frame::StreamedBlobErrorHeader => {
615 let mut chunks = Vec::new();
618
619 loop {
620 let (frame, new_rest) = parse_frame(rest)?;
621 rest = new_rest;
622
623 match frame {
624 Frame::BlobError(chunk) => {
625 if chunk.is_empty() {
628 break;
629 }
630 chunks.push(chunk);
631 }
632 _ => {
633 return Err(ParseError::InvalidFormat);
634 }
635 }
636 }
637
638 let total_len: usize = chunks.iter().map(|c| c.len()).sum();
640 let mut combined = Vec::with_capacity(total_len);
641 for chunk in chunks {
642 combined.extend_from_slice(&chunk);
643 }
644 Ok((Frame::BlobError(Bytes::from(combined)), rest))
645 }
646 Frame::StreamedVerbatimStringHeader => {
647 let mut chunks = Vec::new();
649
650 loop {
651 let (frame, new_rest) = parse_frame(rest)?;
652 rest = new_rest;
653
654 match frame {
655 Frame::VerbatimString(format, content) => {
656 if content.is_empty() && format.len() == 3 {
658 break;
659 }
660 chunks.push((format, content));
663 }
664 _ => {
665 return Err(ParseError::InvalidFormat);
666 }
667 }
668 }
669
670 if chunks.is_empty() {
672 return Ok((Frame::VerbatimString(Bytes::new(), Bytes::new()), rest));
673 }
674
675 let format = chunks[0].0.clone();
676 let total_len: usize = chunks.iter().map(|(_, c)| c.len()).sum();
677 let mut combined = Vec::with_capacity(total_len);
678 for (_, content) in chunks {
679 combined.extend_from_slice(&content);
680 }
681 Ok((Frame::VerbatimString(format, Bytes::from(combined)), rest))
682 }
683 Frame::StreamedArrayHeader => {
684 let mut items = Vec::new();
686
687 loop {
688 let (frame, new_rest) = parse_frame(rest)?;
689 rest = new_rest;
690
691 match frame {
692 Frame::StreamTerminator => {
693 break;
694 }
695 item => {
696 items.push(item);
697 }
698 }
699 }
700
701 Ok((Frame::StreamedArray(items), rest))
702 }
703 Frame::StreamedSetHeader => {
704 let mut items = Vec::new();
706
707 loop {
708 let (frame, new_rest) = parse_frame(rest)?;
709 rest = new_rest;
710
711 match frame {
712 Frame::StreamTerminator => {
713 break;
714 }
715 item => {
716 items.push(item);
717 }
718 }
719 }
720
721 Ok((Frame::StreamedSet(items), rest))
722 }
723 Frame::StreamedMapHeader => {
724 let mut pairs = Vec::new();
726
727 loop {
728 let (frame, new_rest) = parse_frame(rest)?;
729 rest = new_rest;
730
731 match frame {
732 Frame::StreamTerminator => {
733 break;
734 }
735 key => {
736 let (value, newer_rest) = parse_frame(rest)?;
737 rest = newer_rest;
738 pairs.push((key, value));
739 }
740 }
741 }
742
743 Ok((Frame::StreamedMap(pairs), rest))
744 }
745 Frame::StreamedAttributeHeader => {
746 let mut pairs = Vec::new();
748
749 loop {
750 let (frame, new_rest) = parse_frame(rest)?;
751 rest = new_rest;
752
753 match frame {
754 Frame::StreamTerminator => {
755 break;
756 }
757 key => {
758 let (value, newer_rest) = parse_frame(rest)?;
759 rest = newer_rest;
760 pairs.push((key, value));
761 }
762 }
763 }
764
765 Ok((Frame::StreamedAttribute(pairs), rest))
766 }
767 Frame::StreamedPushHeader => {
768 let mut items = Vec::new();
770
771 loop {
772 let (frame, new_rest) = parse_frame(rest)?;
773 rest = new_rest;
774
775 match frame {
776 Frame::StreamTerminator => {
777 break;
778 }
779 item => {
780 items.push(item);
781 }
782 }
783 }
784
785 Ok((Frame::StreamedPush(items), rest))
786 }
787 _ => {
788 Ok((header, rest))
790 }
791 }
792}
793
794#[inline]
797fn find_crlf(buf: &[u8], from: usize) -> Result<(usize, usize), ParseError> {
798 let mut i = from;
799 let len = buf.len();
800 while i + 1 < len {
801 if buf[i] == b'\r' && buf[i + 1] == b'\n' {
802 return Ok((i, i + 2));
803 }
804 i += 1;
805 }
806 Err(ParseError::Incomplete)
807}
808
809#[inline]
811fn parse_usize(buf: &[u8]) -> Result<usize, ParseError> {
812 if buf.is_empty() {
813 return Err(ParseError::BadLength);
814 }
815 let mut v: usize = 0;
816 for &b in buf {
817 if !b.is_ascii_digit() {
818 return Err(ParseError::BadLength);
819 }
820 v = v.checked_mul(10).ok_or(ParseError::BadLength)?;
821 v = v
822 .checked_add((b - b'0') as usize)
823 .ok_or(ParseError::BadLength)?;
824 }
825 Ok(v)
826}
827
828#[inline]
830fn parse_i64(buf: &[u8]) -> Result<i64, ParseError> {
831 if buf.is_empty() {
832 return Err(ParseError::InvalidFormat);
833 }
834 let (neg, digits) = if buf[0] == b'-' {
835 (true, &buf[1..])
836 } else {
837 (false, buf)
838 };
839 if digits.is_empty() {
840 return Err(ParseError::InvalidFormat);
841 }
842 let mut v: i64 = 0;
843 for (i, &d) in digits.iter().enumerate() {
844 if !d.is_ascii_digit() {
845 return Err(ParseError::InvalidFormat);
846 }
847 let digit = (d - b'0') as i64;
848 if neg && v == i64::MAX / 10 && digit == 8 && i == digits.len() - 1 {
849 return Ok(i64::MIN);
850 }
851 if v > i64::MAX / 10 || (v == i64::MAX / 10 && digit > i64::MAX % 10) {
852 return Err(ParseError::Overflow);
853 }
854 v = v * 10 + digit;
855 }
856 if neg { Ok(-v) } else { Ok(v) }
857}
858
859#[inline]
861fn parse_count(buf: &[u8]) -> Result<usize, ParseError> {
862 let count = parse_usize(buf)?;
863 if count > MAX_COLLECTION_SIZE {
864 return Err(ParseError::BadLength);
865 }
866 Ok(count)
867}
868
869pub fn frame_to_bytes(frame: &Frame) -> Bytes {
873 let mut buf = BytesMut::new();
874 serialize_frame(frame, &mut buf);
875 buf.freeze()
876}
877
878fn serialize_frame(frame: &Frame, buf: &mut BytesMut) {
879 match frame {
880 Frame::SimpleString(s) => {
881 buf.put_u8(b'+');
882 buf.extend_from_slice(s);
883 buf.extend_from_slice(b"\r\n");
884 }
885 Frame::Error(e) => {
886 buf.put_u8(b'-');
887 buf.extend_from_slice(e);
888 buf.extend_from_slice(b"\r\n");
889 }
890 Frame::Integer(i) => {
891 buf.put_u8(b':');
892 let s = i.to_string();
893 buf.extend_from_slice(s.as_bytes());
894 buf.extend_from_slice(b"\r\n");
895 }
896 Frame::BulkString(opt) => {
897 buf.put_u8(b'$');
898 match opt {
899 Some(data) => {
900 let len = data.len().to_string();
901 buf.extend_from_slice(len.as_bytes());
902 buf.extend_from_slice(b"\r\n");
903 buf.extend_from_slice(data);
904 buf.extend_from_slice(b"\r\n");
905 }
906 None => {
907 buf.extend_from_slice(b"-1\r\n");
908 }
909 }
910 }
911 Frame::BlobError(data) => {
912 buf.put_u8(b'!');
913 let len = data.len().to_string();
914 buf.extend_from_slice(len.as_bytes());
915 buf.extend_from_slice(b"\r\n");
916 buf.extend_from_slice(data);
917 buf.extend_from_slice(b"\r\n");
918 }
919 Frame::StreamedStringHeader => {
920 buf.extend_from_slice(b"$?\r\n");
921 }
922 Frame::StreamedBlobErrorHeader => {
923 buf.extend_from_slice(b"!?\r\n");
924 }
925 Frame::StreamedVerbatimStringHeader => {
926 buf.extend_from_slice(b"=?\r\n");
927 }
928 Frame::StreamedArrayHeader => {
929 buf.extend_from_slice(b"*?\r\n");
930 }
931 Frame::StreamedSetHeader => {
932 buf.extend_from_slice(b"~?\r\n");
933 }
934 Frame::StreamedMapHeader => {
935 buf.extend_from_slice(b"%?\r\n");
936 }
937 Frame::StreamedAttributeHeader => {
938 buf.extend_from_slice(b"|?\r\n");
939 }
940 Frame::StreamedPushHeader => {
941 buf.extend_from_slice(b">?\r\n");
942 }
943 Frame::StreamedStringChunk(data) => {
944 buf.put_u8(b';');
945 let len = data.len().to_string();
946 buf.extend_from_slice(len.as_bytes());
947 buf.extend_from_slice(b"\r\n");
948 buf.extend_from_slice(data);
949 buf.extend_from_slice(b"\r\n");
950 }
951 Frame::StreamedString(chunks) => {
952 buf.extend_from_slice(b"$?\r\n");
954 for chunk in chunks {
955 buf.put_u8(b';');
956 let len = chunk.len().to_string();
957 buf.extend_from_slice(len.as_bytes());
958 buf.extend_from_slice(b"\r\n");
959 buf.extend_from_slice(chunk);
960 buf.extend_from_slice(b"\r\n");
961 }
962 buf.extend_from_slice(b";0\r\n\r\n");
963 }
964 Frame::StreamedArray(items) => {
965 buf.extend_from_slice(b"*?\r\n");
966 for item in items {
967 serialize_frame(item, buf);
968 }
969 buf.extend_from_slice(b".\r\n");
970 }
971 Frame::StreamedSet(items) => {
972 buf.extend_from_slice(b"~?\r\n");
973 for item in items {
974 serialize_frame(item, buf);
975 }
976 buf.extend_from_slice(b".\r\n");
977 }
978 Frame::StreamedMap(pairs) => {
979 buf.extend_from_slice(b"%?\r\n");
980 for (key, value) in pairs {
981 serialize_frame(key, buf);
982 serialize_frame(value, buf);
983 }
984 buf.extend_from_slice(b".\r\n");
985 }
986 Frame::StreamedAttribute(pairs) => {
987 buf.extend_from_slice(b"|?\r\n");
988 for (key, value) in pairs {
989 serialize_frame(key, buf);
990 serialize_frame(value, buf);
991 }
992 buf.extend_from_slice(b".\r\n");
993 }
994 Frame::StreamedPush(items) => {
995 buf.extend_from_slice(b">?\r\n");
996 for item in items {
997 serialize_frame(item, buf);
998 }
999 buf.extend_from_slice(b".\r\n");
1000 }
1001 Frame::StreamTerminator => {
1002 buf.extend_from_slice(b".\r\n");
1003 }
1004 Frame::Null => {
1005 buf.extend_from_slice(b"_\r\n");
1006 }
1007 Frame::Double(d) => {
1008 buf.put_u8(b',');
1009 let s = d.to_string();
1010 buf.extend_from_slice(s.as_bytes());
1011 buf.extend_from_slice(b"\r\n");
1012 }
1013 Frame::SpecialFloat(f) => {
1014 buf.put_u8(b',');
1015 buf.extend_from_slice(f);
1016 buf.extend_from_slice(b"\r\n");
1017 }
1018 Frame::Boolean(b) => {
1019 buf.extend_from_slice(if *b { b"#t\r\n" } else { b"#f\r\n" });
1020 }
1021 Frame::BigNumber(n) => {
1022 buf.put_u8(b'(');
1023 buf.extend_from_slice(n);
1024 buf.extend_from_slice(b"\r\n");
1025 }
1026 Frame::VerbatimString(format, content) => {
1027 buf.put_u8(b'=');
1028 let total_len = format.len() + 1 + content.len(); let len = total_len.to_string();
1030 buf.extend_from_slice(len.as_bytes());
1031 buf.extend_from_slice(b"\r\n");
1032 buf.extend_from_slice(format);
1033 buf.put_u8(b':');
1034 buf.extend_from_slice(content);
1035 buf.extend_from_slice(b"\r\n");
1036 }
1037 Frame::Array(opt) => {
1038 buf.put_u8(b'*');
1039 match opt {
1040 Some(items) => {
1041 let len = items.len().to_string();
1042 buf.extend_from_slice(len.as_bytes());
1043 buf.extend_from_slice(b"\r\n");
1044 for item in items {
1045 serialize_frame(item, buf);
1046 }
1047 }
1048 None => {
1049 buf.extend_from_slice(b"-1\r\n");
1050 }
1051 }
1052 }
1053 Frame::Set(items) => {
1054 buf.put_u8(b'~');
1055 let len = items.len().to_string();
1056 buf.extend_from_slice(len.as_bytes());
1057 buf.extend_from_slice(b"\r\n");
1058 for item in items {
1059 serialize_frame(item, buf);
1060 }
1061 }
1062 Frame::Map(pairs) => {
1063 buf.put_u8(b'%');
1064 let len = pairs.len().to_string();
1065 buf.extend_from_slice(len.as_bytes());
1066 buf.extend_from_slice(b"\r\n");
1067 for (key, value) in pairs {
1068 serialize_frame(key, buf);
1069 serialize_frame(value, buf);
1070 }
1071 }
1072 Frame::Attribute(pairs) => {
1073 buf.put_u8(b'|');
1074 let len = pairs.len().to_string();
1075 buf.extend_from_slice(len.as_bytes());
1076 buf.extend_from_slice(b"\r\n");
1077 for (key, value) in pairs {
1078 serialize_frame(key, buf);
1079 serialize_frame(value, buf);
1080 }
1081 }
1082 Frame::Push(items) => {
1083 buf.put_u8(b'>');
1084 let len = items.len().to_string();
1085 buf.extend_from_slice(len.as_bytes());
1086 buf.extend_from_slice(b"\r\n");
1087 for item in items {
1088 serialize_frame(item, buf);
1089 }
1090 }
1091 }
1092}
1093
1094#[cfg(test)]
1095mod tests {
1096 use super::{Frame, ParseError, Parser, frame_to_bytes, parse_frame, parse_streaming_sequence};
1097 use bytes::Bytes;
1098
1099 #[test]
1100 fn test_parse_frame_simple_string() {
1101 let input = Bytes::from("+HELLO\r\nWORLD");
1102 let (frame, rest) = parse_frame(input.clone()).unwrap();
1103 assert_eq!(frame, Frame::SimpleString(Bytes::from("HELLO")));
1104 assert_eq!(rest, Bytes::from("WORLD"));
1105 }
1106
1107 #[test]
1108 fn test_parse_frame_blob_error() {
1109 let input = Bytes::from("!5\r\nERROR\r\nREST");
1110 let (frame, rest) = parse_frame(input.clone()).unwrap();
1111 assert_eq!(frame, Frame::BlobError(Bytes::from("ERROR")));
1112 assert_eq!(rest, Bytes::from("REST"));
1113 }
1114
1115 #[test]
1116 fn test_parse_frame_error() {
1117 let input = Bytes::from("-ERR fail\r\nLEFT");
1118 let (frame, rest) = parse_frame(input.clone()).unwrap();
1119 assert_eq!(frame, Frame::Error(Bytes::from("ERR fail")));
1120 assert_eq!(rest, Bytes::from("LEFT"));
1121 }
1122
1123 #[test]
1124 fn test_parse_frame_integer() {
1125 let input = Bytes::from(":42\r\nTAIL");
1126 let (frame, rest) = parse_frame(input.clone()).unwrap();
1127 assert_eq!(frame, Frame::Integer(42));
1128 assert_eq!(rest, Bytes::from("TAIL"));
1129 }
1130
1131 #[test]
1132 fn test_parse_frame_bulk_string() {
1133 let input = Bytes::from("$3\r\nfoo\r\nREST");
1134 let (frame, rest) = parse_frame(input.clone()).unwrap();
1135 assert_eq!(frame, Frame::BulkString(Some(Bytes::from("foo"))));
1136 assert_eq!(rest, Bytes::from("REST"));
1137 let null_input = Bytes::from("$-1\r\nAFTER");
1138 let (frame, rest) = parse_frame(null_input.clone()).unwrap();
1139 assert_eq!(frame, Frame::BulkString(None));
1140 assert_eq!(rest, Bytes::from("AFTER"));
1141 }
1142
1143 #[test]
1144 fn test_parse_frame_null() {
1145 let input = Bytes::from("_\r\nLEFT");
1146 let (frame, rest) = parse_frame(input.clone()).unwrap();
1147 assert_eq!(frame, Frame::Null);
1148 assert_eq!(rest, Bytes::from("LEFT"));
1149 }
1150
1151 #[test]
1152 fn test_parse_frame_double_and_special_float() {
1153 let input = Bytes::from(",3.5\r\nNEXT");
1154 let (frame, rest) = parse_frame(input.clone()).unwrap();
1155 assert_eq!(frame, Frame::Double(3.5));
1156 assert_eq!(rest, Bytes::from("NEXT"));
1157 let input_inf = Bytes::from(",inf\r\nTAIL");
1158 let (frame, rest) = parse_frame(input_inf.clone()).unwrap();
1159 assert_eq!(frame, Frame::SpecialFloat(Bytes::from("inf")));
1160 assert_eq!(rest, Bytes::from("TAIL"));
1161 }
1162
1163 #[test]
1164 fn test_parse_frame_boolean() {
1165 let input_true = Bytes::from("#t\r\nXYZ");
1166 let (frame, rest) = parse_frame(input_true.clone()).unwrap();
1167 assert_eq!(frame, Frame::Boolean(true));
1168 assert_eq!(rest, Bytes::from("XYZ"));
1169 let input_false = Bytes::from("#f\r\nDONE");
1170 let (frame, rest) = parse_frame(input_false.clone()).unwrap();
1171 assert_eq!(frame, Frame::Boolean(false));
1172 assert_eq!(rest, Bytes::from("DONE"));
1173 }
1174
1175 #[test]
1176 fn test_parse_frame_big_number() {
1177 let input = Bytes::from("(123456789\r\nEND");
1178 let (frame, rest) = parse_frame(input.clone()).unwrap();
1179 assert_eq!(frame, Frame::BigNumber(Bytes::from("123456789")));
1180 assert_eq!(rest, Bytes::from("END"));
1181 }
1182
1183 #[test]
1184 fn test_parse_frame_verbatim_string() {
1185 let input = Bytes::from("=12\r\ntxt:hi there\r\nAFTER");
1186 let (frame, rest) = parse_frame(input.clone()).unwrap();
1187 assert_eq!(
1188 frame,
1189 Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hi there")) );
1194 assert_eq!(rest, Bytes::from("AFTER"));
1195 }
1196
1197 #[test]
1198 fn test_parse_frame_array_set_push_map_attribute() {
1199 let input = Bytes::from("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\nTAIL");
1201 let (frame, rest) = parse_frame(input.clone()).unwrap();
1202 assert_eq!(
1203 frame,
1204 Frame::Array(Some(vec![
1205 Frame::BulkString(Some(Bytes::from("foo"))),
1206 Frame::BulkString(Some(Bytes::from("bar")))
1207 ]))
1208 );
1209 assert_eq!(rest, Bytes::from("TAIL"));
1210 let input_null = Bytes::from("*-1\r\nEND");
1212 let (frame, rest) = parse_frame(input_null.clone()).unwrap();
1213 assert_eq!(frame, Frame::Array(None));
1214 assert_eq!(rest, Bytes::from("END"));
1215 let input_set = Bytes::from("~2\r\n+foo\r\n+bar\r\nTAIL");
1217 let (frame, rest) = parse_frame(input_set.clone()).unwrap();
1218 assert_eq!(
1219 frame,
1220 Frame::Set(vec![
1221 Frame::SimpleString(Bytes::from("foo")),
1222 Frame::SimpleString(Bytes::from("bar")),
1223 ])
1224 );
1225 assert_eq!(rest, Bytes::from("TAIL"));
1226 let input_map = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\nTRAIL");
1228 let (frame, rest) = parse_frame(input_map.clone()).unwrap();
1229 assert_eq!(
1230 frame,
1231 Frame::Map(vec![
1232 (
1233 Frame::SimpleString(Bytes::from("key1")),
1234 Frame::SimpleString(Bytes::from("val1"))
1235 ),
1236 (
1237 Frame::SimpleString(Bytes::from("key2")),
1238 Frame::SimpleString(Bytes::from("val2"))
1239 ),
1240 ])
1241 );
1242 assert_eq!(rest, Bytes::from("TRAIL"));
1243 let input_attr = Bytes::from("|1\r\n+meta\r\n+data\r\nAFTER");
1245 let (frame, rest) = parse_frame(input_attr.clone()).unwrap();
1246 assert_eq!(
1247 frame,
1248 Frame::Attribute(vec![(
1249 Frame::SimpleString(Bytes::from("meta")),
1250 Frame::SimpleString(Bytes::from("data"))
1251 ),])
1252 );
1253 assert_eq!(rest, Bytes::from("AFTER"));
1254 let input_push = Bytes::from(">2\r\n+type\r\n:1\r\nNEXT");
1256 let (frame, rest) = parse_frame(input_push.clone()).unwrap();
1257 assert_eq!(
1258 frame,
1259 Frame::Push(vec![
1260 Frame::SimpleString(Bytes::from("type")),
1261 Frame::Integer(1),
1262 ])
1263 );
1264 assert_eq!(rest, Bytes::from("NEXT"));
1265 }
1266
1267 #[test]
1268 fn test_parse_frame_empty_input() {
1269 assert!(parse_frame(Bytes::new()).is_err());
1270 }
1271
1272 #[test]
1273 fn test_parse_frame_invalid_tag() {
1274 let input = Bytes::from("X123\r\n");
1275 assert!(parse_frame(input).is_err());
1276 }
1277
1278 #[test]
1279 fn test_parse_frame_malformed_bulk_length() {
1280 let input = Bytes::from("$x\r\nfoo\r\n");
1281 assert!(parse_frame(input).is_err());
1282 }
1283
1284 #[test]
1285 fn test_parse_frame_zero_length_bulk() {
1286 let input = Bytes::from("$0\r\n\r\nTAIL");
1287 let (frame, rest) = parse_frame(input.clone()).unwrap();
1288 assert_eq!(frame, Frame::BulkString(Some(Bytes::from(""))));
1289 assert_eq!(rest, Bytes::from("TAIL"));
1290 }
1291
1292 #[test]
1293 fn test_parse_frame_zero_length_blob_error() {
1294 let input = Bytes::from("!0\r\n\r\nREST");
1295 let (frame, rest) = parse_frame(input.clone()).unwrap();
1296 assert_eq!(frame, Frame::BlobError(Bytes::new()));
1297 assert_eq!(rest, Bytes::from("REST"));
1298 }
1299
1300 #[test]
1301 fn test_parse_frame_missing_crlf() {
1302 let input = Bytes::from(":42\nTAIL");
1303 assert!(parse_frame(input).is_err());
1304 }
1305
1306 #[test]
1307 fn test_parse_frame_unicode_simple_string() {
1308 let input = Bytes::from("+こんにちは\r\nEND");
1309 let (frame, rest) = parse_frame(input.clone()).unwrap();
1310 assert_eq!(frame, Frame::SimpleString(Bytes::from("こんにちは")));
1311 assert_eq!(rest, Bytes::from("END"));
1312 }
1313
1314 #[test]
1315 fn test_parse_frame_chained_frames() {
1316 let combined = Bytes::from("+OK\r\n:1\r\nfoo");
1317 let (f1, rem) = parse_frame(combined.clone()).unwrap();
1318 assert_eq!(f1, Frame::SimpleString(Bytes::from("OK")));
1319 let (f2, rem2) = parse_frame(rem).unwrap();
1320 assert_eq!(f2, Frame::Integer(1));
1321 assert_eq!(rem2, Bytes::from("foo"));
1322 }
1323
1324 #[test]
1325 fn test_parse_frame_empty_array() {
1326 let input = Bytes::from("*0\r\nTAIL");
1327 let (frame, rest) = parse_frame(input.clone()).unwrap();
1328 assert_eq!(frame, Frame::Array(Some(vec![])));
1329 assert_eq!(rest, Bytes::from("TAIL"));
1330 }
1331
1332 #[test]
1333 fn test_parse_frame_partial_array_data() {
1334 let input = Bytes::from("*2\r\n+OK\r\n");
1335 assert!(parse_frame(input).is_err());
1336 }
1337
1338 #[test]
1339 fn test_parse_frame_streamed_string() {
1340 let input = Bytes::from("$?\r\n$5\r\nhello\r\n$0\r\n\r\nREST");
1341 let (frame, rem) = parse_frame(input.clone()).unwrap();
1342 assert_eq!(frame, Frame::StreamedStringHeader);
1343 let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1344 assert_eq!(chunk, Frame::BulkString(Some(Bytes::from("hello"))));
1345 let (terminator, rest) = parse_frame(rem2.clone()).unwrap();
1346 assert_eq!(terminator, Frame::BulkString(Some(Bytes::from(""))));
1347 assert_eq!(rest, Bytes::from("REST"));
1348 }
1349
1350 #[test]
1351 fn test_parse_frame_streamed_blob_error() {
1352 let input = Bytes::from("!?\r\n!5\r\nERROR\r\nREST");
1353 let (frame, rem) = parse_frame(input.clone()).unwrap();
1354 assert_eq!(frame, Frame::StreamedBlobErrorHeader);
1355 let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1356 assert_eq!(chunk, Frame::BlobError(Bytes::from("ERROR")));
1357 assert_eq!(rem2, Bytes::from("REST"));
1358 }
1359
1360 #[test]
1361 fn test_parse_frame_streamed_verbatim_string() {
1362 let input = Bytes::from("=?\r\n=9\r\ntxt:hello\r\nTAIL");
1363 let (frame, rem) = parse_frame(input.clone()).unwrap();
1364 assert_eq!(frame, Frame::StreamedVerbatimStringHeader);
1365 let (chunk, rest) = parse_frame(rem.clone()).unwrap();
1366 assert_eq!(
1367 chunk,
1368 Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hello")) );
1373 assert_eq!(rest, Bytes::from("TAIL"));
1374 }
1375
1376 #[test]
1377 fn test_parse_frame_streamed_array() {
1378 let input = Bytes::from("*?\r\n+one\r\n+two\r\n*0\r\nEND");
1379 let (header, rem) = parse_frame(input.clone()).unwrap();
1380 assert_eq!(header, Frame::StreamedArrayHeader);
1381 let (item1, rem2) = parse_frame(rem.clone()).unwrap();
1382 assert_eq!(item1, Frame::SimpleString(Bytes::from("one")));
1383 let (item2, rem3) = parse_frame(rem2.clone()).unwrap();
1384 assert_eq!(item2, Frame::SimpleString(Bytes::from("two")));
1385 let (terminator, rest) = parse_frame(rem3.clone()).unwrap();
1386 assert_eq!(terminator, Frame::Array(Some(vec![])));
1387 assert_eq!(rest, Bytes::from("END"));
1388 }
1389
1390 #[test]
1391 fn test_parse_frame_streamed_set_map_attr_push() {
1392 let input_set = Bytes::from("~?\r\n+foo\r\n+bar\r\n~0\r\nTAIL");
1394 let (h_set, rem0) = parse_frame(input_set.clone()).unwrap();
1395 assert_eq!(h_set, Frame::StreamedSetHeader);
1396 let (s1, rem1) = parse_frame(rem0.clone()).unwrap();
1397 assert_eq!(s1, Frame::SimpleString(Bytes::from("foo")));
1398 let (s2, rem2) = parse_frame(rem1.clone()).unwrap();
1399 assert_eq!(s2, Frame::SimpleString(Bytes::from("bar")));
1400 let (term_set, rest_set) = parse_frame(rem2.clone()).unwrap();
1401 assert_eq!(term_set, Frame::Set(vec![]));
1402 assert_eq!(rest_set, Bytes::from("TAIL"));
1403 let input_map = Bytes::from("%?\r\n+key\r\n+val\r\n%0\r\nNEXT");
1405 let (h_map, rem_map) = parse_frame(input_map.clone()).unwrap();
1406 assert_eq!(h_map, Frame::StreamedMapHeader);
1407 let (k, rem_map2) = parse_frame(rem_map.clone()).unwrap();
1408 assert_eq!(k, Frame::SimpleString(Bytes::from("key")));
1409 let (v, rem_map3) = parse_frame(rem_map2.clone()).unwrap();
1410 assert_eq!(v, Frame::SimpleString(Bytes::from("val")));
1411 let (term_map, rest_map4) = parse_frame(rem_map3.clone()).unwrap();
1412 assert_eq!(term_map, Frame::Map(vec![]));
1413 assert_eq!(rest_map4, Bytes::from("NEXT"));
1414 let input_attr = Bytes::from("|?\r\n+meta\r\n+info\r\n|0\r\nMORE");
1416 let (h_attr, rem_attr) = parse_frame(input_attr.clone()).unwrap();
1417 assert_eq!(h_attr, Frame::StreamedAttributeHeader);
1418 let (a1, rem_attr2) = parse_frame(rem_attr.clone()).unwrap();
1419 assert_eq!(a1, Frame::SimpleString(Bytes::from("meta")));
1420 let (a2, rem_attr3) = parse_frame(rem_attr2.clone()).unwrap();
1421 assert_eq!(a2, Frame::SimpleString(Bytes::from("info")));
1422 let (term_attr, rest_attr) = parse_frame(rem_attr3.clone()).unwrap();
1423 assert_eq!(term_attr, Frame::Attribute(vec![]));
1424 assert_eq!(rest_attr, Bytes::from("MORE"));
1425 let input_push = Bytes::from(">?\r\n:1\r\n:2\r\n>0\r\nEND");
1427 let (h_push, rem_push) = parse_frame(input_push.clone()).unwrap();
1428 assert_eq!(h_push, Frame::StreamedPushHeader);
1429 let (p1, rem_push2) = parse_frame(rem_push.clone()).unwrap();
1430 assert_eq!(p1, Frame::Integer(1));
1431 let (p2, rem_push3) = parse_frame(rem_push2.clone()).unwrap();
1432 assert_eq!(p2, Frame::Integer(2));
1433 let (term_push, rest_push) = parse_frame(rem_push3.clone()).unwrap();
1434 assert_eq!(term_push, Frame::Push(vec![]));
1435 assert_eq!(rest_push, Bytes::from("END"));
1436 }
1437
1438 #[test]
1439 fn test_parse_frame_stream_terminator() {
1440 let input = Bytes::from(".\r\nREST");
1441 let (frame, rest) = parse_frame(input.clone()).unwrap();
1442 assert_eq!(frame, Frame::StreamTerminator);
1443 assert_eq!(rest, Bytes::from("REST"));
1444 }
1445
1446 #[test]
1447 fn test_parse_frame_null_bulk_and_error() {
1448 let input1 = Bytes::from("!-1\r\nTAIL");
1449 let (f1, r1) = parse_frame(input1.clone()).unwrap();
1450 assert_eq!(f1, Frame::BlobError(Bytes::new()));
1451 assert_eq!(r1, Bytes::from("TAIL"));
1452 let input2 = Bytes::from("=-1\r\nTAIL");
1453 let (f2, r2) = parse_frame(input2.clone()).unwrap();
1454 assert_eq!(f2, Frame::VerbatimString(Bytes::new(), Bytes::new()));
1455 assert_eq!(r2, Bytes::from("TAIL"));
1456 }
1457
1458 #[test]
1459 fn test_parse_frame_special_float_nan() {
1460 let input = Bytes::from(",nan\r\nTAIL");
1461 let (frame, rest) = parse_frame(input.clone()).unwrap();
1462 assert_eq!(frame, Frame::SpecialFloat(Bytes::from("nan")));
1463 assert_eq!(rest, Bytes::from("TAIL"));
1464 }
1465
1466 #[test]
1467 fn test_parse_frame_big_number_zero() {
1468 let input = Bytes::from("(0\r\nEND");
1469 let (frame, rest) = parse_frame(input.clone()).unwrap();
1470 assert_eq!(frame, Frame::BigNumber(Bytes::from("0")));
1471 assert_eq!(rest, Bytes::from("END"));
1472 }
1473
1474 #[test]
1475 fn test_parse_frame_collection_empty() {
1476 let input_push = Bytes::from(">0\r\nTAIL");
1477 let (f_push, r_push) = parse_frame(input_push.clone()).unwrap();
1478 assert_eq!(f_push, Frame::Push(vec![]));
1479 assert_eq!(r_push, Bytes::from("TAIL"));
1480 let input_attr = Bytes::from("|0\r\nAFTER");
1481 let (f_attr, r_attr) = parse_frame(input_attr.clone()).unwrap();
1482 assert_eq!(f_attr, Frame::Attribute(vec![]));
1483 assert_eq!(r_attr, Bytes::from("AFTER"));
1484 let input_map = Bytes::from("%0\r\nEND");
1485 let (f_map, r_map) = parse_frame(input_map.clone()).unwrap();
1486 assert_eq!(f_map, Frame::Map(vec![]));
1487 assert_eq!(r_map, Bytes::from("END"));
1488 let input_set = Bytes::from("~0\r\nDONE");
1489 let (f_set, r_set) = parse_frame(input_set.clone()).unwrap();
1490 assert_eq!(f_set, Frame::Set(vec![]));
1491 assert_eq!(r_set, Bytes::from("DONE"));
1492 let input_arr = Bytes::from("*-1\r\nFIN");
1493 let (f_arr, r_arr) = parse_frame(input_arr.clone()).unwrap();
1494 assert_eq!(f_arr, Frame::Array(None));
1495 assert_eq!(r_arr, Bytes::from("FIN"));
1496 }
1497
1498 #[test]
1501 fn test_roundtrip_simple_string() {
1502 let original = Bytes::from("+hello\r\n");
1503 let (frame, _) = parse_frame(original.clone()).unwrap();
1504 let serialized = frame_to_bytes(&frame);
1505 assert_eq!(original, serialized);
1506
1507 let (reparsed, _) = parse_frame(serialized).unwrap();
1508 assert_eq!(frame, reparsed);
1509 }
1510
1511 #[test]
1512 fn test_roundtrip_error() {
1513 let original = Bytes::from("-ERR error message\r\n");
1514 let (frame, _) = parse_frame(original.clone()).unwrap();
1515 let serialized = frame_to_bytes(&frame);
1516 assert_eq!(original, serialized);
1517
1518 let (reparsed, _) = parse_frame(serialized).unwrap();
1519 assert_eq!(frame, reparsed);
1520 }
1521
1522 #[test]
1523 fn test_roundtrip_integer() {
1524 let original = Bytes::from(":12345\r\n");
1525 let (frame, _) = parse_frame(original.clone()).unwrap();
1526 let serialized = frame_to_bytes(&frame);
1527 assert_eq!(original, serialized);
1528
1529 let (reparsed, _) = parse_frame(serialized).unwrap();
1530 assert_eq!(frame, reparsed);
1531 }
1532
1533 #[test]
1534 fn test_roundtrip_bulk_string() {
1535 let original = Bytes::from("$5\r\nhello\r\n");
1536 let (frame, _) = parse_frame(original.clone()).unwrap();
1537 let serialized = frame_to_bytes(&frame);
1538 assert_eq!(original, serialized);
1539
1540 let (reparsed, _) = parse_frame(serialized).unwrap();
1541 assert_eq!(frame, reparsed);
1542
1543 let original_null = Bytes::from("$-1\r\n");
1545 let (frame_null, _) = parse_frame(original_null.clone()).unwrap();
1546 let serialized_null = frame_to_bytes(&frame_null);
1547 assert_eq!(original_null, serialized_null);
1548
1549 let (reparsed_null, _) = parse_frame(serialized_null).unwrap();
1550 assert_eq!(frame_null, reparsed_null);
1551 }
1552
1553 #[test]
1554 fn test_roundtrip_blob_error() {
1555 let original = Bytes::from("!5\r\nerror\r\n");
1556 let (frame, _) = parse_frame(original.clone()).unwrap();
1557 let serialized = frame_to_bytes(&frame);
1558 assert_eq!(original, serialized);
1559
1560 let (reparsed, _) = parse_frame(serialized).unwrap();
1561 assert_eq!(frame, reparsed);
1562 }
1563
1564 #[test]
1565 fn test_roundtrip_null() {
1566 let original = Bytes::from("_\r\n");
1567 let (frame, _) = parse_frame(original.clone()).unwrap();
1568 let serialized = frame_to_bytes(&frame);
1569 assert_eq!(original, serialized);
1570
1571 let (reparsed, _) = parse_frame(serialized).unwrap();
1572 assert_eq!(frame, reparsed);
1573 }
1574
1575 #[test]
1576 fn test_roundtrip_double() {
1577 let original = Bytes::from(",3.14159\r\n");
1578 let (frame, _) = parse_frame(original.clone()).unwrap();
1579 let serialized = frame_to_bytes(&frame);
1580
1581 let (reparsed, _) = parse_frame(serialized).unwrap();
1584 assert_eq!(frame, reparsed);
1585 }
1586
1587 #[test]
1588 fn test_roundtrip_special_float() {
1589 let original = Bytes::from(",inf\r\n");
1590 let (frame, _) = parse_frame(original.clone()).unwrap();
1591 let serialized = frame_to_bytes(&frame);
1592 assert_eq!(original, serialized);
1593
1594 let (reparsed, _) = parse_frame(serialized).unwrap();
1595 assert_eq!(frame, reparsed);
1596 }
1597
1598 #[test]
1599 fn test_roundtrip_boolean() {
1600 let original_true = Bytes::from("#t\r\n");
1601 let (frame_true, _) = parse_frame(original_true.clone()).unwrap();
1602 let serialized_true = frame_to_bytes(&frame_true);
1603 assert_eq!(original_true, serialized_true);
1604
1605 let (reparsed_true, _) = parse_frame(serialized_true).unwrap();
1606 assert_eq!(frame_true, reparsed_true);
1607
1608 let original_false = Bytes::from("#f\r\n");
1609 let (frame_false, _) = parse_frame(original_false.clone()).unwrap();
1610 let serialized_false = frame_to_bytes(&frame_false);
1611 assert_eq!(original_false, serialized_false);
1612
1613 let (reparsed_false, _) = parse_frame(serialized_false).unwrap();
1614 assert_eq!(frame_false, reparsed_false);
1615 }
1616
1617 #[test]
1618 fn test_roundtrip_big_number() {
1619 let original = Bytes::from("(12345678901234567890\r\n");
1620 let (frame, _) = parse_frame(original.clone()).unwrap();
1621 let serialized = frame_to_bytes(&frame);
1622 assert_eq!(original, serialized);
1623
1624 let (reparsed, _) = parse_frame(serialized).unwrap();
1625 assert_eq!(frame, reparsed);
1626 }
1627
1628 #[test]
1629 fn test_roundtrip_verbatim_string() {
1630 let original = Bytes::from("=10\r\ntxt:hello!\r\n");
1631 let (frame, _) = parse_frame(original.clone()).unwrap();
1632 let serialized = frame_to_bytes(&frame);
1633 assert_eq!(original, serialized);
1634
1635 let (reparsed, _) = parse_frame(serialized).unwrap();
1636 assert_eq!(frame, reparsed);
1637 }
1638
1639 #[test]
1640 fn test_roundtrip_array() {
1641 let original = Bytes::from("*2\r\n+hello\r\n:123\r\n");
1642 let (frame, _) = parse_frame(original.clone()).unwrap();
1643 let serialized = frame_to_bytes(&frame);
1644 assert_eq!(original, serialized);
1645
1646 let (reparsed, _) = parse_frame(serialized).unwrap();
1647 assert_eq!(frame, reparsed);
1648
1649 let original_null = Bytes::from("*-1\r\n");
1651 let (frame_null, _) = parse_frame(original_null.clone()).unwrap();
1652 let serialized_null = frame_to_bytes(&frame_null);
1653 assert_eq!(original_null, serialized_null);
1654
1655 let (reparsed_null, _) = parse_frame(serialized_null).unwrap();
1656 assert_eq!(frame_null, reparsed_null);
1657 }
1658
1659 #[test]
1660 fn test_roundtrip_set() {
1661 let original = Bytes::from("~2\r\n+one\r\n+two\r\n");
1662 let (frame, _) = parse_frame(original.clone()).unwrap();
1663 let serialized = frame_to_bytes(&frame);
1664 assert_eq!(original, serialized);
1665
1666 let (reparsed, _) = parse_frame(serialized).unwrap();
1667 assert_eq!(frame, reparsed);
1668 }
1669
1670 #[test]
1671 fn test_roundtrip_map() {
1672 let original = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\n");
1673 let (frame, _) = parse_frame(original.clone()).unwrap();
1674 let serialized = frame_to_bytes(&frame);
1675 assert_eq!(original, serialized);
1676
1677 let (reparsed, _) = parse_frame(serialized).unwrap();
1678 assert_eq!(frame, reparsed);
1679 }
1680
1681 #[test]
1682 fn test_roundtrip_attribute() {
1683 let original = Bytes::from("|1\r\n+key\r\n+val\r\n");
1684 let (frame, _) = parse_frame(original.clone()).unwrap();
1685 let serialized = frame_to_bytes(&frame);
1686 assert_eq!(original, serialized);
1687
1688 let (reparsed, _) = parse_frame(serialized).unwrap();
1689 assert_eq!(frame, reparsed);
1690 }
1691
1692 #[test]
1693 fn test_roundtrip_push() {
1694 let original = Bytes::from(">2\r\n+msg\r\n+data\r\n");
1695 let (frame, _) = parse_frame(original.clone()).unwrap();
1696 let serialized = frame_to_bytes(&frame);
1697 assert_eq!(original, serialized);
1698
1699 let (reparsed, _) = parse_frame(serialized).unwrap();
1700 assert_eq!(frame, reparsed);
1701 }
1702
1703 #[test]
1704 fn test_roundtrip_streaming_headers() {
1705 let headers = [
1706 ("$?\r\n", Frame::StreamedStringHeader),
1707 ("!?\r\n", Frame::StreamedBlobErrorHeader),
1708 ("=?\r\n", Frame::StreamedVerbatimStringHeader),
1709 ("*?\r\n", Frame::StreamedArrayHeader),
1710 ("~?\r\n", Frame::StreamedSetHeader),
1711 ("%?\r\n", Frame::StreamedMapHeader),
1712 ("|?\r\n", Frame::StreamedAttributeHeader),
1713 (">?\r\n", Frame::StreamedPushHeader),
1714 (".\r\n", Frame::StreamTerminator),
1715 ];
1716
1717 for (original_str, expected_frame) in headers {
1718 let original = Bytes::from(original_str);
1719 let (frame, _) = parse_frame(original.clone()).unwrap();
1720 assert_eq!(frame, expected_frame);
1721
1722 let serialized = frame_to_bytes(&frame);
1723 assert_eq!(original, serialized);
1724
1725 let (reparsed, _) = parse_frame(serialized).unwrap();
1726 assert_eq!(frame, reparsed);
1727 }
1728 }
1729
1730 #[test]
1731 fn test_roundtrip_streaming_chunks() {
1732 let chunks = [
1733 (
1734 ";4\r\nHell\r\n",
1735 Frame::StreamedStringChunk(Bytes::from("Hell")),
1736 ),
1737 (
1738 ";5\r\no wor\r\n",
1739 Frame::StreamedStringChunk(Bytes::from("o wor")),
1740 ),
1741 (";1\r\nd\r\n", Frame::StreamedStringChunk(Bytes::from("d"))),
1742 (";0\r\n\r\n", Frame::StreamedStringChunk(Bytes::new())),
1743 (
1744 ";11\r\nHello World\r\n",
1745 Frame::StreamedStringChunk(Bytes::from("Hello World")),
1746 ),
1747 ];
1748
1749 for (original_str, expected_frame) in chunks {
1750 let original = Bytes::from(original_str);
1751 let (frame, rest) = parse_frame(original.clone()).unwrap();
1752 assert_eq!(frame, expected_frame);
1753 assert!(rest.is_empty());
1754
1755 let serialized = frame_to_bytes(&frame);
1756 assert_eq!(original, serialized);
1757
1758 let (reparsed, _) = parse_frame(serialized).unwrap();
1759 assert_eq!(frame, reparsed);
1760 }
1761 }
1762
1763 #[test]
1764 fn test_streaming_chunks_edge_cases() {
1765 let data = Bytes::from(";4\r\nHel");
1767 let result = parse_frame(data);
1768 assert!(matches!(result, Err(ParseError::Incomplete)));
1769
1770 let data = Bytes::from(";4\r\nHell");
1772 let result = parse_frame(data);
1773 assert!(matches!(result, Err(ParseError::Incomplete)));
1774
1775 let data = Bytes::from(";abc\r\ndata\r\n");
1777 let result = parse_frame(data);
1778 assert!(matches!(result, Err(ParseError::BadLength)));
1779
1780 let data = Bytes::from(";-1\r\ndata\r\n");
1782 let result = parse_frame(data);
1783 assert!(matches!(result, Err(ParseError::BadLength)));
1784
1785 let data = Bytes::from(";5\r\nHell\r\n");
1787 let result = parse_frame(data);
1788 assert!(matches!(result, Err(ParseError::Incomplete)));
1789
1790 let data = Bytes::from(";0\r\n");
1792 let result = parse_frame(data);
1793 assert!(matches!(result, Err(ParseError::Incomplete)));
1794
1795 let binary_data = b"\x00\x01\x02\x03\xFF";
1797 let mut chunk_data = Vec::new();
1798 chunk_data.extend_from_slice(b";5\r\n");
1799 chunk_data.extend_from_slice(binary_data);
1800 chunk_data.extend_from_slice(b"\r\n");
1801 let data = Bytes::from(chunk_data);
1802 let result = parse_frame(data);
1803 assert!(result.is_ok());
1804 let (frame, _) = result.unwrap();
1805 if let Frame::StreamedStringChunk(chunk) = frame {
1806 assert_eq!(chunk.as_ref(), binary_data);
1807 }
1808 }
1809
1810 #[test]
1811 fn test_roundtrip_streaming_sequences() {
1812 let streaming_string = Frame::StreamedString(vec![
1814 Bytes::from("Hell"),
1815 Bytes::from("o wor"),
1816 Bytes::from("ld"),
1817 ]);
1818 let serialized = frame_to_bytes(&streaming_string);
1819 let expected = "$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;2\r\nld\r\n;0\r\n\r\n";
1820 assert_eq!(serialized, Bytes::from(expected));
1821
1822 let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1823 assert_eq!(parsed, streaming_string);
1824
1825 let streaming_array = Frame::StreamedArray(vec![
1827 Frame::SimpleString(Bytes::from("hello")),
1828 Frame::Integer(42),
1829 Frame::Boolean(true),
1830 ]);
1831 let serialized = frame_to_bytes(&streaming_array);
1832 let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1833 assert_eq!(parsed, streaming_array);
1834
1835 let streaming_map = Frame::StreamedMap(vec![
1837 (
1838 Frame::SimpleString(Bytes::from("key1")),
1839 Frame::SimpleString(Bytes::from("val1")),
1840 ),
1841 (
1842 Frame::SimpleString(Bytes::from("key2")),
1843 Frame::Integer(123),
1844 ),
1845 ]);
1846 let serialized = frame_to_bytes(&streaming_map);
1847 let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1848 assert_eq!(parsed, streaming_map);
1849
1850 let empty_streaming = Frame::StreamedString(vec![]);
1852 let serialized = frame_to_bytes(&empty_streaming);
1853 let expected = "$?\r\n;0\r\n\r\n";
1854 assert_eq!(serialized, Bytes::from(expected));
1855 let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1856 assert_eq!(parsed, empty_streaming);
1857
1858 let streaming_set = Frame::StreamedSet(vec![
1860 Frame::SimpleString(Bytes::from("apple")),
1861 Frame::SimpleString(Bytes::from("banana")),
1862 Frame::Integer(42),
1863 ]);
1864 let serialized = frame_to_bytes(&streaming_set);
1865 let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1866 assert_eq!(parsed, streaming_set);
1867
1868 let streaming_attribute = Frame::StreamedAttribute(vec![
1870 (
1871 Frame::SimpleString(Bytes::from("trace-id")),
1872 Frame::SimpleString(Bytes::from("abc123")),
1873 ),
1874 (
1875 Frame::SimpleString(Bytes::from("span-id")),
1876 Frame::SimpleString(Bytes::from("def456")),
1877 ),
1878 ]);
1879 let serialized = frame_to_bytes(&streaming_attribute);
1880 let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1881 assert_eq!(parsed, streaming_attribute);
1882
1883 let streaming_push = Frame::StreamedPush(vec![
1885 Frame::SimpleString(Bytes::from("pubsub")),
1886 Frame::SimpleString(Bytes::from("channel1")),
1887 Frame::SimpleString(Bytes::from("message data")),
1888 ]);
1889 let serialized = frame_to_bytes(&streaming_push);
1890 let (parsed, _) = parse_streaming_sequence(serialized.clone()).unwrap();
1891 assert_eq!(parsed, streaming_push);
1892
1893 let empty_array = Frame::StreamedArray(vec![]);
1895 let serialized = frame_to_bytes(&empty_array);
1896 let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1897 assert_eq!(parsed, empty_array);
1898
1899 let empty_set = Frame::StreamedSet(vec![]);
1900 let serialized = frame_to_bytes(&empty_set);
1901 let (parsed, _) = parse_streaming_sequence(serialized).unwrap();
1902 assert_eq!(parsed, empty_set);
1903 }
1904
1905 #[test]
1906 fn test_streaming_sequences_edge_cases() {
1907 let data = Bytes::from("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n");
1909 let result = parse_streaming_sequence(data);
1910 assert!(matches!(result, Err(ParseError::Incomplete)));
1911
1912 let data = Bytes::from("$?\r\n;abc\r\nHell\r\n;0\r\n");
1914 let result = parse_streaming_sequence(data);
1915 assert!(matches!(result, Err(ParseError::BadLength)));
1916
1917 let data = Bytes::from("*?\r\n+hello\r\n:42\r\n");
1919 let result = parse_streaming_sequence(data);
1920 assert!(matches!(result, Err(ParseError::Incomplete)));
1921
1922 let data = Bytes::from("*?\r\n+hello\r\n*2\r\n:1\r\n:2\r\n.\r\n");
1924 let result = parse_streaming_sequence(data);
1925 assert!(result.is_ok());
1926 let (frame, _) = result.unwrap();
1927 if let Frame::StreamedArray(items) = frame {
1928 assert_eq!(items.len(), 2);
1929 assert!(matches!(items[0], Frame::SimpleString(_)));
1930 assert!(matches!(items[1], Frame::Array(_)));
1931 }
1932
1933 let data = Bytes::from("*?\r\n.\r\n");
1935 let result = parse_streaming_sequence(data);
1936 assert!(result.is_ok());
1937 let (frame, _) = result.unwrap();
1938 if let Frame::StreamedArray(items) = frame {
1939 assert!(items.is_empty());
1940 }
1941
1942 let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+orphan\r\n.\r\n");
1944 let result = parse_streaming_sequence(data);
1945 assert!(matches!(result, Err(ParseError::Incomplete)));
1946
1947 let data = Bytes::from("+simple\r\n");
1949 let result = parse_streaming_sequence(data);
1950 assert!(result.is_ok());
1951 let (frame, _) = result.unwrap();
1952 assert!(matches!(frame, Frame::SimpleString(_)));
1953
1954 let data = Bytes::from(";999999999999999999\r\ndata\r\n");
1956 let result = parse_frame(data);
1957 match &result {
1960 Err(ParseError::BadLength) => {} Err(ParseError::Incomplete) => {} Err(e) => panic!("Got unexpected error type: {e:?}"),
1963 Ok(_) => panic!("Large chunk size should fail"),
1964 }
1965
1966 let data = Bytes::from("$?\r\n+invalid\r\n;0\r\n");
1968 let result = parse_streaming_sequence(data);
1969 assert!(matches!(result, Err(ParseError::InvalidFormat)));
1970
1971 let data = Bytes::from("*?\r\n+hello\r\n.corrupted\r\n");
1973 let result = parse_streaming_sequence(data);
1974 assert!(matches!(result, Err(ParseError::Incomplete)));
1975
1976 let data = Bytes::new();
1978 let result = parse_streaming_sequence(data);
1979 assert!(matches!(result, Err(ParseError::Incomplete)));
1980
1981 let data = Bytes::from("*?\r\n+hello\r\n$5\r\nwo");
1983 let result = parse_streaming_sequence(data);
1984 assert!(matches!(result, Err(ParseError::Incomplete)));
1985 }
1986
1987 #[test]
1988 fn test_roundtrip_nested_structures() {
1989 let original = Bytes::from(
1991 "*3\r\n+hello\r\n%2\r\n+key1\r\n:123\r\n+key2\r\n~1\r\n+item\r\n|1\r\n+meta\r\n+data\r\n",
1992 );
1993 let (frame, _) = parse_frame(original.clone()).unwrap();
1994 let serialized = frame_to_bytes(&frame);
1995
1996 let (reparsed, _) = parse_frame(serialized).unwrap();
1997 assert_eq!(frame, reparsed);
1998 }
1999
2000 #[test]
2001 fn test_zero_length_bulk_string_requires_trailing_crlf() {
2002 let input = Bytes::from("$0\r\n\r\nTAIL");
2004 let (frame, rest) = parse_frame(input).unwrap();
2005 assert_eq!(frame, Frame::BulkString(Some(Bytes::new())));
2006 assert_eq!(rest, Bytes::from("TAIL"));
2007
2008 let input = Bytes::from("$0\r\n");
2010 assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2011
2012 let input = Bytes::from("$0\r\n\r");
2014 assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2015
2016 let input = Bytes::from("$0\r\nXY");
2018 assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
2019 }
2020
2021 #[test]
2022 fn test_zero_length_streamed_chunk_requires_trailing_crlf() {
2023 let input = Bytes::from(";0\r\n\r\nTAIL");
2025 let (frame, rest) = parse_frame(input).unwrap();
2026 assert_eq!(frame, Frame::StreamedStringChunk(Bytes::new()));
2027 assert_eq!(rest, Bytes::from("TAIL"));
2028
2029 let input = Bytes::from(";0\r\n");
2031 assert_eq!(parse_frame(input), Err(ParseError::Incomplete));
2032
2033 let input = Bytes::from(";0\r\nXY");
2035 assert_eq!(parse_frame(input), Err(ParseError::InvalidFormat));
2036 }
2037
2038 #[test]
2039 fn test_integer_overflow_returns_overflow_error() {
2040 let input = Bytes::from(":9223372036854775808\r\n");
2042 assert_eq!(parse_frame(input), Err(ParseError::Overflow));
2043
2044 let input = Bytes::from(":9223372036854775807\r\n");
2046 let (frame, _) = parse_frame(input).unwrap();
2047 assert_eq!(frame, Frame::Integer(i64::MAX));
2048
2049 let input = Bytes::from(":-9223372036854775808\r\n");
2051 let (frame, _) = parse_frame(input).unwrap();
2052 assert_eq!(frame, Frame::Integer(i64::MIN));
2053 }
2054
2055 #[test]
2056 fn test_parser_propagates_errors() {
2057 let mut parser = Parser::new();
2058 parser.feed(Bytes::from("XINVALID\r\n"));
2059 let result = parser.next_frame();
2060 assert!(result.is_err());
2061 assert_eq!(result.unwrap_err(), ParseError::InvalidTag(b'X'));
2062 }
2063
2064 #[test]
2065 fn test_parser_returns_ok_none_for_incomplete() {
2066 let mut parser = Parser::new();
2067 parser.feed(Bytes::from("+HELL"));
2068 assert_eq!(parser.next_frame().unwrap(), None);
2069 }
2070}