1use crate::resp3::utils as resp3_utils;
2use crate::types::{Redirection, RedisProtocolError, RedisProtocolErrorKind};
3use crate::utils;
4use alloc::collections::VecDeque;
5use alloc::format;
6use alloc::string::{String, ToString};
7use alloc::vec::Vec;
8use bytes::Bytes;
9use bytes_utils::Str;
10use core::convert::TryFrom;
11use core::hash::{Hash, Hasher};
12use core::mem;
13use core::str;
14
15#[cfg(feature = "std")]
16use std::collections::{HashMap, HashSet};
17
18#[cfg(feature = "hashbrown")]
19use hashbrown::{HashMap, HashSet};
20
21#[cfg(feature = "index-map")]
22use indexmap::{IndexMap, IndexSet};
23
24#[cfg(test)]
25use std::convert::TryInto;
26
/// Byte prefix of a simple string frame.
pub const SIMPLE_STRING_BYTE: u8 = b'+';
/// Byte prefix of a simple error frame.
pub const SIMPLE_ERROR_BYTE: u8 = b'-';
/// Byte prefix of a number frame.
pub const NUMBER_BYTE: u8 = b':';
/// Byte prefix of a double frame.
pub const DOUBLE_BYTE: u8 = b',';
/// Byte prefix of a blob string frame.
pub const BLOB_STRING_BYTE: u8 = b'$';
/// Byte prefix of a blob error frame.
pub const BLOB_ERROR_BYTE: u8 = b'!';
/// Byte prefix of a boolean frame.
pub const BOOLEAN_BYTE: u8 = b'#';
/// Byte prefix of a verbatim string frame.
pub const VERBATIM_STRING_BYTE: u8 = b'=';
/// Byte prefix of a big number frame.
pub const BIG_NUMBER_BYTE: u8 = b'(';
/// Byte prefix of an array frame.
pub const ARRAY_BYTE: u8 = b'*';
/// Byte prefix of a map frame.
pub const MAP_BYTE: u8 = b'%';
/// Byte prefix of a set frame.
pub const SET_BYTE: u8 = b'~';
/// Byte prefix of an attribute frame.
pub const ATTRIBUTE_BYTE: u8 = b'|';
/// Byte prefix of a push frame.
pub const PUSH_BYTE: u8 = b'>';
/// Byte prefix of a null frame.
pub const NULL_BYTE: u8 = b'_';
/// Byte associated with the format tag of a verbatim string (see the RESP3 specification).
pub const VERBATIM_FORMAT_BYTE: u8 = b':';
/// Byte prefix of a chunk within a streamed string.
pub const CHUNKED_STRING_BYTE: u8 = b';';
/// Byte that terminates a streamed aggregate type.
pub const END_STREAM_BYTE: u8 = b'.';

/// Byte associated with streamed lengths (see the RESP3 specification).
pub const STREAMED_LENGTH_BYTE: u8 = b'?';
/// Terminating bytes of a streamed string: a zero-length chunk.
pub const END_STREAM_STRING_BYTES: &str = ";0\r\n";
/// Terminating bytes of a streamed aggregate type.
pub const END_STREAM_AGGREGATE_BYTES: &str = ".\r\n";

/// The complete encoding of a null frame.
pub const NULL: &str = "_\r\n";

/// Textual form of positive infinity.
pub const INFINITY: &str = "inf";
/// Textual form of negative infinity.
pub const NEG_INFINITY: &str = "-inf";

