1use bytes::{BufMut, Bytes, BytesMut};
7
8const MAX_COLLECTION_SIZE: usize = 10_000_000;
11
12#[derive(Default, Debug)]
17pub struct Parser {
18 buffer: BytesMut,
19}
20
21impl Parser {
22 pub fn new() -> Self {
24 Self {
25 buffer: BytesMut::new(),
26 }
27 }
28
29 pub fn feed(&mut self, data: Bytes) {
33 self.buffer.extend_from_slice(&data);
34 }
35
36 pub fn next_frame(&mut self) -> Result<Option<Frame>, ParseError> {
42 if self.buffer.is_empty() {
43 return Ok(None);
44 }
45
46 let bytes = self.buffer.split().freeze();
47
48 match parse_frame_inner(&bytes, 0) {
49 Ok((frame, consumed)) => {
50 if consumed < bytes.len() {
51 self.buffer.unsplit(BytesMut::from(&bytes[consumed..]));
52 }
53 Ok(Some(frame))
54 }
55 Err(ParseError::Incomplete) => {
56 self.buffer.unsplit(bytes.into());
57 Ok(None)
58 }
59 Err(e) => Err(e),
60 }
61 }
62
63 pub fn buffered_bytes(&self) -> usize {
65 self.buffer.len()
66 }
67
68 pub fn clear(&mut self) {
70 self.buffer.clear();
71 }
72}
73
74#[derive(Debug, Clone, PartialEq)]
80pub enum Frame {
81 SimpleString(Bytes),
83 Error(Bytes),
85 Integer(i64),
87 BulkString(Option<Bytes>),
90 BlobError(Bytes),
92 StreamedStringHeader,
94 StreamedBlobErrorHeader,
96 StreamedVerbatimStringHeader,
98 StreamedArrayHeader,
100 StreamedSetHeader,
102 StreamedMapHeader,
104 StreamedAttributeHeader,
106 StreamedPushHeader,
108 StreamedStringChunk(Bytes),
124
125 StreamedString(Vec<Bytes>),
143
144 StreamedArray(Vec<Frame>),
162
163 StreamedSet(Vec<Frame>),
168
169 StreamedMap(Vec<(Frame, Frame)>),
185
186 StreamedAttribute(Vec<(Frame, Frame)>),
191
192 StreamedPush(Vec<Frame>),
210 StreamTerminator,
212 Null,
214 Double(f64),
216 SpecialFloat(Bytes),
218 Boolean(bool),
220 BigNumber(Bytes),
222 VerbatimString(Bytes, Bytes),
225 Array(Option<Vec<Frame>>),
227 Set(Vec<Frame>),
229 Map(Vec<(Frame, Frame)>),
231 Attribute(Vec<(Frame, Frame)>),
233 Push(Vec<Frame>),
235}
236
237pub use crate::ParseError;
238
239pub fn parse_frame(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
243 let (frame, consumed) = parse_frame_inner(&input, 0)?;
244 Ok((frame, input.slice(consumed..)))
245}
246
247fn parse_frame_inner(input: &Bytes, pos: usize) -> Result<(Frame, usize), ParseError> {
250 let buf = input.as_ref();
251 if pos >= buf.len() {
252 return Err(ParseError::Incomplete);
253 }
254
255 let tag = buf[pos];
256
257 match tag {
258 b'+' => {
259 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
260 Ok((
261 Frame::SimpleString(input.slice(pos + 1..line_end)),
262 after_crlf,
263 ))
264 }
265 b'-' => {
266 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
267 Ok((Frame::Error(input.slice(pos + 1..line_end)), after_crlf))
268 }
269 b':' => {
270 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
271 let v = parse_i64(&buf[pos + 1..line_end])?;
272 Ok((Frame::Integer(v), after_crlf))
273 }
274 b'$' => {
275 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
276 let len_bytes = &buf[pos + 1..line_end];
277 if len_bytes == b"?" {
279 return Ok((Frame::StreamedStringHeader, after_crlf));
280 }
281 if len_bytes == b"-1" {
283 return Ok((Frame::BulkString(None), after_crlf));
284 }
285 let len = parse_usize(len_bytes)?;
286 if len == 0 {
287 if after_crlf + 1 >= buf.len() {
288 return Err(ParseError::Incomplete);
289 }
290 if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
291 return Ok((Frame::BulkString(Some(Bytes::new())), after_crlf + 2));
292 } else {
293 return Err(ParseError::InvalidFormat);
294 }
295 }
296 let data_start = after_crlf;
297 let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
298 if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
299 return Err(ParseError::Incomplete);
300 }
301 Ok((
302 Frame::BulkString(Some(input.slice(data_start..data_end))),
303 data_end + 2,
304 ))
305 }
306 b'_' => {
307 if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
308 Ok((Frame::Null, pos + 3))
309 } else {
310 Err(ParseError::Incomplete)
311 }
312 }
313 b',' => {
314 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
315 let line_bytes = &buf[pos + 1..line_end];
316 if line_bytes == b"inf" || line_bytes == b"-inf" || line_bytes == b"nan" {
318 return Ok((
319 Frame::SpecialFloat(input.slice(pos + 1..line_end)),
320 after_crlf,
321 ));
322 }
323 let s = std::str::from_utf8(line_bytes).map_err(|_| ParseError::Utf8Error)?;
325 let v = s.parse::<f64>().map_err(|_| ParseError::InvalidFormat)?;
326 if v.is_infinite() || v.is_nan() {
329 let canonical = if v.is_nan() {
330 "nan"
331 } else if v.is_sign_negative() {
332 "-inf"
333 } else {
334 "inf"
335 };
336 return Ok((Frame::SpecialFloat(Bytes::from(canonical)), after_crlf));
337 }
338 Ok((Frame::Double(v), after_crlf))
339 }
340 b'#' => {
341 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
342 match &buf[pos + 1..line_end] {
343 b"t" => Ok((Frame::Boolean(true), after_crlf)),
344 b"f" => Ok((Frame::Boolean(false), after_crlf)),
345 _ => Err(ParseError::InvalidBoolean),
346 }
347 }
348 b'(' => {
349 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
350 Ok((Frame::BigNumber(input.slice(pos + 1..line_end)), after_crlf))
351 }
352 b'=' => {
353 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
354 let len_bytes = &buf[pos + 1..line_end];
355 if len_bytes == b"?" {
357 return Ok((Frame::StreamedVerbatimStringHeader, after_crlf));
358 }
359 if len_bytes == b"-1" {
361 return Ok((
362 Frame::VerbatimString(Bytes::new(), Bytes::new()),
363 after_crlf,
364 ));
365 }
366 let len = parse_usize(len_bytes)?;
367 let data_start = after_crlf;
368 let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
369 if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
370 return Err(ParseError::Incomplete);
371 }
372 let sep = buf[data_start..data_end]
374 .iter()
375 .position(|&b| b == b':')
376 .ok_or(ParseError::InvalidFormat)?;
377 let format = input.slice(data_start..data_start + sep);
378 let content = input.slice(data_start + sep + 1..data_end);
379 Ok((Frame::VerbatimString(format, content), data_end + 2))
380 }
381 b'!' => {
382 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
383 let len_bytes = &buf[pos + 1..line_end];
384 if len_bytes == b"?" {
386 return Ok((Frame::StreamedBlobErrorHeader, after_crlf));
387 }
388 if len_bytes == b"-1" {
389 return Ok((Frame::BlobError(Bytes::new()), after_crlf));
390 }
391 let len = parse_usize(len_bytes)?;
392 let data_start = after_crlf;
393 let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
394 if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
395 return Err(ParseError::Incomplete);
396 }
397 Ok((
398 Frame::BlobError(input.slice(data_start..data_end)),
399 data_end + 2,
400 ))
401 }
402 b'*' => {
403 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
404 let len_bytes = &buf[pos + 1..line_end];
405 if len_bytes == b"?" {
406 return Ok((Frame::StreamedArrayHeader, after_crlf));
407 }
408 if len_bytes == b"-1" {
409 return Ok((Frame::Array(None), after_crlf));
410 }
411 let count = parse_count(len_bytes)?;
412 if count == 0 {
413 return Ok((Frame::Array(Some(Vec::new())), after_crlf));
414 }
415 let mut cursor = after_crlf;
416 let mut items = Vec::with_capacity(count);
417 for _ in 0..count {
418 let (item, next) = parse_frame_inner(input, cursor)?;
419 items.push(item);
420 cursor = next;
421 }
422 Ok((Frame::Array(Some(items)), cursor))
423 }
424 b'~' => {
425 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
426 let len_bytes = &buf[pos + 1..line_end];
427 if len_bytes == b"?" {
428 return Ok((Frame::StreamedSetHeader, after_crlf));
429 }
430 let count = parse_count(len_bytes)?;
431 let mut cursor = after_crlf;
432 let mut items = Vec::with_capacity(count);
433 for _ in 0..count {
434 let (item, next) = parse_frame_inner(input, cursor)?;
435 items.push(item);
436 cursor = next;
437 }
438 Ok((Frame::Set(items), cursor))
439 }
440 b'%' | b'|' => {
441 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
442 let len_bytes = &buf[pos + 1..line_end];
443 if len_bytes == b"?" {
444 return if tag == b'%' {
445 Ok((Frame::StreamedMapHeader, after_crlf))
446 } else {
447 Ok((Frame::StreamedAttributeHeader, after_crlf))
448 };
449 }
450 let count = parse_count(len_bytes)?;
451 let mut cursor = after_crlf;
452 let mut pairs = Vec::with_capacity(count);
453 for _ in 0..count {
454 let (key, next1) = parse_frame_inner(input, cursor)?;
455 let (val, next2) = parse_frame_inner(input, next1)?;
456 pairs.push((key, val));
457 cursor = next2;
458 }
459 if tag == b'%' {
460 Ok((Frame::Map(pairs), cursor))
461 } else {
462 Ok((Frame::Attribute(pairs), cursor))
463 }
464 }
465 b'>' => {
466 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
467 let len_bytes = &buf[pos + 1..line_end];
468 if len_bytes == b"?" {
469 return Ok((Frame::StreamedPushHeader, after_crlf));
470 }
471 let count = parse_count(len_bytes)?;
472 let mut cursor = after_crlf;
473 let mut items = Vec::with_capacity(count);
474 for _ in 0..count {
475 let (item, next) = parse_frame_inner(input, cursor)?;
476 items.push(item);
477 cursor = next;
478 }
479 Ok((Frame::Push(items), cursor))
480 }
481 b';' => {
482 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
483 let len = parse_usize(&buf[pos + 1..line_end])?;
484 if len == 0 {
485 if after_crlf + 1 >= buf.len() {
486 return Err(ParseError::Incomplete);
487 }
488 if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
489 return Ok((Frame::StreamedStringChunk(Bytes::new()), after_crlf + 2));
490 } else {
491 return Err(ParseError::InvalidFormat);
492 }
493 }
494 let data_start = after_crlf;
495 let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
496 if data_end + 1 >= buf.len() || buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
497 return Err(ParseError::Incomplete);
498 }
499 Ok((
500 Frame::StreamedStringChunk(input.slice(data_start..data_end)),
501 data_end + 2,
502 ))
503 }
504 b'.' => {
505 if pos + 2 < buf.len() && buf[pos + 1] == b'\r' && buf[pos + 2] == b'\n' {
506 Ok((Frame::StreamTerminator, pos + 3))
507 } else {
508 Err(ParseError::Incomplete)
509 }
510 }
511 _ => Err(ParseError::InvalidTag(tag)),
512 }
513}
514
515pub fn parse_streaming_sequence(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
595 if input.is_empty() {
596 return Err(ParseError::Incomplete);
597 }
598
599 let (header, mut rest) = parse_frame(input)?;
600
601 match header {
602 Frame::StreamedStringHeader => {
603 let mut chunks = Vec::new();
605
606 loop {
607 let (frame, new_rest) = parse_frame(rest)?;
608 rest = new_rest;
609
610 match frame {
611 Frame::StreamedStringChunk(chunk) => {
612 if chunk.is_empty() {
613 break;
615 }
616 chunks.push(chunk);
617 }
618 _ => {
619 return Err(ParseError::InvalidFormat);
620 }
621 }
622 }
623
624 Ok((Frame::StreamedString(chunks), rest))
625 }
626 Frame::StreamedBlobErrorHeader => {
627 let mut chunks = Vec::new();
630
631 loop {
632 let (frame, new_rest) = parse_frame(rest)?;
633 rest = new_rest;
634
635 match frame {
636 Frame::BlobError(chunk) => {
637 if chunk.is_empty() {
640 break;
641 }
642 chunks.push(chunk);
643 }
644 _ => {
645 return Err(ParseError::InvalidFormat);
646 }
647 }
648 }
649
650 let total_len: usize = chunks.iter().map(|c| c.len()).sum();
652 let mut combined = Vec::with_capacity(total_len);
653 for chunk in chunks {
654 combined.extend_from_slice(&chunk);
655 }
656 Ok((Frame::BlobError(Bytes::from(combined)), rest))
657 }
658 Frame::StreamedVerbatimStringHeader => {
659 let mut chunks = Vec::new();
661
662 loop {
663 let (frame, new_rest) = parse_frame(rest)?;
664 rest = new_rest;
665
666 match frame {
667 Frame::VerbatimString(format, content) => {
668 if content.is_empty() && format.len() == 3 {
670 break;
671 }
672 chunks.push((format, content));
675 }
676 _ => {
677 return Err(ParseError::InvalidFormat);
678 }
679 }
680 }
681
682 if chunks.is_empty() {
684 return Ok((Frame::VerbatimString(Bytes::new(), Bytes::new()), rest));
685 }
686
687 let format = chunks[0].0.clone();
688 let total_len: usize = chunks.iter().map(|(_, c)| c.len()).sum();
689 let mut combined = Vec::with_capacity(total_len);
690 for (_, content) in chunks {
691 combined.extend_from_slice(&content);
692 }
693 Ok((Frame::VerbatimString(format, Bytes::from(combined)), rest))
694 }
695 Frame::StreamedArrayHeader => {
696 let mut items = Vec::new();
698
699 loop {
700 let (frame, new_rest) = parse_frame(rest)?;
701 rest = new_rest;
702
703 match frame {
704 Frame::StreamTerminator => {
705 break;
706 }
707 item => {
708 items.push(item);
709 }
710 }
711 }
712
713 Ok((Frame::StreamedArray(items), rest))
714 }
715 Frame::StreamedSetHeader => {
716 let mut items = Vec::new();
718
719 loop {
720 let (frame, new_rest) = parse_frame(rest)?;
721 rest = new_rest;
722
723 match frame {
724 Frame::StreamTerminator => {
725 break;
726 }
727 item => {
728 items.push(item);
729 }
730 }
731 }
732
733 Ok((Frame::StreamedSet(items), rest))
734 }
735 Frame::StreamedMapHeader => {
736 let mut pairs = Vec::new();
738
739 loop {
740 let (frame, new_rest) = parse_frame(rest)?;
741 rest = new_rest;
742
743 match frame {
744 Frame::StreamTerminator => {
745 break;
746 }
747 key => {
748 let (value, newer_rest) = parse_frame(rest)?;
749 rest = newer_rest;
750 pairs.push((key, value));
751 }
752 }
753 }
754
755 Ok((Frame::StreamedMap(pairs), rest))
756 }
757 Frame::StreamedAttributeHeader => {
758 let mut pairs = Vec::new();
760
761 loop {
762 let (frame, new_rest) = parse_frame(rest)?;
763 rest = new_rest;
764
765 match frame {
766 Frame::StreamTerminator => {
767 break;
768 }
769 key => {
770 let (value, newer_rest) = parse_frame(rest)?;
771 rest = newer_rest;
772 pairs.push((key, value));
773 }
774 }
775 }
776
777 Ok((Frame::StreamedAttribute(pairs), rest))
778 }
779 Frame::StreamedPushHeader => {
780 let mut items = Vec::new();
782
783 loop {
784 let (frame, new_rest) = parse_frame(rest)?;
785 rest = new_rest;
786
787 match frame {
788 Frame::StreamTerminator => {
789 break;
790 }
791 item => {
792 items.push(item);
793 }
794 }
795 }
796
797 Ok((Frame::StreamedPush(items), rest))
798 }
799 _ => {
800 Ok((header, rest))
802 }
803 }
804}
805
806#[inline]
809fn find_crlf(buf: &[u8], from: usize) -> Result<(usize, usize), ParseError> {
810 let mut i = from;
811 let len = buf.len();
812 while i + 1 < len {
813 if buf[i] == b'\r' && buf[i + 1] == b'\n' {
814 return Ok((i, i + 2));
815 }
816 i += 1;
817 }
818 Err(ParseError::Incomplete)
819}
820
821#[inline]
823fn parse_usize(buf: &[u8]) -> Result<usize, ParseError> {
824 if buf.is_empty() {
825 return Err(ParseError::BadLength);
826 }
827 let mut v: usize = 0;
828 for &b in buf {
829 if !b.is_ascii_digit() {
830 return Err(ParseError::BadLength);
831 }
832 v = v.checked_mul(10).ok_or(ParseError::BadLength)?;
833 v = v
834 .checked_add((b - b'0') as usize)
835 .ok_or(ParseError::BadLength)?;
836 }
837 Ok(v)
838}
839
840#[inline]
842fn parse_i64(buf: &[u8]) -> Result<i64, ParseError> {
843 if buf.is_empty() {
844 return Err(ParseError::InvalidFormat);
845 }
846 let (neg, digits) = if buf[0] == b'-' {
847 (true, &buf[1..])
848 } else {
849 (false, buf)
850 };
851 if digits.is_empty() {
852 return Err(ParseError::InvalidFormat);
853 }
854 let mut v: i64 = 0;
855 for (i, &d) in digits.iter().enumerate() {
856 if !d.is_ascii_digit() {
857 return Err(ParseError::InvalidFormat);
858 }
859 let digit = (d - b'0') as i64;
860 if neg && v == i64::MAX / 10 && digit == 8 && i == digits.len() - 1 {
861 return Ok(i64::MIN);
862 }
863 if v > i64::MAX / 10 || (v == i64::MAX / 10 && digit > i64::MAX % 10) {
864 return Err(ParseError::Overflow);
865 }
866 v = v * 10 + digit;
867 }
868 if neg { Ok(-v) } else { Ok(v) }
869}
870
871#[inline]
873fn parse_count(buf: &[u8]) -> Result<usize, ParseError> {
874 let count = parse_usize(buf)?;
875 if count > MAX_COLLECTION_SIZE {
876 return Err(ParseError::BadLength);
877 }
878 Ok(count)
879}
880
881pub fn frame_to_bytes(frame: &Frame) -> Bytes {
885 let mut buf = BytesMut::new();
886 serialize_frame(frame, &mut buf);
887 buf.freeze()
888}
889
890fn serialize_frame(frame: &Frame, buf: &mut BytesMut) {
891 match frame {
892 Frame::SimpleString(s) => {
893 buf.put_u8(b'+');
894 buf.extend_from_slice(s);
895 buf.extend_from_slice(b"\r\n");
896 }
897 Frame::Error(e) => {
898 buf.put_u8(b'-');
899 buf.extend_from_slice(e);
900 buf.extend_from_slice(b"\r\n");
901 }
902 Frame::Integer(i) => {
903 buf.put_u8(b':');
904 let s = i.to_string();
905 buf.extend_from_slice(s.as_bytes());
906 buf.extend_from_slice(b"\r\n");
907 }
908 Frame::BulkString(opt) => {
909 buf.put_u8(b'$');
910 match opt {
911 Some(data) => {
912 let len = data.len().to_string();
913 buf.extend_from_slice(len.as_bytes());
914 buf.extend_from_slice(b"\r\n");
915 buf.extend_from_slice(data);
916 buf.extend_from_slice(b"\r\n");
917 }
918 None => {
919 buf.extend_from_slice(b"-1\r\n");
920 }
921 }
922 }
923 Frame::BlobError(data) => {
924 buf.put_u8(b'!');
925 let len = data.len().to_string();
926 buf.extend_from_slice(len.as_bytes());
927 buf.extend_from_slice(b"\r\n");
928 buf.extend_from_slice(data);
929 buf.extend_from_slice(b"\r\n");
930 }
931 Frame::StreamedStringHeader => {
932 buf.extend_from_slice(b"$?\r\n");
933 }
934 Frame::StreamedBlobErrorHeader => {
935 buf.extend_from_slice(b"!?\r\n");
936 }
937 Frame::StreamedVerbatimStringHeader => {
938 buf.extend_from_slice(b"=?\r\n");
939 }
940 Frame::StreamedArrayHeader => {
941 buf.extend_from_slice(b"*?\r\n");
942 }
943 Frame::StreamedSetHeader => {
944 buf.extend_from_slice(b"~?\r\n");
945 }
946 Frame::StreamedMapHeader => {
947 buf.extend_from_slice(b"%?\r\n");
948 }
949 Frame::StreamedAttributeHeader => {
950 buf.extend_from_slice(b"|?\r\n");
951 }
952 Frame::StreamedPushHeader => {
953 buf.extend_from_slice(b">?\r\n");
954 }
955 Frame::StreamedStringChunk(data) => {
956 buf.put_u8(b';');
957 let len = data.len().to_string();
958 buf.extend_from_slice(len.as_bytes());
959 buf.extend_from_slice(b"\r\n");
960 buf.extend_from_slice(data);
961 buf.extend_from_slice(b"\r\n");
962 }
963 Frame::StreamedString(chunks) => {
964 buf.extend_from_slice(b"$?\r\n");
966 for chunk in chunks {
967 buf.put_u8(b';');
968 let len = chunk.len().to_string();
969 buf.extend_from_slice(len.as_bytes());
970 buf.extend_from_slice(b"\r\n");
971 buf.extend_from_slice(chunk);
972 buf.extend_from_slice(b"\r\n");
973 }
974 buf.extend_from_slice(b";0\r\n\r\n");
975 }
976 Frame::StreamedArray(items) => {
977 buf.extend_from_slice(b"*?\r\n");
978 for item in items {
979 serialize_frame(item, buf);
980 }
981 buf.extend_from_slice(b".\r\n");
982 }
983 Frame::StreamedSet(items) => {
984 buf.extend_from_slice(b"~?\r\n");
985 for item in items {
986 serialize_frame(item, buf);
987 }
988 buf.extend_from_slice(b".\r\n");
989 }
990 Frame::StreamedMap(pairs) => {
991 buf.extend_from_slice(b"%?\r\n");
992 for (key, value) in pairs {
993 serialize_frame(key, buf);
994 serialize_frame(value, buf);
995 }
996 buf.extend_from_slice(b".\r\n");
997 }
998 Frame::StreamedAttribute(pairs) => {
999 buf.extend_from_slice(b"|?\r\n");
1000 for (key, value) in pairs {
1001 serialize_frame(key, buf);
1002 serialize_frame(value, buf);
1003 }
1004 buf.extend_from_slice(b".\r\n");
1005 }
1006 Frame::StreamedPush(items) => {
1007 buf.extend_from_slice(b">?\r\n");
1008 for item in items {
1009 serialize_frame(item, buf);
1010 }
1011 buf.extend_from_slice(b".\r\n");
1012 }
1013 Frame::StreamTerminator => {
1014 buf.extend_from_slice(b".\r\n");
1015 }
1016 Frame::Null => {
1017 buf.extend_from_slice(b"_\r\n");
1018 }
1019 Frame::Double(d) => {
1020 buf.put_u8(b',');
1021 let s = d.to_string();
1022 buf.extend_from_slice(s.as_bytes());
1023 buf.extend_from_slice(b"\r\n");
1024 }
1025 Frame::SpecialFloat(f) => {
1026 buf.put_u8(b',');
1027 buf.extend_from_slice(f);
1028 buf.extend_from_slice(b"\r\n");
1029 }
1030 Frame::Boolean(b) => {
1031 buf.extend_from_slice(if *b { b"#t\r\n" } else { b"#f\r\n" });
1032 }
1033 Frame::BigNumber(n) => {
1034 buf.put_u8(b'(');
1035 buf.extend_from_slice(n);
1036 buf.extend_from_slice(b"\r\n");
1037 }
1038 Frame::VerbatimString(format, content) => {
1039 buf.put_u8(b'=');
1040 let total_len = format.len() + 1 + content.len(); let len = total_len.to_string();
1042 buf.extend_from_slice(len.as_bytes());
1043 buf.extend_from_slice(b"\r\n");
1044 buf.extend_from_slice(format);
1045 buf.put_u8(b':');
1046 buf.extend_from_slice(content);
1047 buf.extend_from_slice(b"\r\n");
1048 }
1049 Frame::Array(opt) => {
1050 buf.put_u8(b'*');
1051 match opt {
1052 Some(items) => {
1053 let len = items.len().to_string();
1054 buf.extend_from_slice(len.as_bytes());
1055 buf.extend_from_slice(b"\r\n");
1056 for item in items {
1057 serialize_frame(item, buf);
1058 }
1059 }
1060 None => {
1061 buf.extend_from_slice(b"-1\r\n");
1062 }
1063 }
1064 }
1065 Frame::Set(items) => {
1066 buf.put_u8(b'~');
1067 let len = items.len().to_string();
1068 buf.extend_from_slice(len.as_bytes());
1069 buf.extend_from_slice(b"\r\n");
1070 for item in items {
1071 serialize_frame(item, buf);
1072 }
1073 }
1074 Frame::Map(pairs) => {
1075 buf.put_u8(b'%');
1076 let len = pairs.len().to_string();
1077 buf.extend_from_slice(len.as_bytes());
1078 buf.extend_from_slice(b"\r\n");
1079 for (key, value) in pairs {
1080 serialize_frame(key, buf);
1081 serialize_frame(value, buf);
1082 }
1083 }
1084 Frame::Attribute(pairs) => {
1085 buf.put_u8(b'|');
1086 let len = pairs.len().to_string();
1087 buf.extend_from_slice(len.as_bytes());
1088 buf.extend_from_slice(b"\r\n");
1089 for (key, value) in pairs {
1090 serialize_frame(key, buf);
1091 serialize_frame(value, buf);
1092 }
1093 }
1094 Frame::Push(items) => {
1095 buf.put_u8(b'>');
1096 let len = items.len().to_string();
1097 buf.extend_from_slice(len.as_bytes());
1098 buf.extend_from_slice(b"\r\n");
1099 for item in items {
1100 serialize_frame(item, buf);
1101 }
1102 }
1103 }
1104}
1105
1106#[cfg(test)]
1107mod tests {
1108 use super::{Frame, ParseError, Parser, frame_to_bytes, parse_frame, parse_streaming_sequence};
1109 use bytes::Bytes;
1110
1111 #[test]
1112 fn test_parse_frame_simple_string() {
1113 let input = Bytes::from("+HELLO\r\nWORLD");
1114 let (frame, rest) = parse_frame(input.clone()).unwrap();
1115 assert_eq!(frame, Frame::SimpleString(Bytes::from("HELLO")));
1116 assert_eq!(rest, Bytes::from("WORLD"));
1117 }
1118
1119 #[test]
1120 fn test_parse_frame_blob_error() {
1121 let input = Bytes::from("!5\r\nERROR\r\nREST");
1122 let (frame, rest) = parse_frame(input.clone()).unwrap();
1123 assert_eq!(frame, Frame::BlobError(Bytes::from("ERROR")));
1124 assert_eq!(rest, Bytes::from("REST"));
1125 }
1126
1127 #[test]
1128 fn test_parse_frame_error() {
1129 let input = Bytes::from("-ERR fail\r\nLEFT");
1130 let (frame, rest) = parse_frame(input.clone()).unwrap();
1131 assert_eq!(frame, Frame::Error(Bytes::from("ERR fail")));
1132 assert_eq!(rest, Bytes::from("LEFT"));
1133 }
1134
1135 #[test]
1136 fn test_parse_frame_integer() {
1137 let input = Bytes::from(":42\r\nTAIL");
1138 let (frame, rest) = parse_frame(input.clone()).unwrap();
1139 assert_eq!(frame, Frame::Integer(42));
1140 assert_eq!(rest, Bytes::from("TAIL"));
1141 }
1142
1143 #[test]
1144 fn test_parse_frame_bulk_string() {
1145 let input = Bytes::from("$3\r\nfoo\r\nREST");
1146 let (frame, rest) = parse_frame(input.clone()).unwrap();
1147 assert_eq!(frame, Frame::BulkString(Some(Bytes::from("foo"))));
1148 assert_eq!(rest, Bytes::from("REST"));
1149 let null_input = Bytes::from("$-1\r\nAFTER");
1150 let (frame, rest) = parse_frame(null_input.clone()).unwrap();
1151 assert_eq!(frame, Frame::BulkString(None));
1152 assert_eq!(rest, Bytes::from("AFTER"));
1153 }
1154
1155 #[test]
1156 fn test_parse_frame_null() {
1157 let input = Bytes::from("_\r\nLEFT");
1158 let (frame, rest) = parse_frame(input.clone()).unwrap();
1159 assert_eq!(frame, Frame::Null);
1160 assert_eq!(rest, Bytes::from("LEFT"));
1161 }
1162
1163 #[test]
1164 fn test_parse_frame_double_and_special_float() {
1165 let input = Bytes::from(",3.5\r\nNEXT");
1166 let (frame, rest) = parse_frame(input.clone()).unwrap();
1167 assert_eq!(frame, Frame::Double(3.5));
1168 assert_eq!(rest, Bytes::from("NEXT"));
1169 let input_inf = Bytes::from(",inf\r\nTAIL");
1170 let (frame, rest) = parse_frame(input_inf.clone()).unwrap();
1171 assert_eq!(frame, Frame::SpecialFloat(Bytes::from("inf")));
1172 assert_eq!(rest, Bytes::from("TAIL"));
1173 }
1174
1175 #[test]
1176 fn test_parse_frame_boolean() {
1177 let input_true = Bytes::from("#t\r\nXYZ");
1178 let (frame, rest) = parse_frame(input_true.clone()).unwrap();
1179 assert_eq!(frame, Frame::Boolean(true));
1180 assert_eq!(rest, Bytes::from("XYZ"));
1181 let input_false = Bytes::from("#f\r\nDONE");
1182 let (frame, rest) = parse_frame(input_false.clone()).unwrap();
1183 assert_eq!(frame, Frame::Boolean(false));
1184 assert_eq!(rest, Bytes::from("DONE"));
1185 }
1186
1187 #[test]
1188 fn test_parse_frame_big_number() {
1189 let input = Bytes::from("(123456789\r\nEND");
1190 let (frame, rest) = parse_frame(input.clone()).unwrap();
1191 assert_eq!(frame, Frame::BigNumber(Bytes::from("123456789")));
1192 assert_eq!(rest, Bytes::from("END"));
1193 }
1194
1195 #[test]
1196 fn test_parse_frame_verbatim_string() {
1197 let input = Bytes::from("=12\r\ntxt:hi there\r\nAFTER");
1198 let (frame, rest) = parse_frame(input.clone()).unwrap();
1199 assert_eq!(
1200 frame,
1201 Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hi there")) );
1206 assert_eq!(rest, Bytes::from("AFTER"));
1207 }
1208
1209 #[test]
1210 fn test_parse_frame_array_set_push_map_attribute() {
1211 let input = Bytes::from("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\nTAIL");
1213 let (frame, rest) = parse_frame(input.clone()).unwrap();
1214 assert_eq!(
1215 frame,
1216 Frame::Array(Some(vec![
1217 Frame::BulkString(Some(Bytes::from("foo"))),
1218 Frame::BulkString(Some(Bytes::from("bar")))
1219 ]))
1220 );
1221 assert_eq!(rest, Bytes::from("TAIL"));
1222 let input_null = Bytes::from("*-1\r\nEND");
1224 let (frame, rest) = parse_frame(input_null.clone()).unwrap();
1225 assert_eq!(frame, Frame::Array(None));
1226 assert_eq!(rest, Bytes::from("END"));
1227 let input_set = Bytes::from("~2\r\n+foo\r\n+bar\r\nTAIL");
1229 let (frame, rest) = parse_frame(input_set.clone()).unwrap();
1230 assert_eq!(
1231 frame,
1232 Frame::Set(vec![
1233 Frame::SimpleString(Bytes::from("foo")),
1234 Frame::SimpleString(Bytes::from("bar")),
1235 ])
1236 );
1237 assert_eq!(rest, Bytes::from("TAIL"));
1238 let input_map = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\nTRAIL");
1240 let (frame, rest) = parse_frame(input_map.clone()).unwrap();
1241 assert_eq!(
1242 frame,
1243 Frame::Map(vec![
1244 (
1245 Frame::SimpleString(Bytes::from("key1")),
1246 Frame::SimpleString(Bytes::from("val1"))
1247 ),
1248 (
1249 Frame::SimpleString(Bytes::from("key2")),
1250 Frame::SimpleString(Bytes::from("val2"))
1251 ),
1252 ])
1253 );
1254 assert_eq!(rest, Bytes::from("TRAIL"));
1255 let input_attr = Bytes::from("|1\r\n+meta\r\n+data\r\nAFTER");
1257 let (frame, rest) = parse_frame(input_attr.clone()).unwrap();
1258 assert_eq!(
1259 frame,
1260 Frame::Attribute(vec![(
1261 Frame::SimpleString(Bytes::from("meta")),
1262 Frame::SimpleString(Bytes::from("data"))
1263 ),])
1264 );
1265 assert_eq!(rest, Bytes::from("AFTER"));
1266 let input_push = Bytes::from(">2\r\n+type\r\n:1\r\nNEXT");
1268 let (frame, rest) = parse_frame(input_push.clone()).unwrap();
1269 assert_eq!(
1270 frame,
1271 Frame::Push(vec![
1272 Frame::SimpleString(Bytes::from("type")),
1273 Frame::Integer(1),
1274 ])
1275 );
1276 assert_eq!(rest, Bytes::from("NEXT"));
1277 }
1278
1279 #[test]
1280 fn test_parse_frame_empty_input() {
1281 assert!(parse_frame(Bytes::new()).is_err());
1282 }
1283
1284 #[test]
1285 fn test_parse_frame_invalid_tag() {
1286 let input = Bytes::from("X123\r\n");
1287 assert!(parse_frame(input).is_err());
1288 }
1289
1290 #[test]
1291 fn test_parse_frame_malformed_bulk_length() {
1292 let input = Bytes::from("$x\r\nfoo\r\n");
1293 assert!(parse_frame(input).is_err());
1294 }
1295
1296 #[test]
1297 fn test_parse_frame_zero_length_bulk() {
1298 let input = Bytes::from("$0\r\n\r\nTAIL");
1299 let (frame, rest) = parse_frame(input.clone()).unwrap();
1300 assert_eq!(frame, Frame::BulkString(Some(Bytes::from(""))));
1301 assert_eq!(rest, Bytes::from("TAIL"));
1302 }
1303
1304 #[test]
1305 fn test_parse_frame_zero_length_blob_error() {
1306 let input = Bytes::from("!0\r\n\r\nREST");
1307 let (frame, rest) = parse_frame(input.clone()).unwrap();
1308 assert_eq!(frame, Frame::BlobError(Bytes::new()));
1309 assert_eq!(rest, Bytes::from("REST"));
1310 }
1311
1312 #[test]
1313 fn test_parse_frame_missing_crlf() {
1314 let input = Bytes::from(":42\nTAIL");
1315 assert!(parse_frame(input).is_err());
1316 }
1317
1318 #[test]
1319 fn test_parse_frame_unicode_simple_string() {
1320 let input = Bytes::from("+こんにちは\r\nEND");
1321 let (frame, rest) = parse_frame(input.clone()).unwrap();
1322 assert_eq!(frame, Frame::SimpleString(Bytes::from("こんにちは")));
1323 assert_eq!(rest, Bytes::from("END"));
1324 }
1325
1326 #[test]
1327 fn test_parse_frame_chained_frames() {
1328 let combined = Bytes::from("+OK\r\n:1\r\nfoo");
1329 let (f1, rem) = parse_frame(combined.clone()).unwrap();
1330 assert_eq!(f1, Frame::SimpleString(Bytes::from("OK")));
1331 let (f2, rem2) = parse_frame(rem).unwrap();
1332 assert_eq!(f2, Frame::Integer(1));
1333 assert_eq!(rem2, Bytes::from("foo"));
1334 }
1335
1336 #[test]
1337 fn test_parse_frame_empty_array() {
1338 let input = Bytes::from("*0\r\nTAIL");
1339 let (frame, rest) = parse_frame(input.clone()).unwrap();
1340 assert_eq!(frame, Frame::Array(Some(vec![])));
1341 assert_eq!(rest, Bytes::from("TAIL"));
1342 }
1343
1344 #[test]
1345 fn test_parse_frame_partial_array_data() {
1346 let input = Bytes::from("*2\r\n+OK\r\n");
1347 assert!(parse_frame(input).is_err());
1348 }
1349
1350 #[test]
1351 fn test_parse_frame_streamed_string() {
1352 let input = Bytes::from("$?\r\n$5\r\nhello\r\n$0\r\n\r\nREST");
1353 let (frame, rem) = parse_frame(input.clone()).unwrap();
1354 assert_eq!(frame, Frame::StreamedStringHeader);
1355 let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1356 assert_eq!(chunk, Frame::BulkString(Some(Bytes::from("hello"))));
1357 let (terminator, rest) = parse_frame(rem2.clone()).unwrap();
1358 assert_eq!(terminator, Frame::BulkString(Some(Bytes::from(""))));
1359 assert_eq!(rest, Bytes::from("REST"));
1360 }
1361
1362 #[test]
1363 fn test_parse_frame_streamed_blob_error() {
1364 let input = Bytes::from("!?\r\n!5\r\nERROR\r\nREST");
1365 let (frame, rem) = parse_frame(input.clone()).unwrap();
1366 assert_eq!(frame, Frame::StreamedBlobErrorHeader);
1367 let (chunk, rem2) = parse_frame(rem.clone()).unwrap();
1368 assert_eq!(chunk, Frame::BlobError(Bytes::from("ERROR")));
1369 assert_eq!(rem2, Bytes::from("REST"));
1370 }
1371
1372 #[test]
1373 fn test_parse_frame_streamed_verbatim_string() {
1374 let input = Bytes::from("=?\r\n=9\r\ntxt:hello\r\nTAIL");
1375 let (frame, rem) = parse_frame(input.clone()).unwrap();
1376 assert_eq!(frame, Frame::StreamedVerbatimStringHeader);
1377 let (chunk, rest) = parse_frame(rem.clone()).unwrap();
1378 assert_eq!(
1379 chunk,
1380 Frame::VerbatimString(Bytes::from("txt"), Bytes::from("hello")) );
1385 assert_eq!(rest, Bytes::from("TAIL"));
1386 }
1387
/// Walks a streamed array frame by frame: header, two simple strings, then
/// the empty `*0` array, leaving `END` unparsed.
#[test]
fn test_parse_frame_streamed_array() {
    let expected_frames = [
        Frame::StreamedArrayHeader,
        Frame::SimpleString(Bytes::from("one")),
        Frame::SimpleString(Bytes::from("two")),
        Frame::Array(Some(vec![])),
    ];

    let mut cursor = Bytes::from("*?\r\n+one\r\n+two\r\n*0\r\nEND");
    for expected in expected_frames {
        let (parsed, tail) = parse_frame(cursor).unwrap();
        assert_eq!(parsed, expected);
        cursor = tail;
    }

    assert_eq!(cursor, Bytes::from("END"));
}
1401
/// Runs the same frame-by-frame walk over streamed set, map, attribute and
/// push inputs: a `?` header, two elements, an empty fixed-size collection,
/// and a trailing marker that must remain unparsed.
#[test]
fn test_parse_frame_streamed_set_map_attr_push() {
    // Parses `wire` one frame at a time, comparing each frame with `expected`
    // in order, then checks that exactly `tail` is left over.
    let check_sequence = |wire: &'static str, expected: Vec<Frame>, tail: &'static str| {
        let mut cursor = Bytes::from(wire);
        for want in expected {
            let (got, next) = parse_frame(cursor).unwrap();
            assert_eq!(got, want);
            cursor = next;
        }
        assert_eq!(cursor, Bytes::from(tail));
    };

    check_sequence(
        "~?\r\n+foo\r\n+bar\r\n~0\r\nTAIL",
        vec![
            Frame::StreamedSetHeader,
            Frame::SimpleString(Bytes::from("foo")),
            Frame::SimpleString(Bytes::from("bar")),
            Frame::Set(vec![]),
        ],
        "TAIL",
    );

    check_sequence(
        "%?\r\n+key\r\n+val\r\n%0\r\nNEXT",
        vec![
            Frame::StreamedMapHeader,
            Frame::SimpleString(Bytes::from("key")),
            Frame::SimpleString(Bytes::from("val")),
            Frame::Map(vec![]),
        ],
        "NEXT",
    );

    check_sequence(
        "|?\r\n+meta\r\n+info\r\n|0\r\nMORE",
        vec![
            Frame::StreamedAttributeHeader,
            Frame::SimpleString(Bytes::from("meta")),
            Frame::SimpleString(Bytes::from("info")),
            Frame::Attribute(vec![]),
        ],
        "MORE",
    );

    check_sequence(
        ">?\r\n:1\r\n:2\r\n>0\r\nEND",
        vec![
            Frame::StreamedPushHeader,
            Frame::Integer(1),
            Frame::Integer(2),
            Frame::Push(vec![]),
        ],
        "END",
    );
}
1449
/// A lone `.` line parses as `StreamTerminator`; bytes after it are returned.
#[test]
fn test_parse_frame_stream_terminator() {
    let (terminator, leftover) = parse_frame(Bytes::from(".\r\nREST")).unwrap();

    assert_eq!(terminator, Frame::StreamTerminator);
    assert_eq!(leftover, Bytes::from("REST"));
}
1457
/// A `-1` length on blob errors and verbatim strings yields frames with empty
/// payloads, and the bytes after the header line are returned untouched.
#[test]
fn test_parse_frame_null_bulk_and_error() {
    let cases = [
        ("!-1\r\nTAIL", Frame::BlobError(Bytes::new())),
        (
            "=-1\r\nTAIL",
            Frame::VerbatimString(Bytes::new(), Bytes::new()),
        ),
    ];

    for (wire, expected) in cases {
        let (parsed, leftover) = parse_frame(Bytes::from(wire)).unwrap();
        assert_eq!(parsed, expected);
        assert_eq!(leftover, Bytes::from("TAIL"));
    }
}
1469
/// `,nan` is kept verbatim as a `SpecialFloat` instead of becoming a `Double`.
#[test]
fn test_parse_frame_special_float_nan() {
    let (parsed, leftover) = parse_frame(Bytes::from(",nan\r\nTAIL")).unwrap();

    assert_eq!(parsed, Frame::SpecialFloat(Bytes::from("nan")));
    assert_eq!(leftover, Bytes::from("TAIL"));
}
1477
/// A big number consisting of the single digit `0` keeps its digits as bytes.
#[test]
fn test_parse_frame_big_number_zero() {
    let (parsed, leftover) = parse_frame(Bytes::from("(0\r\nEND")).unwrap();

    assert_eq!(parsed, Frame::BigNumber(Bytes::from("0")));
    assert_eq!(leftover, Bytes::from("END"));
}
1485
/// Zero-length push/attribute/map/set headers and a `*-1` array each parse to
/// the corresponding empty (or null) collection, leaving the trailer unparsed.
#[test]
fn test_parse_frame_collection_empty() {
    // (wire input, expected frame, expected unparsed remainder)
    let cases = [
        (">0\r\nTAIL", Frame::Push(vec![]), "TAIL"),
        ("|0\r\nAFTER", Frame::Attribute(vec![]), "AFTER"),
        ("%0\r\nEND", Frame::Map(vec![]), "END"),
        ("~0\r\nDONE", Frame::Set(vec![]), "DONE"),
        ("*-1\r\nFIN", Frame::Array(None), "FIN"),
    ];

    for (wire, expected, trailer) in cases {
        let (parsed, leftover) = parse_frame(Bytes::from(wire)).unwrap();
        assert_eq!(parsed, expected);
        assert_eq!(leftover, Bytes::from(trailer));
    }
}
1509
/// parse -> serialize -> parse for a simple string: the bytes and the frame
/// must both survive unchanged.
#[test]
fn test_roundtrip_simple_string() {
    let wire = Bytes::from("+hello\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let decoded_again = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, decoded_again);
}
1522
/// parse -> serialize -> parse for a simple error line.
#[test]
fn test_roundtrip_error() {
    let wire = Bytes::from("-ERR error message\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let decoded_again = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, decoded_again);
}
1533
/// parse -> serialize -> parse for an integer frame.
#[test]
fn test_roundtrip_integer() {
    let wire = Bytes::from(":12345\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let decoded_again = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, decoded_again);
}
1544
/// Round-trips a populated bulk string and the `$-1` null bulk string.
#[test]
fn test_roundtrip_bulk_string() {
    for wire_text in ["$5\r\nhello\r\n", "$-1\r\n"] {
        let wire = Bytes::from(wire_text);

        let (decoded, _) = parse_frame(wire.clone()).unwrap();
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
1564
/// parse -> serialize -> parse for a blob error.
#[test]
fn test_roundtrip_blob_error() {
    let wire = Bytes::from("!5\r\nerror\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let decoded_again = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, decoded_again);
}
1575
/// parse -> serialize -> parse for the `_` null frame.
#[test]
fn test_roundtrip_null() {
    let wire = Bytes::from("_\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let decoded_again = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, decoded_again);
}
1586
/// Round-trips a double at the frame level only: the serialized bytes are not
/// compared with the input, just the frame obtained by parsing them again.
#[test]
fn test_roundtrip_double() {
    let (decoded, _) = parse_frame(Bytes::from(",3.14159\r\n")).unwrap();

    let encoded = frame_to_bytes(&decoded);
    let (decoded_again, _) = parse_frame(encoded).unwrap();

    assert_eq!(decoded, decoded_again);
}
1598
/// parse -> serialize -> parse for the special float `inf`.
#[test]
fn test_roundtrip_special_float() {
    let wire = Bytes::from(",inf\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let decoded_again = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, decoded_again);
}
1609
/// Round-trips both boolean encodings, `#t` first and then `#f`.
#[test]
fn test_roundtrip_boolean() {
    for wire_text in ["#t\r\n", "#f\r\n"] {
        let wire = Bytes::from(wire_text);

        let (decoded, _) = parse_frame(wire.clone()).unwrap();
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
1628
/// parse -> serialize -> parse for a 20-digit big number.
#[test]
fn test_roundtrip_big_number() {
    let wire = Bytes::from("(12345678901234567890\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let decoded_again = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, decoded_again);
}
1639
/// parse -> serialize -> parse for a verbatim string with a `txt:` prefix.
#[test]
fn test_roundtrip_verbatim_string() {
    let wire = Bytes::from("=10\r\ntxt:hello!\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let decoded_again = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, decoded_again);
}
1650
/// Round-trips a two-element array and the `*-1` null array.
#[test]
fn test_roundtrip_array() {
    for wire_text in ["*2\r\n+hello\r\n:123\r\n", "*-1\r\n"] {
        let wire = Bytes::from(wire_text);

        let (decoded, _) = parse_frame(wire.clone()).unwrap();
        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let (decoded_again, _) = parse_frame(encoded).unwrap();
        assert_eq!(decoded, decoded_again);
    }
}
1670
/// parse -> serialize -> parse for a two-element set.
#[test]
fn test_roundtrip_set() {
    let wire = Bytes::from("~2\r\n+one\r\n+two\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let decoded_again = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, decoded_again);
}
1681
/// parse -> serialize -> parse for a map with two key/value pairs.
#[test]
fn test_roundtrip_map() {
    let wire = Bytes::from("%2\r\n+key1\r\n+val1\r\n+key2\r\n+val2\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let decoded_again = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, decoded_again);
}
1692
/// parse -> serialize -> parse for an attribute frame with one pair.
#[test]
fn test_roundtrip_attribute() {
    let wire = Bytes::from("|1\r\n+key\r\n+val\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let decoded_again = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, decoded_again);
}
1703
/// parse -> serialize -> parse for a two-element push frame.
#[test]
fn test_roundtrip_push() {
    let wire = Bytes::from(">2\r\n+msg\r\n+data\r\n");

    let decoded = parse_frame(wire.clone()).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    assert_eq!(wire, encoded);

    let decoded_again = parse_frame(encoded).unwrap().0;
    assert_eq!(decoded, decoded_again);
}
1714
/// Every streaming header (and the `.` terminator) must parse to its marker
/// frame, serialize back to the same bytes, and parse again to the same frame.
#[test]
fn test_roundtrip_streaming_headers() {
    // (expected frame, wire encoding)
    let table = [
        (Frame::StreamedStringHeader, "$?\r\n"),
        (Frame::StreamedBlobErrorHeader, "!?\r\n"),
        (Frame::StreamedVerbatimStringHeader, "=?\r\n"),
        (Frame::StreamedArrayHeader, "*?\r\n"),
        (Frame::StreamedSetHeader, "~?\r\n"),
        (Frame::StreamedMapHeader, "%?\r\n"),
        (Frame::StreamedAttributeHeader, "|?\r\n"),
        (Frame::StreamedPushHeader, ">?\r\n"),
        (Frame::StreamTerminator, ".\r\n"),
    ];

    for (expected, wire_text) in table {
        let wire = Bytes::from(wire_text);

        let decoded = parse_frame(wire.clone()).unwrap().0;
        assert_eq!(decoded, expected);

        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let decoded_again = parse_frame(encoded).unwrap().0;
        assert_eq!(decoded, decoded_again);
    }
}
1741
/// Round-trips individual `;<len>` stream chunks, including the empty chunk.
/// Each chunk must consume its whole input and serialize to identical bytes.
#[test]
fn test_roundtrip_streaming_chunks() {
    let payloads = ["Hell", "o wor", "d", "", "Hello World"];

    for payload in payloads {
        // Wire form is `;<byte length>\r\n<payload>\r\n`.
        let wire = Bytes::from(format!(";{}\r\n{}\r\n", payload.len(), payload));
        let expected = Frame::StreamedStringChunk(Bytes::from(payload));

        let (decoded, leftover) = parse_frame(wire.clone()).unwrap();
        assert_eq!(decoded, expected);
        assert!(leftover.is_empty());

        let encoded = frame_to_bytes(&decoded);
        assert_eq!(wire, encoded);

        let decoded_again = parse_frame(encoded).unwrap().0;
        assert_eq!(decoded, decoded_again);
    }
}
1774
/// Exercises `parse_frame` on truncated, malformed and binary `;` chunks.
///
/// The final binary-payload check compares the whole frame, so the test fails
/// if the parser returns anything other than a `StreamedStringChunk` carrying
/// exactly the input bytes.
#[test]
fn test_streaming_chunks_edge_cases() {
    // Payload is shorter than the declared length.
    let data = Bytes::from(";4\r\nHel");
    assert!(matches!(parse_frame(data), Err(ParseError::Incomplete)));

    // Payload is complete but the trailing CRLF is absent.
    let data = Bytes::from(";4\r\nHell");
    assert!(matches!(parse_frame(data), Err(ParseError::Incomplete)));

    // Length field is not a number.
    let data = Bytes::from(";abc\r\ndata\r\n");
    assert!(matches!(parse_frame(data), Err(ParseError::BadLength)));

    // Length field is negative.
    let data = Bytes::from(";-1\r\ndata\r\n");
    assert!(matches!(parse_frame(data), Err(ParseError::BadLength)));

    // Declares 5 bytes, but only 4 payload bytes precede the CRLF.
    let data = Bytes::from(";5\r\nHell\r\n");
    assert!(matches!(parse_frame(data), Err(ParseError::Incomplete)));

    // Zero-length chunk without the CRLF that must follow the (empty) payload.
    let data = Bytes::from(";0\r\n");
    assert!(matches!(parse_frame(data), Err(ParseError::Incomplete)));

    // Arbitrary bytes (including NUL and 0xFF) pass through unchanged.
    let binary_data = b"\x00\x01\x02\x03\xFF";
    let mut chunk_data = Vec::new();
    chunk_data.extend_from_slice(b";5\r\n");
    chunk_data.extend_from_slice(binary_data);
    chunk_data.extend_from_slice(b"\r\n");

    let (frame, _) = parse_frame(Bytes::from(chunk_data)).unwrap();
    assert_eq!(
        frame,
        Frame::StreamedStringChunk(Bytes::from_static(binary_data))
    );
}
1821
/// Serializes complete streamed frames and feeds the bytes back through
/// `parse_streaming_sequence`, expecting the original frame each time. For the
/// two streamed-string cases the exact wire encoding is pinned as well.
#[test]
fn test_roundtrip_streaming_sequences() {
    let simple = |text: &'static str| Frame::SimpleString(Bytes::from(text));

    // (frame to round-trip, exact wire form to expect when one is pinned)
    let cases: Vec<(Frame, Option<&'static str>)> = vec![
        (
            Frame::StreamedString(vec![
                Bytes::from("Hell"),
                Bytes::from("o wor"),
                Bytes::from("ld"),
            ]),
            Some("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;2\r\nld\r\n;0\r\n\r\n"),
        ),
        (
            Frame::StreamedArray(vec![
                simple("hello"),
                Frame::Integer(42),
                Frame::Boolean(true),
            ]),
            None,
        ),
        (
            Frame::StreamedMap(vec![
                (simple("key1"), simple("val1")),
                (simple("key2"), Frame::Integer(123)),
            ]),
            None,
        ),
        (Frame::StreamedString(vec![]), Some("$?\r\n;0\r\n\r\n")),
        (
            Frame::StreamedSet(vec![
                simple("apple"),
                simple("banana"),
                Frame::Integer(42),
            ]),
            None,
        ),
        (
            Frame::StreamedAttribute(vec![
                (simple("trace-id"), simple("abc123")),
                (simple("span-id"), simple("def456")),
            ]),
            None,
        ),
        (
            Frame::StreamedPush(vec![
                simple("pubsub"),
                simple("channel1"),
                simple("message data"),
            ]),
            None,
        ),
        (Frame::StreamedArray(vec![]), None),
        (Frame::StreamedSet(vec![]), None),
    ];

    for (original, pinned_wire) in cases {
        let encoded = frame_to_bytes(&original);
        if let Some(text) = pinned_wire {
            assert_eq!(encoded, Bytes::from(text));
        }

        let (decoded, _) = parse_streaming_sequence(encoded).unwrap();
        assert_eq!(decoded, original);
    }
}
1916
/// Edge cases for `parse_streaming_sequence` (plus one oversized-chunk check
/// on `parse_frame`): truncated streams, bad lengths, nested collections,
/// unpaired map entries, non-streaming input and corrupted terminators.
///
/// The two streamed-array checks use `match` with a panicking fallback, so
/// the test fails if the result is not a `StreamedArray`.
#[test]
fn test_streaming_sequences_edge_cases() {
    // Streamed string with data chunks but no closing `;0` chunk.
    let data = Bytes::from("$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Chunk length is not a number.
    let data = Bytes::from("$?\r\n;abc\r\nHell\r\n;0\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::BadLength)));

    // Streamed array with elements but no `.` terminator.
    let data = Bytes::from("*?\r\n+hello\r\n:42\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // A fixed-size array nested inside a streamed array counts as one element.
    let data = Bytes::from("*?\r\n+hello\r\n*2\r\n:1\r\n:2\r\n.\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    match frame {
        Frame::StreamedArray(items) => {
            assert_eq!(items.len(), 2);
            assert!(matches!(items[0], Frame::SimpleString(_)));
            assert!(matches!(items[1], Frame::Array(_)));
        }
        other => panic!("expected StreamedArray, got {other:?}"),
    }

    // Header immediately followed by the terminator: an empty streamed array.
    let data = Bytes::from("*?\r\n.\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    match frame {
        Frame::StreamedArray(items) => assert!(items.is_empty()),
        other => panic!("expected StreamedArray, got {other:?}"),
    }

    // Streamed map containing a key (`+orphan`) with no value before `.`.
    let data = Bytes::from("%?\r\n+key1\r\n+val1\r\n+orphan\r\n.\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Non-streaming input is parsed as the plain frame it is.
    let data = Bytes::from("+simple\r\n");
    let (frame, _) = parse_streaming_sequence(data).unwrap();
    assert!(matches!(frame, Frame::SimpleString(_)));

    // A chunk declaring a huge length must be rejected; either error is fine.
    let data = Bytes::from(";999999999999999999\r\ndata\r\n");
    let result = parse_frame(data);
    match &result {
        Err(ParseError::BadLength) | Err(ParseError::Incomplete) => {}
        Err(e) => panic!("Got unexpected error type: {e:?}"),
        Ok(_) => panic!("Large chunk size should fail"),
    }

    // Streamed string whose body is a simple string instead of a `;` chunk.
    let data = Bytes::from("$?\r\n+invalid\r\n;0\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::InvalidFormat)));

    // The would-be terminator line carries extra text after the `.`.
    let data = Bytes::from("*?\r\n+hello\r\n.corrupted\r\n");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Empty input.
    let data = Bytes::new();
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));

    // Input ends in the middle of an element's payload.
    let data = Bytes::from("*?\r\n+hello\r\n$5\r\nwo");
    let result = parse_streaming_sequence(data);
    assert!(matches!(result, Err(ParseError::Incomplete)));
}
1998
/// Round-trips an array containing a string, a map (whose second value is a
/// set) and an attribute. Only frame equality after re-parsing is checked;
/// the serialized bytes are not compared with the input.
#[test]
fn test_roundtrip_nested_structures() {
    let wire = Bytes::from(
        "*3\r\n+hello\r\n%2\r\n+key1\r\n:123\r\n+key2\r\n~1\r\n+item\r\n|1\r\n+meta\r\n+data\r\n",
    );

    let decoded = parse_frame(wire).unwrap().0;
    let encoded = frame_to_bytes(&decoded);
    let decoded_again = parse_frame(encoded).unwrap().0;

    assert_eq!(decoded, decoded_again);
}
2011
/// `$0` is only complete once the CRLF after the empty payload has arrived;
/// anything else in that position is a format error.
#[test]
fn test_zero_length_bulk_string_requires_trailing_crlf() {
    // Well-formed: empty payload, CRLF, then unrelated trailing bytes.
    let (parsed, leftover) = parse_frame(Bytes::from("$0\r\n\r\nTAIL")).unwrap();
    assert_eq!(parsed, Frame::BulkString(Some(Bytes::new())));
    assert_eq!(leftover, Bytes::from("TAIL"));

    // Missing CRLF, half a CRLF, and two non-CRLF bytes respectively.
    let rejected = [
        ("$0\r\n", ParseError::Incomplete),
        ("$0\r\n\r", ParseError::Incomplete),
        ("$0\r\nXY", ParseError::InvalidFormat),
    ];
    for (wire, expected_error) in rejected {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected_error));
    }
}
2032
/// `;0` follows the same rule as `$0`: the CRLF after the empty payload is
/// required, and other bytes in its place are a format error.
#[test]
fn test_zero_length_streamed_chunk_requires_trailing_crlf() {
    let (parsed, leftover) = parse_frame(Bytes::from(";0\r\n\r\nTAIL")).unwrap();
    assert_eq!(parsed, Frame::StreamedStringChunk(Bytes::new()));
    assert_eq!(leftover, Bytes::from("TAIL"));

    let rejected = [
        (";0\r\n", ParseError::Incomplete),
        (";0\r\nXY", ParseError::InvalidFormat),
    ];
    for (wire, expected_error) in rejected {
        assert_eq!(parse_frame(Bytes::from(wire)), Err(expected_error));
    }
}
2049
/// One past `i64::MAX` is reported as `Overflow`, while both `i64` extremes
/// themselves parse successfully.
#[test]
fn test_integer_overflow_returns_overflow_error() {
    assert_eq!(
        parse_frame(Bytes::from(":9223372036854775808\r\n")),
        Err(ParseError::Overflow)
    );

    let in_range = [
        (":9223372036854775807\r\n", i64::MAX),
        (":-9223372036854775808\r\n", i64::MIN),
    ];
    for (wire, expected_value) in in_range {
        let (parsed, _) = parse_frame(Bytes::from(wire)).unwrap();
        assert_eq!(parsed, Frame::Integer(expected_value));
    }
}
2066
/// `Parser::next_frame` surfaces a hard parse error (here an unknown tag
/// byte) as `Err` rather than reporting "no frame yet".
#[test]
fn test_parser_propagates_errors() {
    let mut parser = Parser::new();
    parser.feed(Bytes::from("XINVALID\r\n"));

    assert_eq!(parser.next_frame(), Err(ParseError::InvalidTag(b'X')));
}
2075
/// A partial frame is not an error for `Parser`: `next_frame` returns
/// `Ok(None)` for it.
#[test]
fn test_parser_returns_ok_none_for_incomplete() {
    let mut parser = Parser::new();
    parser.feed(Bytes::from("+HELL"));

    assert_eq!(parser.next_frame(), Ok(None));
}
2082}