1use alloc::string::ToString;
25use alloc::vec::Vec;
26
27use bytes::{BufMut, Bytes, BytesMut};
28
29use crate::ParseError;
30
31const MAX_COLLECTION_SIZE: usize = 10_000_000;
33
34const MAX_BULK_STRING_SIZE: usize = 512 * 1024 * 1024;
36
37#[derive(Debug, Clone, PartialEq)]
39pub enum Frame {
40 SimpleString(Bytes),
42 Error(Bytes),
44 Integer(i64),
46 BulkString(Option<Bytes>),
48 Array(Option<Vec<Frame>>),
50}
51
52impl Frame {
53 pub fn as_bytes(&self) -> Option<&Bytes> {
57 match self {
58 Frame::SimpleString(b) | Frame::Error(b) => Some(b),
59 Frame::BulkString(opt) => opt.as_ref(),
60 _ => None,
61 }
62 }
63
64 pub fn as_str(&self) -> Option<&str> {
67 self.as_bytes().and_then(|b| core::str::from_utf8(b).ok())
68 }
69
70 pub fn as_integer(&self) -> Option<i64> {
72 match self {
73 Frame::Integer(v) => Some(*v),
74 _ => None,
75 }
76 }
77
78 pub fn as_array(&self) -> Option<&[Frame]> {
82 match self {
83 Frame::Array(Some(items)) => Some(items),
84 _ => None,
85 }
86 }
87
88 pub fn into_array(self) -> Result<Vec<Frame>, Frame> {
92 match self {
93 Frame::Array(Some(items)) => Ok(items),
94 other => Err(other),
95 }
96 }
97
98 pub fn into_bulk_string(self) -> Result<Bytes, Frame> {
102 match self {
103 Frame::BulkString(Some(b)) => Ok(b),
104 other => Err(other),
105 }
106 }
107
108 pub fn is_null(&self) -> bool {
110 matches!(self, Frame::BulkString(None) | Frame::Array(None))
111 }
112
113 pub fn is_error(&self) -> bool {
115 matches!(self, Frame::Error(_))
116 }
117}
118
119pub fn parse_frame(input: Bytes) -> Result<(Frame, Bytes), ParseError> {
139 let (frame, consumed) = parse_frame_inner(&input, 0)?;
140 Ok((frame, input.slice(consumed..)))
141}
142
143pub(crate) fn parse_frame_inner(input: &Bytes, pos: usize) -> Result<(Frame, usize), ParseError> {
146 let buf = input.as_ref();
147 if pos >= buf.len() {
148 return Err(ParseError::Incomplete);
149 }
150
151 let tag = buf[pos];
152
153 match tag {
154 b'+' => {
155 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
156 Ok((
157 Frame::SimpleString(input.slice(pos + 1..line_end)),
158 after_crlf,
159 ))
160 }
161 b'-' => {
162 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
163 Ok((Frame::Error(input.slice(pos + 1..line_end)), after_crlf))
164 }
165 b':' => {
166 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
167 let v = parse_i64(&buf[pos + 1..line_end])?;
168 Ok((Frame::Integer(v), after_crlf))
169 }
170 b'$' => {
171 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
172 let len_bytes = &buf[pos + 1..line_end];
173 if len_bytes == b"-1" {
175 return Ok((Frame::BulkString(None), after_crlf));
176 }
177 let len = parse_usize(len_bytes)?;
178 if len > MAX_BULK_STRING_SIZE {
179 return Err(ParseError::BadLength);
180 }
181 if len == 0 {
182 if after_crlf + 1 >= buf.len() {
183 return Err(ParseError::Incomplete);
184 }
185 if buf[after_crlf] == b'\r' && buf[after_crlf + 1] == b'\n' {
186 return Ok((Frame::BulkString(Some(Bytes::new())), after_crlf + 2));
187 } else {
188 return Err(ParseError::InvalidFormat);
189 }
190 }
191 let data_start = after_crlf;
192 let data_end = data_start.checked_add(len).ok_or(ParseError::BadLength)?;
193 if data_end + 1 >= buf.len() {
194 return Err(ParseError::Incomplete);
195 }
196 if buf[data_end] != b'\r' || buf[data_end + 1] != b'\n' {
197 return Err(ParseError::InvalidFormat);
198 }
199 Ok((
200 Frame::BulkString(Some(input.slice(data_start..data_end))),
201 data_end + 2,
202 ))
203 }
204 b'*' => {
205 let (line_end, after_crlf) = find_crlf(buf, pos + 1)?;
206 let len_bytes = &buf[pos + 1..line_end];
207 if len_bytes == b"-1" {
209 return Ok((Frame::Array(None), after_crlf));
210 }
211 let count = parse_count(len_bytes)?;
212 if count == 0 {
213 return Ok((Frame::Array(Some(Vec::new())), after_crlf));
214 }
215 let mut cursor = after_crlf;
216 let mut items = Vec::with_capacity(count);
217 for _ in 0..count {
218 let (item, next) = parse_frame_inner(input, cursor)?;
219 items.push(item);
220 cursor = next;
221 }
222 Ok((Frame::Array(Some(items)), cursor))
223 }
224 _ => Err(ParseError::InvalidTag(tag)),
225 }
226}
227
228#[cfg(feature = "unsafe-internals")]
229#[path = "resp2_unchecked.rs"]
230mod unchecked;
231#[cfg(feature = "unsafe-internals")]
232pub use unchecked::parse_frame_unchecked;
233
234#[cfg(feature = "codec")]
235#[path = "resp2_codec.rs"]
236mod codec_impl;
237#[cfg(feature = "codec")]
238pub use codec_impl::Codec;
239
240pub fn frame_to_bytes(frame: &Frame) -> Bytes {
252 let mut buf = BytesMut::new();
253 serialize_frame(frame, &mut buf);
254 buf.freeze()
255}
256
257fn serialize_frame(frame: &Frame, buf: &mut BytesMut) {
258 match frame {
259 Frame::SimpleString(s) => {
260 buf.put_u8(b'+');
261 buf.extend_from_slice(s);
262 buf.extend_from_slice(b"\r\n");
263 }
264 Frame::Error(s) => {
265 buf.put_u8(b'-');
266 buf.extend_from_slice(s);
267 buf.extend_from_slice(b"\r\n");
268 }
269 Frame::Integer(i) => {
270 buf.put_u8(b':');
271 buf.extend_from_slice(i.to_string().as_bytes());
272 buf.extend_from_slice(b"\r\n");
273 }
274 Frame::BulkString(opt) => {
275 buf.put_u8(b'$');
276 match opt {
277 Some(data) => {
278 buf.extend_from_slice(data.len().to_string().as_bytes());
279 buf.extend_from_slice(b"\r\n");
280 buf.extend_from_slice(data);
281 buf.extend_from_slice(b"\r\n");
282 }
283 None => buf.extend_from_slice(b"-1\r\n"),
284 }
285 }
286 Frame::Array(opt) => {
287 buf.put_u8(b'*');
288 match opt {
289 Some(items) => {
290 buf.extend_from_slice(items.len().to_string().as_bytes());
291 buf.extend_from_slice(b"\r\n");
292 for item in items {
293 serialize_frame(item, buf);
294 }
295 }
296 None => buf.extend_from_slice(b"-1\r\n"),
297 }
298 }
299 }
300}
301
302#[derive(Default, Debug)]
321pub struct Parser {
322 buffer: BytesMut,
323}
324
325impl Parser {
326 pub fn new() -> Self {
328 Self {
329 buffer: BytesMut::new(),
330 }
331 }
332
333 pub fn feed(&mut self, data: Bytes) {
335 self.buffer.extend_from_slice(&data);
336 }
337
338 pub fn next_frame(&mut self) -> Result<Option<Frame>, ParseError> {
343 if self.buffer.is_empty() {
344 return Ok(None);
345 }
346
347 let bytes = self.buffer.split().freeze();
348
349 match parse_frame_inner(&bytes, 0) {
350 Ok((frame, consumed)) => {
351 if consumed < bytes.len() {
352 self.buffer.unsplit(BytesMut::from(&bytes[consumed..]));
353 }
354 Ok(Some(frame))
355 }
356 Err(ParseError::Incomplete) => {
357 self.buffer.unsplit(bytes.into());
358 Ok(None)
359 }
360 Err(e) => {
361 Err(e)
364 }
365 }
366 }
367
368 pub fn buffered_bytes(&self) -> usize {
370 self.buffer.len()
371 }
372
373 pub fn clear(&mut self) {
375 self.buffer.clear();
376 }
377}
378
379#[inline]
382fn find_crlf(buf: &[u8], from: usize) -> Result<(usize, usize), ParseError> {
383 let mut i = from;
384 let len = buf.len();
385 while i + 1 < len {
386 if buf[i] == b'\r' && buf[i + 1] == b'\n' {
387 return Ok((i, i + 2));
388 }
389 i += 1;
390 }
391 Err(ParseError::Incomplete)
392}
393
394#[inline]
396fn parse_usize(buf: &[u8]) -> Result<usize, ParseError> {
397 if buf.is_empty() {
398 return Err(ParseError::BadLength);
399 }
400 let mut v: usize = 0;
401 for &b in buf {
402 if !b.is_ascii_digit() {
403 return Err(ParseError::BadLength);
404 }
405 v = v.checked_mul(10).ok_or(ParseError::BadLength)?;
406 v = v
407 .checked_add((b - b'0') as usize)
408 .ok_or(ParseError::BadLength)?;
409 }
410 Ok(v)
411}
412
413#[inline]
415fn parse_count(buf: &[u8]) -> Result<usize, ParseError> {
416 let count = parse_usize(buf)?;
417 if count > MAX_COLLECTION_SIZE {
418 return Err(ParseError::BadLength);
419 }
420 Ok(count)
421}
422
423#[inline]
425fn parse_i64(buf: &[u8]) -> Result<i64, ParseError> {
426 if buf.is_empty() {
427 return Err(ParseError::InvalidFormat);
428 }
429 let (neg, digits) = if buf[0] == b'-' {
430 (true, &buf[1..])
431 } else {
432 (false, buf)
433 };
434 if digits.is_empty() {
435 return Err(ParseError::InvalidFormat);
436 }
437 let mut v: i64 = 0;
438 for (i, &d) in digits.iter().enumerate() {
439 if !d.is_ascii_digit() {
440 return Err(ParseError::InvalidFormat);
441 }
442 let digit = (d - b'0') as i64;
443 if neg && v == i64::MAX / 10 && digit == 8 && i == digits.len() - 1 {
444 return Ok(i64::MIN);
445 }
446 if v > i64::MAX / 10 || (v == i64::MAX / 10 && digit > i64::MAX % 10) {
447 return Err(ParseError::Overflow);
448 }
449 v = v * 10 + digit;
450 }
451 if neg { Ok(-v) } else { Ok(v) }
452}
453
454#[cfg(test)]
455mod tests {
456 use super::*;
457
458 #[test]
459 fn simple_string() {
460 let (frame, rest) = parse_frame(Bytes::from("+OK\r\nrest")).unwrap();
461 assert_eq!(frame, Frame::SimpleString(Bytes::from("OK")));
462 assert_eq!(rest, Bytes::from("rest"));
463 }
464
465 #[test]
466 fn error() {
467 let (frame, _) = parse_frame(Bytes::from("-ERR fail\r\n")).unwrap();
468 assert_eq!(frame, Frame::Error(Bytes::from("ERR fail")));
469 }
470
471 #[test]
472 fn integer() {
473 let (frame, _) = parse_frame(Bytes::from(":42\r\n")).unwrap();
474 assert_eq!(frame, Frame::Integer(42));
475
476 let (frame, _) = parse_frame(Bytes::from(":-123\r\n")).unwrap();
477 assert_eq!(frame, Frame::Integer(-123));
478 }
479
480 #[test]
481 fn bulk_string() {
482 let (frame, rest) = parse_frame(Bytes::from("$5\r\nhello\r\nX")).unwrap();
483 assert_eq!(frame, Frame::BulkString(Some(Bytes::from("hello"))));
484 assert_eq!(rest, Bytes::from("X"));
485 }
486
487 #[test]
488 fn null_bulk_string() {
489 let (frame, _) = parse_frame(Bytes::from("$-1\r\n")).unwrap();
490 assert_eq!(frame, Frame::BulkString(None));
491 }
492
493 #[test]
494 fn empty_bulk_string() {
495 let (frame, rest) = parse_frame(Bytes::from("$0\r\n\r\nX")).unwrap();
496 assert_eq!(frame, Frame::BulkString(Some(Bytes::new())));
497 assert_eq!(rest, Bytes::from("X"));
498 }
499
500 #[test]
501 fn array() {
502 let input = Bytes::from("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n");
503 let (frame, _) = parse_frame(input).unwrap();
504 assert_eq!(
505 frame,
506 Frame::Array(Some(vec![
507 Frame::BulkString(Some(Bytes::from("foo"))),
508 Frame::BulkString(Some(Bytes::from("bar"))),
509 ]))
510 );
511 }
512
513 #[test]
514 fn null_array() {
515 let (frame, _) = parse_frame(Bytes::from("*-1\r\n")).unwrap();
516 assert_eq!(frame, Frame::Array(None));
517 }
518
519 #[test]
520 fn empty_array() {
521 let (frame, _) = parse_frame(Bytes::from("*0\r\n")).unwrap();
522 assert_eq!(frame, Frame::Array(Some(vec![])));
523 }
524
525 #[test]
526 fn nested_array() {
527 let input = Bytes::from("*2\r\n*1\r\n:1\r\n+OK\r\n");
528 let (frame, _) = parse_frame(input).unwrap();
529 assert_eq!(
530 frame,
531 Frame::Array(Some(vec![
532 Frame::Array(Some(vec![Frame::Integer(1)])),
533 Frame::SimpleString(Bytes::from("OK")),
534 ]))
535 );
536 }
537
538 #[test]
539 fn incomplete() {
540 assert_eq!(parse_frame(Bytes::new()), Err(ParseError::Incomplete));
541 assert_eq!(
542 parse_frame(Bytes::from("+OK\r")),
543 Err(ParseError::Incomplete)
544 );
545 assert_eq!(
546 parse_frame(Bytes::from("$5\r\nhel")),
547 Err(ParseError::Incomplete)
548 );
549 }
550
551 #[test]
552 fn invalid_tag() {
553 assert_eq!(
554 parse_frame(Bytes::from("X\r\n")),
555 Err(ParseError::InvalidTag(b'X'))
556 );
557 }
558
559 #[test]
560 fn roundtrip() {
561 let frames = vec![
562 Frame::SimpleString(Bytes::from("OK")),
563 Frame::Error(Bytes::from("ERR bad")),
564 Frame::Integer(42),
565 Frame::BulkString(Some(Bytes::from("hello"))),
566 Frame::BulkString(None),
567 Frame::Array(Some(vec![
568 Frame::Integer(1),
569 Frame::BulkString(Some(Bytes::from("two"))),
570 ])),
571 Frame::Array(None),
572 ];
573 for frame in &frames {
574 let bytes = frame_to_bytes(frame);
575 let (parsed, rest) = parse_frame(bytes).unwrap();
576 assert_eq!(&parsed, frame);
577 assert!(rest.is_empty());
578 }
579 }
580
581 #[test]
582 fn streaming_parser() {
583 let mut parser = Parser::new();
584 parser.feed(Bytes::from("+HEL"));
585 assert!(parser.next_frame().unwrap().is_none());
586
587 parser.feed(Bytes::from("LO\r\n:42\r\n"));
588 let f1 = parser.next_frame().unwrap().unwrap();
589 assert_eq!(f1, Frame::SimpleString(Bytes::from("HELLO")));
590
591 let f2 = parser.next_frame().unwrap().unwrap();
592 assert_eq!(f2, Frame::Integer(42));
593
594 assert!(parser.next_frame().unwrap().is_none());
595 }
596
597 #[test]
598 fn chained_frames() {
599 let input = Bytes::from("+OK\r\n:1\r\n$3\r\nfoo\r\n");
600 let (f1, rest) = parse_frame(input).unwrap();
601 assert_eq!(f1, Frame::SimpleString(Bytes::from("OK")));
602 let (f2, rest) = parse_frame(rest).unwrap();
603 assert_eq!(f2, Frame::Integer(1));
604 let (f3, rest) = parse_frame(rest).unwrap();
605 assert_eq!(f3, Frame::BulkString(Some(Bytes::from("foo"))));
606 assert!(rest.is_empty());
607 }
608
609 #[test]
610 fn binary_bulk_string() {
611 let mut data = Vec::new();
612 data.extend_from_slice(b"$5\r\n");
613 data.extend_from_slice(&[0x00, 0x01, 0xFF, 0xFE, 0x42]);
614 data.extend_from_slice(b"\r\n");
615 let (frame, _) = parse_frame(Bytes::from(data)).unwrap();
616 match frame {
617 Frame::BulkString(Some(b)) => {
618 assert_eq!(b.as_ref(), &[0x00, 0x01, 0xFF, 0xFE, 0x42]);
619 }
620 _ => panic!("expected bulk string"),
621 }
622 }
623
624 #[test]
625 fn rejects_resp3_types() {
626 assert!(parse_frame(Bytes::from("_\r\n")).is_err()); assert!(parse_frame(Bytes::from(",3.14\r\n")).is_err()); assert!(parse_frame(Bytes::from("#t\r\n")).is_err()); assert!(parse_frame(Bytes::from("(123\r\n")).is_err()); }
632
633 #[test]
634 fn integer_overflow() {
635 assert_eq!(
637 parse_frame(Bytes::from(":9223372036854775808\r\n")),
638 Err(ParseError::Overflow)
639 );
640
641 let (frame, _) = parse_frame(Bytes::from(":9223372036854775807\r\n")).unwrap();
643 assert_eq!(frame, Frame::Integer(i64::MAX));
644
645 let (frame, _) = parse_frame(Bytes::from(":-9223372036854775808\r\n")).unwrap();
647 assert_eq!(frame, Frame::Integer(i64::MIN));
648
649 assert!(parse_frame(Bytes::from(":-9223372036854775809\r\n")).is_err());
651 }
652
653 #[test]
654 fn zero_length_bulk_edge_cases() {
655 assert_eq!(
657 parse_frame(Bytes::from("$0\r\n")),
658 Err(ParseError::Incomplete)
659 );
660
661 assert_eq!(
663 parse_frame(Bytes::from("$0\r\n\r")),
664 Err(ParseError::Incomplete)
665 );
666
667 assert_eq!(
669 parse_frame(Bytes::from("$0\r\nXY")),
670 Err(ParseError::InvalidFormat)
671 );
672 }
673
674 #[test]
675 fn nonempty_bulk_malformed_terminator() {
676 assert_eq!(
678 parse_frame(Bytes::from("$3\r\nfoo")),
679 Err(ParseError::Incomplete)
680 );
681
682 assert_eq!(
684 parse_frame(Bytes::from("$3\r\nfooX")),
685 Err(ParseError::Incomplete)
686 );
687
688 assert_eq!(
690 parse_frame(Bytes::from("$3\r\nfooXY")),
691 Err(ParseError::InvalidFormat)
692 );
693 }
694
695 #[test]
696 fn array_size_limit() {
697 assert_eq!(
699 parse_frame(Bytes::from("*10000001\r\n")),
700 Err(ParseError::BadLength)
701 );
702
703 assert_eq!(
705 parse_frame(Bytes::from("*10000000\r\n")),
706 Err(ParseError::Incomplete)
707 );
708 }
709
710 #[test]
711 fn bulk_string_size_limit() {
712 assert_eq!(
714 parse_frame(Bytes::from("$536870913\r\n")),
715 Err(ParseError::BadLength)
716 );
717 }
718
719 #[test]
720 fn streaming_parser_clears_buffer_on_error() {
721 let mut parser = Parser::new();
722 parser.feed(Bytes::from("X\r\n"));
723 assert_eq!(parser.next_frame(), Err(ParseError::InvalidTag(b'X')));
724 assert_eq!(parser.buffered_bytes(), 0);
725 }
726
727 #[test]
728 fn streaming_parser_recovers_after_error() {
729 let mut parser = Parser::new();
730 parser.feed(Bytes::from("X\r\n"));
732 assert!(parser.next_frame().is_err());
733 assert_eq!(parser.buffered_bytes(), 0);
734
735 parser.feed(Bytes::from("+OK\r\n"));
737 let frame = parser.next_frame().unwrap().unwrap();
738 assert_eq!(frame, Frame::SimpleString(Bytes::from("OK")));
739 }
740
741 #[test]
742 fn frame_as_bytes() {
743 assert_eq!(
744 Frame::SimpleString(Bytes::from("OK")).as_bytes(),
745 Some(&Bytes::from("OK"))
746 );
747 assert_eq!(
748 Frame::Error(Bytes::from("ERR")).as_bytes(),
749 Some(&Bytes::from("ERR"))
750 );
751 assert_eq!(
752 Frame::BulkString(Some(Bytes::from("data"))).as_bytes(),
753 Some(&Bytes::from("data"))
754 );
755 assert_eq!(Frame::BulkString(None).as_bytes(), None);
756 assert_eq!(Frame::Integer(42).as_bytes(), None);
757 }
758
759 #[test]
760 fn frame_as_str() {
761 assert_eq!(Frame::SimpleString(Bytes::from("OK")).as_str(), Some("OK"));
762 assert_eq!(
764 Frame::BulkString(Some(Bytes::from_static(&[0xFF]))).as_str(),
765 None
766 );
767 }
768
769 #[test]
770 fn frame_as_integer() {
771 assert_eq!(Frame::Integer(42).as_integer(), Some(42));
772 assert_eq!(Frame::SimpleString(Bytes::from("42")).as_integer(), None);
773 }
774
775 #[test]
776 fn frame_as_array() {
777 let arr = Frame::Array(Some(vec![Frame::Integer(1)]));
778 assert_eq!(arr.as_array(), Some([Frame::Integer(1)].as_slice()));
779 assert_eq!(Frame::Array(None).as_array(), None);
780 assert_eq!(Frame::Integer(1).as_array(), None);
781 }
782
783 #[test]
784 fn frame_into_array() {
785 let arr = Frame::Array(Some(vec![Frame::Integer(1)]));
786 assert_eq!(arr.into_array(), Ok(vec![Frame::Integer(1)]));
787 assert!(Frame::Array(None).into_array().is_err());
788 assert!(Frame::Integer(1).into_array().is_err());
789 }
790
791 #[test]
792 fn frame_into_bulk_string() {
793 let bs = Frame::BulkString(Some(Bytes::from("data")));
794 assert_eq!(bs.into_bulk_string(), Ok(Bytes::from("data")));
795 assert!(Frame::BulkString(None).into_bulk_string().is_err());
796 }
797
798 #[test]
799 fn frame_is_null() {
800 assert!(Frame::BulkString(None).is_null());
801 assert!(Frame::Array(None).is_null());
802 assert!(!Frame::BulkString(Some(Bytes::new())).is_null());
803 assert!(!Frame::Integer(0).is_null());
804 }
805
806 #[test]
807 fn frame_is_error() {
808 assert!(Frame::Error(Bytes::from("ERR")).is_error());
809 assert!(!Frame::SimpleString(Bytes::from("OK")).is_error());
810 }
811}