/// The `HELLO` command name.
pub const HELLO: &str = "HELLO";
/// The complete encoding of a `true` boolean frame.
pub const BOOL_TRUE_BYTES: &str = "#t\r\n";
/// The complete encoding of a `false` boolean frame.
pub const BOOL_FALSE_BYTES: &str = "#f\r\n";
/// A single space.
pub const EMPTY_SPACE: &str = " ";
/// The `AUTH` keyword.
pub const AUTH: &str = "AUTH";
89
90pub use crate::utils::{PATTERN_PUBSUB_PREFIX, PUBSUB_PREFIX, PUBSUB_PUSH_PREFIX};
91
92#[cfg(not(feature = "index-map"))]
94pub type FrameMap = HashMap<Frame, Frame>;
95#[cfg(not(feature = "index-map"))]
97pub type FrameSet = HashSet<Frame>;
98#[cfg(feature = "index-map")]
100pub type FrameMap = IndexMap<Frame, Frame>;
101#[cfg(feature = "index-map")]
103pub type FrameSet = IndexSet<Frame>;
104
105pub type Attributes = FrameMap;
107
/// The RESP protocol version carried by a [`Frame::Hello`] frame.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RespVersion {
    /// RESP version 2.
    RESP2,
    /// RESP version 3.
    RESP3,
}
114
115impl RespVersion {
116 pub fn to_byte(&self) -> u8 {
117 match *self {
118 RespVersion::RESP2 => b'2',
119 RespVersion::RESP3 => b'3',
120 }
121 }
122}
123
124#[derive(Clone, Debug, Eq, PartialEq)]
126pub struct Auth {
127 pub username: Str,
128 pub password: Str,
129}
130
131impl Auth {
132 pub fn from_password<S: Into<Str>>(password: S) -> Auth {
134 Auth {
135 username: Str::from("default"),
136 password: password.into(),
137 }
138 }
139}
140
/// The format tag attached to a verbatim string.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub enum VerbatimStringFormat {
    /// Plain text (`txt`).
    Text,
    /// Markdown (`mkd`).
    Markdown,
}
147
148impl VerbatimStringFormat {
149 pub(crate) fn to_str(&self) -> &'static str {
150 match *self {
151 VerbatimStringFormat::Text => "txt",
152 VerbatimStringFormat::Markdown => "mkd",
153 }
154 }
155
156 pub(crate) fn encode_len(&self) -> usize {
157 match *self {
158 VerbatimStringFormat::Text => 4,
160 VerbatimStringFormat::Markdown => 4,
161 }
162 }
163}
164
/// The kind of a [`Frame`], without any of its data.
///
/// The derived ordering follows the declaration order of the variants.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Copy)]
pub enum FrameKind {
    /// An array of frames.
    Array,
    /// A blob string.
    BlobString,
    /// A simple string.
    SimpleString,
    /// A simple error.
    SimpleError,
    /// A signed integer.
    Number,
    /// A null value.
    Null,
    /// A floating point number.
    Double,
    /// A boolean.
    Boolean,
    /// A blob error.
    BlobError,
    /// A verbatim string.
    VerbatimString,
    /// A map of frames.
    Map,
    /// A set of frames.
    Set,
    /// Attribute data.
    Attribute,
    /// An out-of-band push frame.
    Push,
    /// A `HELLO` frame; it has no byte prefix.
    Hello,
    /// A big number.
    BigNumber,
    /// One chunk of a streamed string.
    ChunkedString,
    /// The marker that ends a stream.
    EndStream,
}
187
188impl FrameKind {
189 pub fn is_aggregate_type(&self) -> bool {
191 match *self {
192 FrameKind::Array | FrameKind::Set | FrameKind::Map => true,
193 _ => false,
194 }
195 }
196
197 pub fn is_streaming_type(&self) -> bool {
199 match *self {
200 FrameKind::Array | FrameKind::Set | FrameKind::Map | FrameKind::BlobString => true,
201 _ => false,
202 }
203 }
204
205 pub fn hash_prefix(&self) -> &'static str {
207 use self::FrameKind::*;
208
209 match *self {
210 Array => "a",
211 BlobString => "b",
212 SimpleString => "s",
213 SimpleError => "e",
214 Number => "n",
215 Null => "N",
216 Double => "d",
217 Boolean => "B",
218 BlobError => "E",
219 VerbatimString => "v",
220 Map => "m",
221 Set => "S",
222 Attribute => "A",
223 Push => "p",
224 Hello => "h",
225 BigNumber => "bn",
226 ChunkedString => "cs",
227 EndStream => "es",
228 }
229 }
230
231 pub fn from_byte(d: u8) -> Option<FrameKind> {
233 use self::FrameKind::*;
234
235 match d {
236 SIMPLE_STRING_BYTE => Some(SimpleString),
237 SIMPLE_ERROR_BYTE => Some(SimpleError),
238 NUMBER_BYTE => Some(Number),
239 DOUBLE_BYTE => Some(Double),
240 BLOB_STRING_BYTE => Some(BlobString),
241 BLOB_ERROR_BYTE => Some(BlobError),
242 BOOLEAN_BYTE => Some(Boolean),
243 VERBATIM_STRING_BYTE => Some(VerbatimString),
244 BIG_NUMBER_BYTE => Some(BigNumber),
245 ARRAY_BYTE => Some(Array),
246 MAP_BYTE => Some(Map),
247 SET_BYTE => Some(Set),
248 ATTRIBUTE_BYTE => Some(Attribute),
249 PUSH_BYTE => Some(Push),
250 NULL_BYTE => Some(Null),
251 CHUNKED_STRING_BYTE => Some(ChunkedString),
252 END_STREAM_BYTE => Some(EndStream),
253 _ => None,
254 }
255 }
256
257 pub fn to_byte(&self) -> u8 {
259 use self::FrameKind::*;
260
261 match *self {
262 SimpleString => SIMPLE_STRING_BYTE,
263 SimpleError => SIMPLE_ERROR_BYTE,
264 Number => NUMBER_BYTE,
265 Double => DOUBLE_BYTE,
266 BlobString => BLOB_STRING_BYTE,
267 BlobError => BLOB_ERROR_BYTE,
268 Boolean => BOOLEAN_BYTE,
269 VerbatimString => VERBATIM_STRING_BYTE,
270 BigNumber => BIG_NUMBER_BYTE,
271 Array => ARRAY_BYTE,
272 Map => MAP_BYTE,
273 Set => SET_BYTE,
274 Attribute => ATTRIBUTE_BYTE,
275 Push => PUSH_BYTE,
276 Null => NULL_BYTE,
277 ChunkedString => CHUNKED_STRING_BYTE,
278 EndStream => END_STREAM_BYTE,
279 Hello => panic!("HELLO does not have a byte prefix."),
280 }
281 }
282
283 pub fn is_hello(&self) -> bool {
287 match *self {
288 FrameKind::Hello => true,
289 _ => false,
290 }
291 }
292}
293
294#[derive(Clone, Debug)]
298pub enum Frame {
299 BlobString {
301 data: Bytes,
302 attributes: Option<Attributes>,
303 },
304 BlobError {
306 data: Bytes,
307 attributes: Option<Attributes>,
308 },
309 SimpleString {
313 data: Bytes,
314 attributes: Option<Attributes>,
315 },
316 SimpleError { data: Str, attributes: Option<Attributes> },
318 Boolean { data: bool, attributes: Option<Attributes> },
320 Null,
322 Number { data: i64, attributes: Option<Attributes> },
324 Double { data: f64, attributes: Option<Attributes> },
326 BigNumber {
330 data: Bytes,
331 attributes: Option<Attributes>,
332 },
333 VerbatimString {
335 data: Bytes,
336 format: VerbatimStringFormat,
337 attributes: Option<Attributes>,
338 },
339 Array {
341 data: Vec<Frame>,
342 attributes: Option<Attributes>,
343 },
344 Map {
351 data: FrameMap,
352 attributes: Option<Attributes>,
353 },
354 Set {
356 data: FrameSet,
357 attributes: Option<Attributes>,
358 },
359 Push {
361 data: Vec<Frame>,
362 attributes: Option<Attributes>,
363 },
364 Hello { version: RespVersion, auth: Option<Auth> },
366 ChunkedString(Bytes),
368}
369
370impl Hash for Frame {
371 fn hash<H: Hasher>(&self, state: &mut H) {
372 use self::Frame::*;
373 self.kind().hash_prefix().hash(state);
374
375 match *self {
376 BlobString { ref data, .. } => data.hash(state),
377 SimpleString { ref data, .. } => data.hash(state),
378 SimpleError { ref data, .. } => data.hash(state),
379 Number { ref data, .. } => data.hash(state),
380 Null => NULL.hash(state),
381 Double { ref data, .. } => data.to_string().hash(state),
382 Boolean { ref data, .. } => data.hash(state),
383 BlobError { ref data, .. } => data.hash(state),
384 VerbatimString {
385 ref data, ref format, ..
386 } => {
387 format.hash(state);
388 data.hash(state);
389 }
390 ChunkedString(ref data) => data.hash(state),
391 BigNumber { ref data, .. } => data.hash(state),
392 _ => panic!("Invalid RESP3 data type to use as hash key."),
393 };
394 }
395}
396
397impl PartialEq for Frame {
398 fn eq(&self, other: &Self) -> bool {
399 use self::Frame::*;
400
401 match *self {
402 ChunkedString(ref b) => match *other {
403 ChunkedString(ref _b) => b == _b,
404 _ => false,
405 },
406 Array {
407 ref data,
408 ref attributes,
409 } => {
410 let (_data, _attributes) = (data, attributes);
411 match *other {
412 Array {
413 ref data,
414 ref attributes,
415 } => data == _data && attributes == _attributes,
416 _ => false,
417 }
418 }
419 BlobString {
420 ref data,
421 ref attributes,
422 } => {
423 let (_data, _attributes) = (data, attributes);
424 match *other {
425 BlobString {
426 ref data,
427 ref attributes,
428 } => data == _data && attributes == _attributes,
429 _ => false,
430 }
431 }
432 SimpleString {
433 ref data,
434 ref attributes,
435 } => {
436 let (_data, _attributes) = (data, attributes);
437 match *other {
438 SimpleString {
439 ref data,
440 ref attributes,
441 } => data == _data && attributes == _attributes,
442 _ => false,
443 }
444 }
445 SimpleError {
446 ref data,
447 ref attributes,
448 } => {
449 let (_data, _attributes) = (data, attributes);
450 match *other {
451 SimpleError {
452 ref data,
453 ref attributes,
454 } => data == _data && attributes == _attributes,
455 _ => false,
456 }
457 }
458 Number {
459 ref data,
460 ref attributes,
461 } => {
462 let (_data, _attributes) = (data, attributes);
463 match *other {
464 Number {
465 ref data,
466 ref attributes,
467 } => data == _data && attributes == _attributes,
468 _ => false,
469 }
470 }
471 Null => match *other {
472 Null => true,
473 _ => false,
474 },
475 Boolean {
476 ref data,
477 ref attributes,
478 } => {
479 let (_data, _attributes) = (data, attributes);
480 match *other {
481 Boolean {
482 ref data,
483 ref attributes,
484 } => data == _data && attributes == _attributes,
485 _ => false,
486 }
487 }
488 Double {
489 ref data,
490 ref attributes,
491 } => {
492 let (_data, _attributes) = (data, attributes);
493 match *other {
494 Double {
495 ref data,
496 ref attributes,
497 } => data == _data && attributes == _attributes,
498 _ => false,
499 }
500 }
501 BlobError {
502 ref data,
503 ref attributes,
504 } => {
505 let (_data, _attributes) = (data, attributes);
506 match *other {
507 BlobError {
508 ref data,
509 ref attributes,
510 } => data == _data && attributes == _attributes,
511 _ => false,
512 }
513 }
514 VerbatimString {
515 ref data,
516 ref format,
517 ref attributes,
518 } => {
519 let (_data, _format, _attributes) = (data, format, attributes);
520 match *other {
521 VerbatimString {
522 ref data,
523 ref format,
524 ref attributes,
525 } => _data == data && _format == format && attributes == _attributes,
526 _ => false,
527 }
528 }
529 Map {
530 ref data,
531 ref attributes,
532 } => {
533 let (_data, _attributes) = (data, attributes);
534 match *other {
535 Map {
536 ref data,
537 ref attributes,
538 } => data == _data && attributes == _attributes,
539 _ => false,
540 }
541 }
542 Set {
543 ref data,
544 ref attributes,
545 } => {
546 let (_data, _attributes) = (data, attributes);
547 match *other {
548 Set {
549 ref data,
550 ref attributes,
551 } => data == _data && attributes == _attributes,
552 _ => false,
553 }
554 }
555 Push {
556 ref data,
557 ref attributes,
558 } => {
559 let (_data, _attributes) = (data, attributes);
560 match *other {
561 Push {
562 ref data,
563 ref attributes,
564 } => data == _data && attributes == _attributes,
565 _ => false,
566 }
567 }
568 Hello { ref version, ref auth } => {
569 let (_version, _auth) = (version, auth);
570 match *other {
571 Hello { ref version, ref auth } => _version == version && _auth == auth,
572 _ => false,
573 }
574 }
575 BigNumber {
576 ref data,
577 ref attributes,
578 } => {
579 let (_data, _attributes) = (data, attributes);
580 match *other {
581 BigNumber {
582 ref data,
583 ref attributes,
584 } => data == _data && attributes == _attributes,
585 _ => false,
586 }
587 }
588 }
589 }
590}
591
592impl Eq for Frame {}
593
/// Test-only conversion from a kind and raw bytes. Only the byte-backed kinds (blob string, blob
/// error, big number and chunked string) are supported.
#[cfg(test)]
impl TryFrom<(FrameKind, Vec<u8>)> for Frame {
    type Error = RedisProtocolError;

    fn try_from((kind, value): (FrameKind, Vec<u8>)) -> Result<Self, Self::Error> {
        match kind {
            FrameKind::BlobString => Ok(Frame::BlobString {
                data: Bytes::from(value),
                attributes: None,
            }),
            FrameKind::BlobError => Ok(Frame::BlobError {
                data: Bytes::from(value),
                attributes: None,
            }),
            FrameKind::BigNumber => Ok(Frame::BigNumber {
                data: Bytes::from(value),
                attributes: None,
            }),
            FrameKind::ChunkedString => Ok(Frame::ChunkedString(Bytes::from(value))),
            _ => Err(RedisProtocolError::new(
                RedisProtocolErrorKind::Unknown,
                "Cannot convert to Frame.",
            )),
        }
    }
}
624
/// Test-only conversion from a kind and a list of frames. Only arrays and push frames are
/// supported.
#[cfg(test)]
impl TryFrom<(FrameKind, Vec<Frame>)> for Frame {
    type Error = RedisProtocolError;

    fn try_from((kind, data): (FrameKind, Vec<Frame>)) -> Result<Self, Self::Error> {
        match kind {
            FrameKind::Array => Ok(Frame::Array { data, attributes: None }),
            FrameKind::Push => Ok(Frame::Push { data, attributes: None }),
            _ => Err(RedisProtocolError::new(
                RedisProtocolErrorKind::Unknown,
                "Cannot convert to Frame.",
            )),
        }
    }
}
644
/// Test-only conversion from a kind and a queue of frames. Only arrays and push frames are
/// supported; the frames keep their queue order.
#[cfg(test)]
impl TryFrom<(FrameKind, VecDeque<Frame>)> for Frame {
    type Error = RedisProtocolError;

    fn try_from((kind, value): (FrameKind, VecDeque<Frame>)) -> Result<Self, Self::Error> {
        let data = value.into_iter().collect::<Vec<Frame>>();

        match kind {
            FrameKind::Array => Ok(Frame::Array { data, attributes: None }),
            FrameKind::Push => Ok(Frame::Push { data, attributes: None }),
            _ => Err(RedisProtocolError::new(
                RedisProtocolErrorKind::Unknown,
                "Cannot convert to Frame.",
            )),
        }
    }
}
666
667impl TryFrom<HashMap<Frame, Frame>> for Frame {
668 type Error = RedisProtocolError;
669
670 fn try_from(value: HashMap<Frame, Frame>) -> Result<Self, Self::Error> {
671 Ok(Frame::Map {
672 data: resp3_utils::hashmap_to_frame_map(value),
673 attributes: None,
674 })
675 }
676}
677
678impl TryFrom<HashSet<Frame>> for Frame {
679 type Error = RedisProtocolError;
680
681 fn try_from(value: HashSet<Frame>) -> Result<Self, Self::Error> {
682 Ok(Frame::Set {
683 data: resp3_utils::hashset_to_frame_set(value),
684 attributes: None,
685 })
686 }
687}
688
/// Wraps an `IndexMap` in a [`Frame::Map`] without attributes. As written this never returns
/// `Err`.
#[cfg(feature = "index-map")]
impl TryFrom<IndexMap<Frame, Frame>> for Frame {
    type Error = RedisProtocolError;

    fn try_from(value: IndexMap<Frame, Frame>) -> Result<Self, Self::Error> {
        let frame = Frame::Map {
            attributes: None,
            data: value,
        };

        Ok(frame)
    }
}
700
/// Wraps an `IndexSet` in a [`Frame::Set`] without attributes. As written this never returns
/// `Err`.
#[cfg(feature = "index-map")]
impl TryFrom<IndexSet<Frame>> for Frame {
    type Error = RedisProtocolError;

    fn try_from(value: IndexSet<Frame>) -> Result<Self, Self::Error> {
        let frame = Frame::Set {
            attributes: None,
            data: value,
        };

        Ok(frame)
    }
}
712
713impl From<i64> for Frame {
714 fn from(value: i64) -> Self {
715 Frame::Number {
716 data: value,
717 attributes: None,
718 }
719 }
720}
721
722impl From<bool> for Frame {
723 fn from(value: bool) -> Self {
724 Frame::Boolean {
725 data: value,
726 attributes: None,
727 }
728 }
729}
730
731impl TryFrom<f64> for Frame {
732 type Error = RedisProtocolError;
733
734 fn try_from(value: f64) -> Result<Self, Self::Error> {
735 if value.is_nan() {
736 Err(RedisProtocolError::new(
737 RedisProtocolErrorKind::Unknown,
738 "Cannot cast NaN to double.",
739 ))
740 } else {
741 Ok(Frame::Double {
742 data: value,
743 attributes: None,
744 })
745 }
746 }
747}
748
/// Test-only conversion from a kind and an owned string. Only the string-like kinds (simple and
/// blob strings and errors, big numbers and chunked strings) are supported.
#[cfg(test)]
impl TryFrom<(FrameKind, String)> for Frame {
    type Error = RedisProtocolError;

    fn try_from((kind, value): (FrameKind, String)) -> Result<Self, Self::Error> {
        match kind {
            FrameKind::SimpleError => Ok(Frame::SimpleError {
                data: Str::from(value),
                attributes: None,
            }),
            FrameKind::SimpleString => Ok(Frame::SimpleString {
                data: Bytes::from(value),
                attributes: None,
            }),
            FrameKind::BlobError => Ok(Frame::BlobError {
                data: Bytes::from(value),
                attributes: None,
            }),
            FrameKind::BlobString => Ok(Frame::BlobString {
                data: Bytes::from(value),
                attributes: None,
            }),
            FrameKind::BigNumber => Ok(Frame::BigNumber {
                data: Bytes::from(value),
                attributes: None,
            }),
            FrameKind::ChunkedString => Ok(Frame::ChunkedString(Bytes::from(value))),
            _ => Err(RedisProtocolError::new(
                RedisProtocolErrorKind::Unknown,
                "Cannot convert to Frame.",
            )),
        }
    }
}
787
/// Test-only conversion from a kind and a string slice; copies the slice and delegates to the
/// `(FrameKind, String)` conversion.
#[cfg(test)]
impl<'a> TryFrom<(FrameKind, &'a str)> for Frame {
    type Error = RedisProtocolError;

    fn try_from((kind, value): (FrameKind, &'a str)) -> Result<Self, Self::Error> {
        let owned = String::from(value);

        Frame::try_from((kind, owned))
    }
}
796
797impl Frame {
798 pub fn can_hash(&self) -> bool {
802 match self.kind() {
803 FrameKind::BlobString
804 | FrameKind::BlobError
805 | FrameKind::SimpleString
806 | FrameKind::SimpleError
807 | FrameKind::Number
808 | FrameKind::Double
809 | FrameKind::Boolean
810 | FrameKind::Null
811 | FrameKind::ChunkedString
812 | FrameKind::EndStream
813 | FrameKind::VerbatimString
814 | FrameKind::BigNumber => true,
815 _ => false,
816 }
817 }
818
819 pub fn attributes(&self) -> Option<&Attributes> {
821 let attributes = match *self {
822 Frame::Array { ref attributes, .. } => attributes,
823 Frame::Push { ref attributes, .. } => attributes,
824 Frame::BlobString { ref attributes, .. } => attributes,
825 Frame::BlobError { ref attributes, .. } => attributes,
826 Frame::BigNumber { ref attributes, .. } => attributes,
827 Frame::Boolean { ref attributes, .. } => attributes,
828 Frame::Number { ref attributes, .. } => attributes,
829 Frame::Double { ref attributes, .. } => attributes,
830 Frame::VerbatimString { ref attributes, .. } => attributes,
831 Frame::SimpleError { ref attributes, .. } => attributes,
832 Frame::SimpleString { ref attributes, .. } => attributes,
833 Frame::Set { ref attributes, .. } => attributes,
834 Frame::Map { ref attributes, .. } => attributes,
835 Frame::Null | Frame::ChunkedString(_) | Frame::Hello { .. } => return None,
836 };
837
838 attributes.as_ref()
839 }
840
841 pub fn take_attributes(&mut self) -> Option<Attributes> {
843 let attributes = match *self {
844 Frame::Array { ref mut attributes, .. } => attributes,
845 Frame::Push { ref mut attributes, .. } => attributes,
846 Frame::BlobString { ref mut attributes, .. } => attributes,
847 Frame::BlobError { ref mut attributes, .. } => attributes,
848 Frame::BigNumber { ref mut attributes, .. } => attributes,
849 Frame::Boolean { ref mut attributes, .. } => attributes,
850 Frame::Number { ref mut attributes, .. } => attributes,
851 Frame::Double { ref mut attributes, .. } => attributes,
852 Frame::VerbatimString { ref mut attributes, .. } => attributes,
853 Frame::SimpleError { ref mut attributes, .. } => attributes,
854 Frame::SimpleString { ref mut attributes, .. } => attributes,
855 Frame::Set { ref mut attributes, .. } => attributes,
856 Frame::Map { ref mut attributes, .. } => attributes,
857 Frame::Null | Frame::ChunkedString(_) | Frame::Hello { .. } => return None,
858 };
859
860 attributes.take()
861 }
862
863 pub fn attributes_mut(&mut self) -> Option<&mut Attributes> {
865 let attributes = match *self {
866 Frame::Array { ref mut attributes, .. } => attributes,
867 Frame::Push { ref mut attributes, .. } => attributes,
868 Frame::BlobString { ref mut attributes, .. } => attributes,
869 Frame::BlobError { ref mut attributes, .. } => attributes,
870 Frame::BigNumber { ref mut attributes, .. } => attributes,
871 Frame::Boolean { ref mut attributes, .. } => attributes,
872 Frame::Number { ref mut attributes, .. } => attributes,
873 Frame::Double { ref mut attributes, .. } => attributes,
874 Frame::VerbatimString { ref mut attributes, .. } => attributes,
875 Frame::SimpleError { ref mut attributes, .. } => attributes,
876 Frame::SimpleString { ref mut attributes, .. } => attributes,
877 Frame::Set { ref mut attributes, .. } => attributes,
878 Frame::Map { ref mut attributes, .. } => attributes,
879 Frame::Null | Frame::ChunkedString(_) | Frame::Hello { .. } => return None,
880 };
881
882 attributes.as_mut()
883 }
884
885 pub fn add_attributes(&mut self, attributes: Attributes) -> Result<(), RedisProtocolError> {
887 let _attributes = match *self {
888 Frame::Array { ref mut attributes, .. } => attributes,
889 Frame::Push { ref mut attributes, .. } => attributes,
890 Frame::BlobString { ref mut attributes, .. } => attributes,
891 Frame::BlobError { ref mut attributes, .. } => attributes,
892 Frame::BigNumber { ref mut attributes, .. } => attributes,
893 Frame::Boolean { ref mut attributes, .. } => attributes,
894 Frame::Number { ref mut attributes, .. } => attributes,
895 Frame::Double { ref mut attributes, .. } => attributes,
896 Frame::VerbatimString { ref mut attributes, .. } => attributes,
897 Frame::SimpleError { ref mut attributes, .. } => attributes,
898 Frame::SimpleString { ref mut attributes, .. } => attributes,
899 Frame::Set { ref mut attributes, .. } => attributes,
900 Frame::Map { ref mut attributes, .. } => attributes,
901 Frame::Null | Frame::ChunkedString(_) | Frame::Hello { .. } => {
902 return Err(RedisProtocolError::new(
903 RedisProtocolErrorKind::Unknown,
904 format!("{:?} cannot have attributes.", self.kind()),
905 ))
906 }
907 };
908
909 if let Some(_attributes) = _attributes.as_mut() {
910 _attributes.extend(attributes.into_iter());
911 } else {
912 *_attributes = Some(attributes);
913 }
914
915 Ok(())
916 }
917
918 pub fn new_end_stream() -> Self {
920 Frame::ChunkedString(Bytes::new())
921 }
922
923 pub fn len(&self) -> usize {
931 use self::Frame::*;
932
933 match *self {
934 Array { ref data, .. } | Push { ref data, .. } => data.len(),
935 BlobString { ref data, .. }
936 | BlobError { ref data, .. }
937 | BigNumber { ref data, .. }
938 | ChunkedString(ref data) => data.len(),
939 SimpleString { ref data, .. } => data.len(),
940 SimpleError { ref data, .. } => data.len(),
941 Number { .. } | Double { .. } | Boolean { .. } => 1,
942 Null => 0,
943 VerbatimString { ref data, .. } => data.len(),
944 Map { ref data, .. } => data.len(),
945 Set { ref data, .. } => data.len(),
946 Hello { .. } => 1,
947 }
948 }
949
950 pub fn take(&mut self) -> Frame {
952 mem::replace(self, Frame::Null)
953 }
954
955 pub fn kind(&self) -> FrameKind {
957 use self::Frame::*;
958
959 match *self {
960 Array { .. } => FrameKind::Array,
961 BlobString { .. } => FrameKind::BlobString,
962 SimpleString { .. } => FrameKind::SimpleString,
963 SimpleError { .. } => FrameKind::SimpleError,
964 Number { .. } => FrameKind::Number,
965 Null => FrameKind::Null,
966 Double { .. } => FrameKind::Double,
967 BlobError { .. } => FrameKind::BlobError,
968 VerbatimString { .. } => FrameKind::VerbatimString,
969 Boolean { .. } => FrameKind::Boolean,
970 Map { .. } => FrameKind::Map,
971 Set { .. } => FrameKind::Set,
972 Push { .. } => FrameKind::Push,
973 Hello { .. } => FrameKind::Hello,
974 BigNumber { .. } => FrameKind::BigNumber,
975 ChunkedString(ref inner) => {
976 if inner.is_empty() {
977 FrameKind::EndStream
978 } else {
979 FrameKind::ChunkedString
980 }
981 }
982 }
983 }
984
985 pub fn is_null(&self) -> bool {
987 match *self {
988 Frame::Null => true,
989 _ => false,
990 }
991 }
992
993 pub fn is_array(&self) -> bool {
995 match *self {
996 Frame::Array { .. } => true,
997 _ => false,
998 }
999 }
1000
1001 pub fn is_push(&self) -> bool {
1003 match *self {
1004 Frame::Push { .. } => true,
1005 _ => false,
1006 }
1007 }
1008
1009 pub fn is_boolean(&self) -> bool {
1011 match *self {
1012 Frame::Boolean { .. } => true,
1013 _ => false,
1014 }
1015 }
1016
1017 pub fn is_error(&self) -> bool {
1019 match *self {
1020 Frame::BlobError { .. } | Frame::SimpleError { .. } => true,
1021 _ => false,
1022 }
1023 }
1024
1025 pub fn is_aggregate_type(&self) -> bool {
1027 match *self {
1028 Frame::Map { .. } | Frame::Set { .. } | Frame::Array { .. } => true,
1029 _ => false,
1030 }
1031 }
1032
1033 pub fn is_blob(&self) -> bool {
1035 match *self {
1036 Frame::BlobString { .. } | Frame::BlobError { .. } => true,
1037 _ => false,
1038 }
1039 }
1040
1041 pub fn is_chunked_string(&self) -> bool {
1043 match *self {
1044 Frame::ChunkedString(_) => true,
1045 _ => false,
1046 }
1047 }
1048
1049 pub fn is_end_stream_frame(&self) -> bool {
1051 match *self {
1052 Frame::ChunkedString(ref s) => s.is_empty(),
1053 _ => false,
1054 }
1055 }
1056
1057 pub fn is_verbatim_string(&self) -> bool {
1059 match *self {
1060 Frame::VerbatimString { .. } => true,
1061 _ => false,
1062 }
1063 }
1064
1065 pub fn verbatim_string_format(&self) -> Option<&VerbatimStringFormat> {
1067 match *self {
1068 Frame::VerbatimString { ref format, .. } => Some(format),
1069 _ => None,
1070 }
1071 }
1072
1073 pub fn as_str(&self) -> Option<&str> {
1077 match *self {
1078 Frame::SimpleError { ref data, .. } => Some(data),
1079 Frame::SimpleString { ref data, .. } => str::from_utf8(data).ok(),
1080 Frame::BlobError { ref data, .. } | Frame::BlobString { ref data, .. } | Frame::BigNumber { ref data, .. } => {
1081 str::from_utf8(data).ok()
1082 }
1083 Frame::VerbatimString { ref data, .. } => str::from_utf8(data).ok(),
1084 Frame::ChunkedString(ref data) => str::from_utf8(data).ok(),
1085 _ => None,
1086 }
1087 }
1088
1089 pub fn to_string(&self) -> Option<String> {
1091 match *self {
1092 Frame::SimpleError { ref data, .. } => Some(data.to_string()),
1093 Frame::SimpleString { ref data, .. } => String::from_utf8(data.to_vec()).ok(),
1094 Frame::BlobError { ref data, .. } | Frame::BlobString { ref data, .. } | Frame::BigNumber { ref data, .. } => {
1095 String::from_utf8(data.to_vec()).ok()
1096 }
1097 Frame::VerbatimString { ref data, .. } => String::from_utf8(data.to_vec()).ok(),
1098 Frame::ChunkedString(ref b) => String::from_utf8(b.to_vec()).ok(),
1099 Frame::Double { ref data, .. } => Some(data.to_string()),
1100 Frame::Number { ref data, .. } => Some(data.to_string()),
1101 _ => None,
1102 }
1103 }
1104
1105 pub fn as_bytes(&self) -> Option<&[u8]> {
1107 match *self {
1108 Frame::SimpleError { ref data, .. } => Some(data.as_bytes()),
1109 Frame::SimpleString { ref data, .. } => Some(&data),
1110 Frame::BlobError { ref data, .. } | Frame::BlobString { ref data, .. } | Frame::BigNumber { ref data, .. } => {
1111 Some(data)
1112 }
1113 Frame::VerbatimString { ref data, .. } => Some(data),
1114 Frame::ChunkedString(ref b) => Some(b),
1115 _ => None,
1116 }
1117 }
1118
1119 pub fn as_i64(&self) -> Option<i64> {
1121 match *self {
1122 Frame::Number { ref data, .. } => Some(*data),
1123 Frame::Double { ref data, .. } => Some(*data as i64),
1124 Frame::BlobString { ref data, .. } => str::from_utf8(data).ok().and_then(|s| s.parse::<i64>().ok()),
1125 Frame::SimpleString { ref data, .. } => str::from_utf8(data).ok().and_then(|s| s.parse::<i64>().ok()),
1126 _ => None,
1127 }
1128 }
1129
1130 pub fn as_f64(&self) -> Option<f64> {
1132 match *self {
1133 Frame::Double { ref data, .. } => Some(*data),
1134 Frame::Number { ref data, .. } => Some(*data as f64),
1135 Frame::BlobString { ref data, .. } => str::from_utf8(data).ok().and_then(|s| s.parse::<f64>().ok()),
1136 Frame::SimpleString { ref data, .. } => str::from_utf8(data).ok().and_then(|s| s.parse::<f64>().ok()),
1137 _ => None,
1138 }
1139 }
1140
1141 pub fn is_moved_or_ask_error(&self) -> bool {
1143 match *self {
1144 Frame::SimpleError { ref data, .. } => utils::is_cluster_error(data),
1145 _ => false,
1146 }
1147 }
1148
1149 pub fn to_redirection(&self) -> Option<Redirection> {
1151 match *self {
1152 Frame::SimpleError { ref data, .. } => utils::read_cluster_error(data),
1153 _ => None,
1154 }
1155 }
1156
1157 pub fn is_normal_pubsub(&self) -> bool {
1159 if let Frame::Push { ref data, .. } = *self {
1160 resp3_utils::is_normal_pubsub(data)
1161 } else {
1162 false
1163 }
1164 }
1165
1166 pub fn is_pubsub_message(&self) -> bool {
1168 if let Frame::Push { ref data, .. } = *self {
1169 resp3_utils::is_normal_pubsub(data) || resp3_utils::is_pattern_pubsub(data)
1170 } else {
1171 false
1172 }
1173 }
1174
1175 pub fn is_pattern_pubsub_message(&self) -> bool {
1177 if let Frame::Push { ref data, .. } = *self {
1178 resp3_utils::is_pattern_pubsub(data)
1179 } else {
1180 false
1181 }
1182 }
1183
1184 pub fn parse_as_pubsub(self) -> Result<(Frame, Frame), Self> {
1187 if self.is_pubsub_message() {
1188 if let Frame::Push { mut data, .. } = self {
1189 let message = data.pop().unwrap();
1191 let channel = data.pop().unwrap();
1192
1193 Ok((channel, message))
1194 } else {
1195 warn!("Invalid pubsub frame. Expected a Push frame.");
1196 Err(self)
1197 }
1198 } else {
1199 Err(self)
1200 }
1201 }
1202
1203 pub fn encode_len(&self) -> Result<usize, RedisProtocolError> {
1205 resp3_utils::encode_len(self).map_err(|e| e.into())
1206 }
1207}
1208
1209#[derive(Debug, Eq, PartialEq)]
1245pub struct StreamedFrame {
1246 buffer: VecDeque<Frame>,
1248 pub attributes: Option<Attributes>,
1250 pub kind: FrameKind,
1252}
1253
1254impl StreamedFrame {
1255 pub fn new(kind: FrameKind) -> Self {
1257 let buffer = VecDeque::new();
1258 StreamedFrame {
1259 buffer,
1260 kind,
1261 attributes: None,
1262 }
1263 }
1264
1265 pub fn into_frame(&mut self) -> Result<Frame, RedisProtocolError> {
1267 if !self.kind.is_streaming_type() {
1268 return Err(RedisProtocolError::new(
1270 RedisProtocolErrorKind::DecodeError,
1271 "Only blob strings, sets, maps, and arrays can be streamed.",
1272 ));
1273 }
1274
1275 if self.is_finished() {
1276 let _ = self.buffer.pop_back();
1278 }
1279 let buffer = mem::replace(&mut self.buffer, VecDeque::new());
1280 let attributes = self.attributes.take();
1281
1282 let frame = match self.kind {
1283 FrameKind::BlobString => resp3_utils::reconstruct_blobstring(buffer, attributes)?,
1284 FrameKind::Map => resp3_utils::reconstruct_map(buffer, attributes)?,
1285 FrameKind::Set => resp3_utils::reconstruct_set(buffer, attributes)?,
1286 FrameKind::Array => resp3_utils::reconstruct_array(buffer, attributes)?,
1287 _ => {
1288 return Err(RedisProtocolError::new(
1289 RedisProtocolErrorKind::DecodeError,
1290 "Streaming frames only supported for blob strings, maps, sets, and arrays.",
1291 ))
1292 }
1293 };
1294
1295 Ok(frame)
1296 }
1297
1298 pub fn add_frame(&mut self, data: Frame) {
1300 self.buffer.push_back(data);
1301 }
1302
1303 pub fn is_finished(&self) -> bool {
1305 self.buffer.back().map(|f| f.is_end_stream_frame()).unwrap_or(false)
1306 }
1307}
1308
1309#[derive(Debug, Eq, PartialEq)]
1311pub enum DecodedFrame {
1312 Streaming(StreamedFrame),
1313 Complete(Frame),
1314}
1315
1316impl DecodedFrame {
1317 pub fn add_attributes(&mut self, attributes: Attributes) -> Result<(), RedisProtocolError> {
1319 let _ = match *self {
1320 DecodedFrame::Streaming(ref mut inner) => inner.attributes = Some(attributes),
1321 DecodedFrame::Complete(ref mut inner) => inner.add_attributes(attributes)?,
1322 };
1323
1324 Ok(())
1325 }
1326
1327 pub fn into_complete_frame(self) -> Result<Frame, RedisProtocolError> {
1329 match self {
1330 DecodedFrame::Complete(frame) => Ok(frame),
1331 DecodedFrame::Streaming(_) => Err(RedisProtocolError::new(
1332 RedisProtocolErrorKind::DecodeError,
1333 "Expected complete frame.",
1334 )),
1335 }
1336 }
1337
1338 pub fn into_streaming_frame(self) -> Result<StreamedFrame, RedisProtocolError> {
1340 match self {
1341 DecodedFrame::Streaming(frame) => Ok(frame),
1342 DecodedFrame::Complete(_) => Err(RedisProtocolError::new(
1343 RedisProtocolErrorKind::DecodeError,
1344 "Expected streamed frame.",
1345 )),
1346 }
1347 }
1348
1349 pub fn is_streaming(&self) -> bool {
1351 match *self {
1352 DecodedFrame::Streaming(_) => true,
1353 _ => false,
1354 }
1355 }
1356
1357 pub fn is_complete(&self) -> bool {
1359 !self.is_streaming()
1360 }
1361}
1362
#[cfg(test)]
mod tests {
    use super::*;
    use crate::resp3::utils::new_map;

    /// Builds a finished blob-string stream holding the chunks "foo", "bar" and "baz".
    fn finished_blob_stream(attributes: Option<Attributes>) -> StreamedFrame {
        let mut stream = StreamedFrame::new(FrameKind::BlobString);
        stream.attributes = attributes;

        for chunk in ["foo", "bar", "baz"].iter() {
            let frame = Frame::try_from((FrameKind::ChunkedString, *chunk)).unwrap();
            stream.add_frame(frame);
        }
        stream.add_frame(Frame::new_end_stream());

        stream
    }

    #[test]
    fn should_convert_basic_streaming_buffer_to_frame() {
        let frame = finished_blob_stream(None)
            .into_frame()
            .expect("Failed to build frame from chunked stream");

        assert_eq!(frame.as_str(), Some("foobarbaz"));
    }

    #[test]
    fn should_convert_basic_streaming_buffer_to_frame_with_attributes() {
        let mut attributes = new_map(None);
        for &(name, value) in [("a", 1_i64), ("b", 2), ("c", 3)].iter() {
            let key = Frame::try_from((FrameKind::SimpleString, name)).unwrap();
            attributes.insert(key, Frame::from(value));
        }

        let frame = finished_blob_stream(Some(attributes.clone()))
            .into_frame()
            .expect("Failed to build frame from chunked stream");

        assert_eq!(frame.as_str(), Some("foobarbaz"));
        assert_eq!(frame.attributes(), Some(&attributes));
    }
}