1use crate::{
2 error::{RedisProtocolError, RedisProtocolErrorKind},
3 resp3::utils as resp3_utils,
4 types::{_Range, PATTERN_PUBSUB_PREFIX, PUBSUB_PREFIX, PUBSUB_PUSH_PREFIX, SHARD_PUBSUB_PREFIX},
5 utils,
6};
7use alloc::{
8 collections::VecDeque,
9 format,
10 string::{String, ToString},
11 vec::Vec,
12};
13use core::{
14 fmt::Debug,
15 hash::{Hash, Hasher},
16 mem,
17 str,
18};
19
20#[cfg(feature = "convert")]
21use crate::convert::FromResp3;
22#[cfg(feature = "bytes")]
23use bytes::{Bytes, BytesMut};
24#[cfg(feature = "bytes")]
25use bytes_utils::Str;
26
27#[cfg(feature = "hashbrown")]
28use hashbrown::{HashMap, HashSet};
29#[cfg(feature = "index-map")]
30use indexmap::{IndexMap, IndexSet};
31#[cfg(feature = "std")]
32use std::collections::{HashMap, HashSet};
33
/// Byte prefix identifying a simple string frame.
pub const SIMPLE_STRING_BYTE: u8 = b'+';
/// Byte prefix identifying a simple error frame.
pub const SIMPLE_ERROR_BYTE: u8 = b'-';
/// Byte prefix identifying a number frame.
pub const NUMBER_BYTE: u8 = b':';
/// Byte prefix identifying a double frame.
pub const DOUBLE_BYTE: u8 = b',';
/// Byte prefix identifying a blob string frame.
pub const BLOB_STRING_BYTE: u8 = b'$';
/// Byte prefix identifying a blob error frame.
pub const BLOB_ERROR_BYTE: u8 = b'!';
/// Byte prefix identifying a boolean frame.
pub const BOOLEAN_BYTE: u8 = b'#';
/// Byte prefix identifying a verbatim string frame.
pub const VERBATIM_STRING_BYTE: u8 = b'=';
/// Byte prefix identifying a big number frame.
pub const BIG_NUMBER_BYTE: u8 = b'(';
/// Byte prefix identifying an array frame.
pub const ARRAY_BYTE: u8 = b'*';
/// Byte prefix identifying a map frame.
pub const MAP_BYTE: u8 = b'%';
/// Byte prefix identifying a set frame.
pub const SET_BYTE: u8 = b'~';
/// Byte prefix identifying an attribute frame.
pub const ATTRIBUTE_BYTE: u8 = b'|';
/// Byte prefix identifying a push frame.
pub const PUSH_BYTE: u8 = b'>';
/// Byte prefix identifying a null frame.
pub const NULL_BYTE: u8 = b'_';
/// The `:` byte; presumably the separator after a verbatim string's format tag (TODO confirm against the encoder).
pub const VERBATIM_FORMAT_BYTE: u8 = b':';
/// Byte prefix identifying a chunked string frame.
pub const CHUNKED_STRING_BYTE: u8 = b';';
/// Byte prefix identifying an end-of-stream frame.
pub const END_STREAM_BYTE: u8 = b'.';

/// The `?` byte; presumably written in place of a length on streamed frames (TODO confirm against the decoder).
pub const STREAMED_LENGTH_BYTE: u8 = b'?';
/// Presumably the terminator of a streamed string: a chunk prefix with length zero, followed by CRLF.
pub const END_STREAM_STRING_BYTES: &str = ";0\r\n";
/// Presumably the terminator of a streamed aggregate type: the end-stream prefix followed by CRLF.
pub const END_STREAM_AGGREGATE_BYTES: &str = ".\r\n";

/// The null prefix byte followed by CRLF. Also fed to the hasher for `Null` frames in this file.
pub const NULL: &str = "_\r\n";

/// Textual form of positive infinity.
pub const INFINITY: &str = "inf";
/// Textual form of negative infinity.
pub const NEG_INFINITY: &str = "-inf";
/// Textual form of "not a number".
pub const NAN: &str = "nan";

/// The `HELLO` keyword.
pub const HELLO: &str = "HELLO";
/// The boolean prefix byte followed by `t` and CRLF.
pub const BOOL_TRUE_BYTES: &str = "#t\r\n";
/// The boolean prefix byte followed by `f` and CRLF.
pub const BOOL_FALSE_BYTES: &str = "#f\r\n";
/// A single space character.
pub const EMPTY_SPACE: &str = " ";
/// The `AUTH` keyword; presumably used when encoding a `Hello` frame (TODO confirm).
pub const AUTH: &str = "AUTH";
/// The `SETNAME` keyword; presumably used when encoding a `Hello` frame (TODO confirm).
pub const SETNAME: &str = "SETNAME";
100
#[cfg(not(feature = "index-map"))]
/// Map type backing `Map` frames and attributes. A `HashMap` unless the `index-map` feature is enabled.
pub type FrameMap<K, V> = HashMap<K, V>;
#[cfg(not(feature = "index-map"))]
/// Set type backing `Set` frames. A `HashSet` unless the `index-map` feature is enabled.
pub type FrameSet<T> = HashSet<T>;
#[cfg(feature = "index-map")]
/// Map type backing `Map` frames and attributes. An `IndexMap` when the `index-map` feature is enabled.
pub type FrameMap<K, V> = IndexMap<K, V>;
#[cfg(feature = "index-map")]
/// Set type backing `Set` frames. An `IndexSet` when the `index-map` feature is enabled.
pub type FrameSet<T> = IndexSet<T>;

#[cfg(feature = "bytes")]
/// Attribute map attached to a `BytesFrame`.
#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
pub type BytesAttributes = FrameMap<BytesFrame, BytesFrame>;
/// Attribute map attached to an [`OwnedFrame`].
pub type OwnedAttributes = FrameMap<OwnedFrame, OwnedFrame>;
/// Attribute map attached to a [`RangeFrame`].
pub type RangeAttributes = FrameMap<RangeFrame, RangeFrame>;
126
/// The RESP protocol version.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RespVersion {
  /// Version 2 of the protocol.
  RESP2,
  /// Version 3 of the protocol.
  RESP3,
}

impl RespVersion {
  /// Return the ASCII digit (`b'2'` or `b'3'`) that identifies this version.
  pub fn to_byte(&self) -> u8 {
    match self {
      Self::RESP2 => b'2',
      Self::RESP3 => b'3',
    }
  }
}
142
/// The format tag carried by a verbatim string frame.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub enum VerbatimStringFormat {
  /// Tagged as `txt`.
  Text,
  /// Tagged as `mkd`.
  Markdown,
}

impl VerbatimStringFormat {
  /// The three-character tag for this format.
  pub(crate) fn to_str(&self) -> &'static str {
    match self {
      Self::Text => "txt",
      Self::Markdown => "mkd",
    }
  }

  /// Number of bytes reserved for the format when encoding. Both formats use 4.
  pub(crate) fn encode_len(&self) -> usize {
    match self {
      Self::Text | Self::Markdown => 4,
    }
  }
}
166
167#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Copy)]
169pub enum FrameKind {
170 Array,
171 BlobString,
172 SimpleString,
173 SimpleError,
174 Number,
175 Null,
176 Double,
177 Boolean,
178 BlobError,
179 VerbatimString,
180 Map,
181 Set,
182 Attribute,
183 Push,
184 Hello,
185 BigNumber,
186 ChunkedString,
187 EndStream,
188}
189
190impl FrameKind {
191 pub fn is_aggregate_type(&self) -> bool {
193 matches!(self, FrameKind::Array | FrameKind::Set | FrameKind::Map)
194 }
195
196 pub fn is_streaming_type(&self) -> bool {
198 matches!(
199 self,
200 FrameKind::Array | FrameKind::Set | FrameKind::Map | FrameKind::BlobString
201 )
202 }
203
204 pub fn can_hash(&self) -> bool {
209 matches!(
210 self,
211 FrameKind::BlobString
212 | FrameKind::BlobError
213 | FrameKind::SimpleString
214 | FrameKind::SimpleError
215 | FrameKind::Number
216 | FrameKind::Double
217 | FrameKind::Boolean
218 | FrameKind::Null
219 | FrameKind::ChunkedString
220 | FrameKind::EndStream
221 | FrameKind::VerbatimString
222 | FrameKind::BigNumber
223 )
224 }
225
226 pub fn hash_prefix(&self) -> &'static str {
228 use self::FrameKind::*;
229
230 match *self {
231 Array => "a",
232 BlobString => "b",
233 SimpleString => "s",
234 SimpleError => "e",
235 Number => "n",
236 Null => "N",
237 Double => "d",
238 Boolean => "B",
239 BlobError => "E",
240 VerbatimString => "v",
241 Map => "m",
242 Set => "S",
243 Attribute => "A",
244 Push => "p",
245 Hello => "h",
246 BigNumber => "bn",
247 ChunkedString => "cs",
248 EndStream => "es",
249 }
250 }
251
252 pub fn from_byte(d: u8) -> Option<FrameKind> {
254 use self::FrameKind::*;
255
256 match d {
257 SIMPLE_STRING_BYTE => Some(SimpleString),
258 SIMPLE_ERROR_BYTE => Some(SimpleError),
259 NUMBER_BYTE => Some(Number),
260 DOUBLE_BYTE => Some(Double),
261 BLOB_STRING_BYTE => Some(BlobString),
262 BLOB_ERROR_BYTE => Some(BlobError),
263 BOOLEAN_BYTE => Some(Boolean),
264 VERBATIM_STRING_BYTE => Some(VerbatimString),
265 BIG_NUMBER_BYTE => Some(BigNumber),
266 ARRAY_BYTE => Some(Array),
267 MAP_BYTE => Some(Map),
268 SET_BYTE => Some(Set),
269 ATTRIBUTE_BYTE => Some(Attribute),
270 PUSH_BYTE => Some(Push),
271 NULL_BYTE => Some(Null),
272 CHUNKED_STRING_BYTE => Some(ChunkedString),
273 END_STREAM_BYTE => Some(EndStream),
274 _ => None,
275 }
276 }
277
278 pub fn to_byte(&self) -> u8 {
280 use self::FrameKind::*;
281
282 match *self {
283 SimpleString => SIMPLE_STRING_BYTE,
284 SimpleError => SIMPLE_ERROR_BYTE,
285 Number => NUMBER_BYTE,
286 Double => DOUBLE_BYTE,
287 BlobString => BLOB_STRING_BYTE,
288 BlobError => BLOB_ERROR_BYTE,
289 Boolean => BOOLEAN_BYTE,
290 VerbatimString => VERBATIM_STRING_BYTE,
291 BigNumber => BIG_NUMBER_BYTE,
292 Array => ARRAY_BYTE,
293 Map => MAP_BYTE,
294 Set => SET_BYTE,
295 Attribute => ATTRIBUTE_BYTE,
296 Push => PUSH_BYTE,
297 Null => NULL_BYTE,
298 ChunkedString => CHUNKED_STRING_BYTE,
299 EndStream => END_STREAM_BYTE,
300 Hello => panic!("HELLO does not have a byte prefix."),
301 }
302 }
303}
304
/// A borrowed view of a frame's attribute map, over either of the owning attribute map types.
#[derive(Debug, PartialEq, Eq)]
pub enum BorrowedAttrs<'a> {
  /// Borrowed from an [`OwnedAttributes`] map.
  Owned(&'a OwnedAttributes),
  /// Borrowed from a `BytesAttributes` map.
  #[cfg(feature = "bytes")]
  #[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
  Bytes(&'a BytesAttributes),
}
313
314impl<'a> Clone for BorrowedAttrs<'a> {
315 fn clone(self: &BorrowedAttrs<'a>) -> BorrowedAttrs<'a> {
316 match self {
317 BorrowedAttrs::Owned(owned) => BorrowedAttrs::Owned(owned),
318 #[cfg(feature = "bytes")]
319 BorrowedAttrs::Bytes(bytes) => BorrowedAttrs::Bytes(bytes),
320 }
321 }
322}
323
324impl<'a> From<&'a BorrowedAttrs<'a>> for BorrowedAttrs<'a> {
325 fn from(value: &'a BorrowedAttrs) -> Self {
326 value.clone()
327 }
328}
329
330impl<'a> From<&'a OwnedAttributes> for BorrowedAttrs<'a> {
331 fn from(attr: &'a OwnedAttributes) -> BorrowedAttrs<'a> {
332 BorrowedAttrs::Owned(attr)
333 }
334}
335
#[cfg(feature = "bytes")]
#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
impl<'a> From<&'a BytesAttributes> for BorrowedAttrs<'a> {
  /// Wrap a reference to a `Bytes`-backed attribute map.
  fn from(attr: &'a BytesAttributes) -> Self {
    Self::Bytes(attr)
  }
}
343
/// A RESP3 frame whose payload is borrowed rather than owned.
#[derive(Debug, PartialEq)]
pub enum BorrowedFrame<'a> {
  /// A blob string.
  BlobString {
    data: &'a [u8],
    attributes: Option<BorrowedAttrs<'a>>,
  },
  /// A blob error.
  BlobError {
    data: &'a [u8],
    attributes: Option<BorrowedAttrs<'a>>,
  },
  /// A simple string, held as raw bytes.
  SimpleString {
    data: &'a [u8],
    attributes: Option<BorrowedAttrs<'a>>,
  },
  /// A simple error, held as a string slice.
  SimpleError {
    data: &'a str,
    attributes: Option<BorrowedAttrs<'a>>,
  },
  /// A boolean.
  Boolean {
    data: bool,
    attributes: Option<BorrowedAttrs<'a>>,
  },
  /// A null value. Carries no attributes.
  Null,
  /// A signed 64-bit integer.
  Number {
    data: i64,
    attributes: Option<BorrowedAttrs<'a>>,
  },
  /// A 64-bit floating point number.
  Double {
    data: f64,
    attributes: Option<BorrowedAttrs<'a>>,
  },
  /// A big number, held as raw bytes.
  BigNumber {
    data: &'a [u8],
    attributes: Option<BorrowedAttrs<'a>>,
  },
  /// A verbatim string together with its format tag.
  VerbatimString {
    data: &'a [u8],
    format: VerbatimStringFormat,
    attributes: Option<BorrowedAttrs<'a>>,
  },
  /// An array of frames.
  Array {
    data: &'a [BorrowedFrame<'a>],
    attributes: Option<BorrowedAttrs<'a>>,
  },
  /// A map of frames. Keys must be hashable kinds; hashing an aggregate key panics.
  Map {
    data: &'a FrameMap<BorrowedFrame<'a>, BorrowedFrame<'a>>,
    attributes: Option<BorrowedAttrs<'a>>,
  },
  /// A set of frames. Members must be hashable kinds; hashing an aggregate member panics.
  Set {
    data: &'a FrameSet<BorrowedFrame<'a>>,
    attributes: Option<BorrowedAttrs<'a>>,
  },
  /// A push (out-of-band) message.
  Push {
    data: &'a [BorrowedFrame<'a>],
    attributes: Option<BorrowedAttrs<'a>>,
  },
  /// A `HELLO` request. `auth` is presumably `(username, password)` (TODO confirm against the encoder).
  Hello {
    version: RespVersion,
    auth: Option<(&'a str, &'a str)>,
    setname: Option<&'a str>,
  },
  /// One chunk of a streamed string. An empty chunk is reported by `kind()` as `FrameKind::EndStream`.
  ChunkedString(&'a [u8]),
}
425
426impl<'a> Clone for BorrowedFrame<'a> {
427 fn clone(self: &BorrowedFrame<'a>) -> BorrowedFrame<'a> {
428 use self::BorrowedFrame::*;
429
430 match self {
431 Array { data, attributes } => Array {
432 data,
433 attributes: attributes.clone(),
434 },
435 Push { data, attributes } => Push {
436 data,
437 attributes: attributes.clone(),
438 },
439 Set { data, attributes } => Set {
440 data,
441 attributes: attributes.clone(),
442 },
443 Map { data, attributes } => Map {
444 data,
445 attributes: attributes.clone(),
446 },
447 BlobString { data, attributes } => BlobString {
448 data,
449 attributes: attributes.clone(),
450 },
451 SimpleString { data, attributes } => SimpleString {
452 data,
453 attributes: attributes.clone(),
454 },
455 SimpleError { data, attributes } => SimpleError {
456 data,
457 attributes: attributes.clone(),
458 },
459 Number { data, attributes } => Number {
460 data: *data,
461 attributes: attributes.clone(),
462 },
463 Null => Null,
464 Double { data, attributes } => Double {
465 data: *data,
466 attributes: attributes.clone(),
467 },
468 Boolean { data, attributes } => Boolean {
469 data: *data,
470 attributes: attributes.clone(),
471 },
472 BlobError { data, attributes } => BlobError {
473 data,
474 attributes: attributes.clone(),
475 },
476 VerbatimString {
477 data,
478 format,
479 attributes,
480 } => VerbatimString {
481 data,
482 format: format.clone(),
483 attributes: attributes.clone(),
484 },
485 ChunkedString(data) => ChunkedString(data),
486 BigNumber { data, attributes } => BigNumber {
487 data,
488 attributes: attributes.clone(),
489 },
490 Hello { auth, setname, version } => Hello {
491 version: version.clone(),
492 setname: *setname,
493 auth: *auth,
494 },
495 }
496 }
497}
498
// `Eq` is asserted by hand because the `f64` inside `Double` prevents deriving it.
// NOTE(review): a NaN `Double` compares unequal to itself under the derived `PartialEq`,
// so such frames are not reflexive. Confirm they are never used as map keys or set members.
impl Eq for BorrowedFrame<'_> {}
500
501impl<'a> Hash for BorrowedFrame<'a> {
502 fn hash<H: Hasher>(&self, state: &mut H) {
503 use self::BorrowedFrame::*;
504 self.kind().hash_prefix().hash(state);
505
506 match self {
507 BlobString { data, .. } => data.hash(state),
508 SimpleString { data, .. } => data.hash(state),
509 SimpleError { data, .. } => data.hash(state),
510 Number { data, .. } => data.hash(state),
511 Null => NULL.hash(state),
512 Double { data, .. } => data.to_string().hash(state),
513 Boolean { data, .. } => data.hash(state),
514 BlobError { data, .. } => data.hash(state),
515 VerbatimString { data, format, .. } => {
516 format.hash(state);
517 data.hash(state);
518 },
519 ChunkedString(data) => data.hash(state),
520 BigNumber { data, .. } => data.hash(state),
521 _ => panic!("Invalid RESP3 data type to use as hash key."),
522 };
523 }
524}
525
526impl<'a> BorrowedFrame<'a> {
527 pub fn kind(&self) -> FrameKind {
529 match self {
530 BorrowedFrame::Array { .. } => FrameKind::Array,
531 BorrowedFrame::BlobString { .. } => FrameKind::BlobString,
532 BorrowedFrame::SimpleString { .. } => FrameKind::SimpleString,
533 BorrowedFrame::SimpleError { .. } => FrameKind::SimpleError,
534 BorrowedFrame::Number { .. } => FrameKind::Number,
535 BorrowedFrame::Null => FrameKind::Null,
536 BorrowedFrame::Double { .. } => FrameKind::Double,
537 BorrowedFrame::BlobError { .. } => FrameKind::BlobError,
538 BorrowedFrame::VerbatimString { .. } => FrameKind::VerbatimString,
539 BorrowedFrame::Boolean { .. } => FrameKind::Boolean,
540 BorrowedFrame::Map { .. } => FrameKind::Map,
541 BorrowedFrame::Set { .. } => FrameKind::Set,
542 BorrowedFrame::Push { .. } => FrameKind::Push,
543 BorrowedFrame::Hello { .. } => FrameKind::Hello,
544 BorrowedFrame::BigNumber { .. } => FrameKind::BigNumber,
545 BorrowedFrame::ChunkedString(inner) => {
546 if inner.is_empty() {
547 FrameKind::EndStream
548 } else {
549 FrameKind::ChunkedString
550 }
551 },
552 }
553 }
554
555 pub fn encode_len(&self, int_as_blobstring: bool) -> usize {
557 resp3_utils::borrowed_encode_len(self, int_as_blobstring)
558 }
559}
560
/// A RESP3 frame whose byte payloads are stored as `_Range` index pairs rather than as data.
///
/// NOTE(review): the ranges presumably index into the buffer the frame was decoded from (confirm
/// against the decoder).
#[derive(Clone, Debug, PartialEq)]
pub enum RangeFrame {
  /// A blob string.
  BlobString {
    data: _Range,
    attributes: Option<RangeAttributes>,
  },
  /// A blob error.
  BlobError {
    data: _Range,
    attributes: Option<RangeAttributes>,
  },
  /// A simple string.
  SimpleString {
    data: _Range,
    attributes: Option<RangeAttributes>,
  },
  /// A simple error.
  SimpleError {
    data: _Range,
    attributes: Option<RangeAttributes>,
  },
  /// A boolean.
  Boolean {
    data: bool,
    attributes: Option<RangeAttributes>,
  },
  /// A null value. Carries no attributes.
  Null,
  /// A signed 64-bit integer.
  Number {
    data: i64,
    attributes: Option<RangeAttributes>,
  },
  /// A 64-bit floating point number.
  Double {
    data: f64,
    attributes: Option<RangeAttributes>,
  },
  /// A big number.
  BigNumber {
    data: _Range,
    attributes: Option<RangeAttributes>,
  },
  /// A verbatim string together with its format tag.
  VerbatimString {
    data: _Range,
    format: VerbatimStringFormat,
    attributes: Option<RangeAttributes>,
  },
  /// An array of frames.
  Array {
    data: Vec<RangeFrame>,
    attributes: Option<RangeAttributes>,
  },
  /// A map of frames. Keys must be hashable kinds; hashing an aggregate key panics.
  Map {
    data: FrameMap<RangeFrame, RangeFrame>,
    attributes: Option<RangeAttributes>,
  },
  /// A set of frames. Members must be hashable kinds; hashing an aggregate member panics.
  Set {
    data: FrameSet<RangeFrame>,
    attributes: Option<RangeAttributes>,
  },
  /// A push (out-of-band) message.
  Push {
    data: Vec<RangeFrame>,
    attributes: Option<RangeAttributes>,
  },
  /// A `HELLO` request with optional credentials and client name.
  Hello {
    version: RespVersion,
    username: Option<_Range>,
    password: Option<_Range>,
    setname: Option<_Range>,
  },
  /// One chunk of a streamed string. The range `(0, 0)` is reported by `kind()` as `FrameKind::EndStream`.
  ChunkedString(_Range),
}
644
// `Eq` is asserted by hand because the `f64` inside `Double` prevents deriving it.
// NOTE(review): a NaN `Double` compares unequal to itself under the derived `PartialEq`,
// so such frames are not reflexive. Confirm they are never used as map keys or set members.
impl Eq for RangeFrame {}
646
647impl RangeFrame {
648 pub fn kind(&self) -> FrameKind {
650 match self {
651 RangeFrame::Array { .. } => FrameKind::Array,
652 RangeFrame::BlobString { .. } => FrameKind::BlobString,
653 RangeFrame::SimpleString { .. } => FrameKind::SimpleString,
654 RangeFrame::SimpleError { .. } => FrameKind::SimpleError,
655 RangeFrame::Number { .. } => FrameKind::Number,
656 RangeFrame::Null => FrameKind::Null,
657 RangeFrame::Double { .. } => FrameKind::Double,
658 RangeFrame::BlobError { .. } => FrameKind::BlobError,
659 RangeFrame::VerbatimString { .. } => FrameKind::VerbatimString,
660 RangeFrame::Boolean { .. } => FrameKind::Boolean,
661 RangeFrame::Map { .. } => FrameKind::Map,
662 RangeFrame::Set { .. } => FrameKind::Set,
663 RangeFrame::Push { .. } => FrameKind::Push,
664 RangeFrame::Hello { .. } => FrameKind::Hello,
665 RangeFrame::BigNumber { .. } => FrameKind::BigNumber,
666 RangeFrame::ChunkedString(inner) => {
667 if inner.1 == 0 && inner.0 == 0 {
668 FrameKind::EndStream
669 } else {
670 FrameKind::ChunkedString
671 }
672 },
673 }
674 }
675
676 pub fn len(&self) -> usize {
683 match self {
684 RangeFrame::Array { data, .. } | RangeFrame::Push { data, .. } => data.len(),
685 RangeFrame::BlobString { data, .. }
686 | RangeFrame::BlobError { data, .. }
687 | RangeFrame::BigNumber { data, .. }
688 | RangeFrame::ChunkedString(data) => data.1 - data.0,
689 RangeFrame::SimpleString { data, .. } => data.1 - data.0,
690 RangeFrame::SimpleError { data, .. } => data.1 - data.0,
691 RangeFrame::Number { .. } | RangeFrame::Double { .. } | RangeFrame::Boolean { .. } => 1,
692 RangeFrame::Null => 0,
693 RangeFrame::VerbatimString { data, .. } => data.1 - data.0,
694 RangeFrame::Map { data, .. } => data.len(),
695 RangeFrame::Set { data, .. } => data.len(),
696 RangeFrame::Hello { .. } => 1,
697 }
698 }
699
700 pub fn add_attributes(&mut self, attributes: RangeAttributes) -> Result<(), RedisProtocolError> {
702 let _attributes = match self {
703 RangeFrame::Array { attributes, .. } => attributes,
704 RangeFrame::Push { attributes, .. } => attributes,
705 RangeFrame::BlobString { attributes, .. } => attributes,
706 RangeFrame::BlobError { attributes, .. } => attributes,
707 RangeFrame::BigNumber { attributes, .. } => attributes,
708 RangeFrame::Boolean { attributes, .. } => attributes,
709 RangeFrame::Number { attributes, .. } => attributes,
710 RangeFrame::Double { attributes, .. } => attributes,
711 RangeFrame::VerbatimString { attributes, .. } => attributes,
712 RangeFrame::SimpleError { attributes, .. } => attributes,
713 RangeFrame::SimpleString { attributes, .. } => attributes,
714 RangeFrame::Set { attributes, .. } => attributes,
715 RangeFrame::Map { attributes, .. } => attributes,
716 RangeFrame::Null | RangeFrame::ChunkedString(_) | RangeFrame::Hello { .. } => {
717 return Err(RedisProtocolError::new(
718 RedisProtocolErrorKind::Unknown,
719 format!("{:?} cannot have attributes.", self.kind()),
720 ))
721 },
722 };
723
724 if let Some(_attributes) = _attributes.as_mut() {
725 _attributes.extend(attributes);
726 } else {
727 *_attributes = Some(attributes);
728 }
729
730 Ok(())
731 }
732
733 pub fn new_end_stream() -> Self {
734 RangeFrame::ChunkedString((0, 0))
735 }
736}
737
738impl Hash for RangeFrame {
739 fn hash<H: Hasher>(&self, state: &mut H) {
740 use self::RangeFrame::*;
741 self.kind().hash_prefix().hash(state);
742
743 match self {
744 BlobString { data, .. } => resp3_utils::hash_tuple(state, data),
745 SimpleString { data, .. } => resp3_utils::hash_tuple(state, data),
746 SimpleError { data, .. } => resp3_utils::hash_tuple(state, data),
747 Number { data, .. } => data.hash(state),
748 Null => NULL.hash(state),
749 Double { data, .. } => data.to_string().hash(state),
750 Boolean { data, .. } => data.hash(state),
751 BlobError { data, .. } => resp3_utils::hash_tuple(state, data),
752 VerbatimString { data, format, .. } => {
753 format.hash(state);
754 resp3_utils::hash_tuple(state, data);
755 },
756 ChunkedString(data) => resp3_utils::hash_tuple(state, data),
757 BigNumber { data, .. } => resp3_utils::hash_tuple(state, data),
758 _ => panic!("Invalid RESP3 data type to use as hash key."),
759 };
760 }
761}
762
763#[derive(Debug)]
766pub struct StreamedRangeFrame {
767 pub kind: FrameKind,
768 pub attributes: Option<RangeAttributes>,
769}
770
771impl StreamedRangeFrame {
772 pub fn new(kind: FrameKind) -> Self {
774 StreamedRangeFrame { kind, attributes: None }
775 }
776
777 pub fn add_attributes(&mut self, attributes: RangeAttributes) {
779 if let Some(_attributes) = self.attributes.as_mut() {
780 _attributes.extend(attributes);
781 } else {
782 self.attributes = Some(attributes);
783 }
784 }
785}
786
787#[derive(Debug)]
790pub enum DecodedRangeFrame {
791 Complete(RangeFrame),
792 Streaming(StreamedRangeFrame),
793}
794
795impl DecodedRangeFrame {
796 pub fn add_attributes(&mut self, attributes: RangeAttributes) -> Result<(), RedisProtocolError> {
798 match self {
799 DecodedRangeFrame::Streaming(inner) => {
800 inner.add_attributes(attributes);
801 Ok(())
802 },
803 DecodedRangeFrame::Complete(inner) => inner.add_attributes(attributes),
804 }
805 }
806
807 pub fn into_complete_frame(self) -> Result<RangeFrame, RedisProtocolError> {
809 match self {
810 DecodedRangeFrame::Complete(frame) => Ok(frame),
811 DecodedRangeFrame::Streaming(_) => Err(RedisProtocolError::new(
812 RedisProtocolErrorKind::DecodeError,
813 "Expected complete frame.",
814 )),
815 }
816 }
817}
818
819pub trait Resp3Frame: Debug + Hash + Eq + Sized {
821 type Attributes;
822
823 fn from_buffer(
825 target: FrameKind,
826 buf: impl IntoIterator<Item = Self>,
827 attributes: Option<Self::Attributes>,
828 ) -> Result<Self, RedisProtocolError>;
829
830 fn attributes(&self) -> Option<&Self::Attributes>;
832
833 fn take_attributes(&mut self) -> Option<Self::Attributes>;
835
836 fn attributes_mut(&mut self) -> Option<&mut Self::Attributes>;
838
839 fn add_attributes(&mut self, attributes: Self::Attributes) -> Result<(), RedisProtocolError>;
841
842 fn new_end_stream() -> Self;
844
845 fn new_empty() -> Self;
847
848 fn len(&self) -> usize;
857
858 fn take(&mut self) -> Self;
860
861 fn kind(&self) -> FrameKind;
863
864 fn is_end_stream_frame(&self) -> bool;
866
867 fn verbatim_string_format(&self) -> Option<&VerbatimStringFormat>;
869
870 fn as_str(&self) -> Option<&str>;
872
873 fn as_bool(&self) -> Option<bool>;
875
876 fn to_string(&self) -> Option<String>;
878
879 fn as_bytes(&self) -> Option<&[u8]>;
881
882 fn encode_len(&self, int_as_blobstring: bool) -> usize;
884
885 fn is_normal_pubsub_message(&self) -> bool;
887
888 fn is_pattern_pubsub_message(&self) -> bool;
890
891 fn is_shard_pubsub_message(&self) -> bool;
893
894 fn is_redirection(&self) -> bool {
896 self.as_str().map(utils::is_redirection).unwrap_or(false)
897 }
898
899 fn as_f64(&self) -> Option<f64> {
901 self.as_str().and_then(|s| s.parse::<f64>().ok())
902 }
903
904 #[cfg(feature = "convert")]
906 #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
907 fn convert<T>(self) -> Result<T, RedisProtocolError>
908 where
909 Self: Sized,
910 T: FromResp3<Self>,
911 {
912 T::from_frame(self)
913 }
914
915 #[cfg(feature = "convert")]
917 #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
918 fn is_single_element_vec(&self) -> bool;
919
920 #[cfg(feature = "convert")]
924 #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
925 fn pop_or_take(self) -> Self;
926}
927
/// A RESP3 frame that owns its payload, using only core and `alloc` container types.
#[derive(Clone, Debug, PartialEq)]
pub enum OwnedFrame {
  /// A blob string.
  BlobString {
    data: Vec<u8>,
    attributes: Option<OwnedAttributes>,
  },
  /// A blob error.
  BlobError {
    data: Vec<u8>,
    attributes: Option<OwnedAttributes>,
  },
  /// A simple string, held as raw bytes (not validated as UTF-8 by this type).
  SimpleString {
    data: Vec<u8>,
    attributes: Option<OwnedAttributes>,
  },
  /// A simple error, held as a `String`.
  SimpleError {
    data: String,
    attributes: Option<OwnedAttributes>,
  },
  /// A boolean.
  Boolean {
    data: bool,
    attributes: Option<OwnedAttributes>,
  },
  /// A null value. Carries no attributes.
  Null,
  /// A signed 64-bit integer.
  Number {
    data: i64,
    attributes: Option<OwnedAttributes>,
  },
  /// A 64-bit floating point number.
  Double {
    data: f64,
    attributes: Option<OwnedAttributes>,
  },
  /// A big number, held as raw bytes.
  BigNumber {
    data: Vec<u8>,
    attributes: Option<OwnedAttributes>,
  },
  /// A verbatim string together with its format tag.
  VerbatimString {
    data: Vec<u8>,
    format: VerbatimStringFormat,
    attributes: Option<OwnedAttributes>,
  },
  /// An array of frames.
  Array {
    data: Vec<OwnedFrame>,
    attributes: Option<OwnedAttributes>,
  },
  /// A map of frames. Keys must be hashable kinds; hashing an aggregate key panics.
  Map {
    data: FrameMap<OwnedFrame, OwnedFrame>,
    attributes: Option<OwnedAttributes>,
  },
  /// A set of frames. Members must be hashable kinds; hashing an aggregate member panics.
  Set {
    data: FrameSet<OwnedFrame>,
    attributes: Option<OwnedAttributes>,
  },
  /// A push (out-of-band) message.
  Push {
    data: Vec<OwnedFrame>,
    attributes: Option<OwnedAttributes>,
  },
  /// A `HELLO` request. `auth` is presumably `(username, password)` (TODO confirm against the encoder).
  Hello {
    version: RespVersion,
    auth: Option<(String, String)>,
    setname: Option<String>,
  },
  /// One chunk of a streamed string. An empty chunk is the end-of-stream marker.
  ChunkedString(Vec<u8>),
}
1020
// `Eq` is asserted by hand because the `f64` inside `Double` prevents deriving it.
// NOTE(review): a NaN `Double` compares unequal to itself under the derived `PartialEq`,
// so such frames are not reflexive. Confirm they are never used as map keys or set members.
impl Eq for OwnedFrame {}
1022
1023impl Hash for OwnedFrame {
1024 fn hash<H: Hasher>(&self, state: &mut H) {
1025 use self::OwnedFrame::*;
1026 self.kind().hash_prefix().hash(state);
1027
1028 match self {
1029 BlobString { data, .. } => data.hash(state),
1030 SimpleString { data, .. } => data.hash(state),
1031 SimpleError { data, .. } => data.hash(state),
1032 Number { data, .. } => data.hash(state),
1033 Null => NULL.hash(state),
1034 Double { data, .. } => data.to_string().hash(state),
1035 Boolean { data, .. } => data.hash(state),
1036 BlobError { data, .. } => data.hash(state),
1037 VerbatimString { data, format, .. } => {
1038 format.hash(state);
1039 data.hash(state);
1040 },
1041 ChunkedString(data) => data.hash(state),
1042 BigNumber { data, .. } => data.hash(state),
1043 _ => panic!("Invalid RESP3 data type to use as hash key."),
1044 };
1045 }
1046}
1047
1048impl Resp3Frame for OwnedFrame {
1049 type Attributes = OwnedAttributes;
1050
1051 fn from_buffer(
1052 target: FrameKind,
1053 buf: impl IntoIterator<Item = Self>,
1054 attributes: Option<Self::Attributes>,
1055 ) -> Result<Self, RedisProtocolError> {
1056 let mut data: Vec<_> = buf.into_iter().collect();
1057
1058 Ok(match target {
1059 FrameKind::BlobString => {
1060 let total_len = data.iter().fold(0, |m, f| m + f.len());
1061 let mut buf = Vec::with_capacity(total_len);
1062 for frame in data.into_iter() {
1063 buf.extend(match frame {
1064 OwnedFrame::ChunkedString(chunk) => chunk,
1065 OwnedFrame::BlobString { data, .. } => data,
1066 _ => {
1067 return Err(RedisProtocolError::new(
1068 RedisProtocolErrorKind::DecodeError,
1069 "Expected chunked or blob string.",
1070 ));
1071 },
1072 });
1073 }
1074
1075 OwnedFrame::BlobString { data: buf, attributes }
1076 },
1077 FrameKind::Map => OwnedFrame::Map {
1078 attributes,
1079 data: data
1080 .chunks_exact_mut(2)
1081 .map(|chunk| (chunk[0].take(), chunk[1].take()))
1082 .collect(),
1083 },
1084 FrameKind::Set => OwnedFrame::Set {
1085 attributes,
1086 data: data.into_iter().collect(),
1087 },
1088 FrameKind::Array => OwnedFrame::Array { attributes, data },
1089 _ => {
1090 return Err(RedisProtocolError::new(
1091 RedisProtocolErrorKind::DecodeError,
1092 "Streaming frames only supported for blob strings, maps, sets, and arrays.",
1093 ))
1094 },
1095 })
1096 }
1097
1098 fn add_attributes(&mut self, attributes: Self::Attributes) -> Result<(), RedisProtocolError> {
1099 let _attributes = match self {
1100 OwnedFrame::Array { attributes, .. } => attributes,
1101 OwnedFrame::Push { attributes, .. } => attributes,
1102 OwnedFrame::BlobString { attributes, .. } => attributes,
1103 OwnedFrame::BlobError { attributes, .. } => attributes,
1104 OwnedFrame::BigNumber { attributes, .. } => attributes,
1105 OwnedFrame::Boolean { attributes, .. } => attributes,
1106 OwnedFrame::Number { attributes, .. } => attributes,
1107 OwnedFrame::Double { attributes, .. } => attributes,
1108 OwnedFrame::VerbatimString { attributes, .. } => attributes,
1109 OwnedFrame::SimpleError { attributes, .. } => attributes,
1110 OwnedFrame::SimpleString { attributes, .. } => attributes,
1111 OwnedFrame::Set { attributes, .. } => attributes,
1112 OwnedFrame::Map { attributes, .. } => attributes,
1113 OwnedFrame::Null | OwnedFrame::ChunkedString(_) | OwnedFrame::Hello { .. } => {
1114 return Err(RedisProtocolError::new(
1115 RedisProtocolErrorKind::Unknown,
1116 format!("{:?} cannot have attributes.", self.kind()),
1117 ))
1118 },
1119 };
1120
1121 if let Some(_attributes) = _attributes.as_mut() {
1122 _attributes.extend(attributes);
1123 } else {
1124 *_attributes = Some(attributes);
1125 }
1126
1127 Ok(())
1128 }
1129
1130 fn attributes(&self) -> Option<&Self::Attributes> {
1131 let attributes = match self {
1132 OwnedFrame::Array { attributes, .. } => attributes,
1133 OwnedFrame::Push { attributes, .. } => attributes,
1134 OwnedFrame::BlobString { attributes, .. } => attributes,
1135 OwnedFrame::BlobError { attributes, .. } => attributes,
1136 OwnedFrame::BigNumber { attributes, .. } => attributes,
1137 OwnedFrame::Boolean { attributes, .. } => attributes,
1138 OwnedFrame::Number { attributes, .. } => attributes,
1139 OwnedFrame::Double { attributes, .. } => attributes,
1140 OwnedFrame::VerbatimString { attributes, .. } => attributes,
1141 OwnedFrame::SimpleError { attributes, .. } => attributes,
1142 OwnedFrame::SimpleString { attributes, .. } => attributes,
1143 OwnedFrame::Set { attributes, .. } => attributes,
1144 OwnedFrame::Map { attributes, .. } => attributes,
1145 OwnedFrame::Null | OwnedFrame::ChunkedString(_) | OwnedFrame::Hello { .. } => return None,
1146 };
1147
1148 attributes.as_ref()
1149 }
1150
1151 fn attributes_mut(&mut self) -> Option<&mut Self::Attributes> {
1152 let attributes = match self {
1153 OwnedFrame::Array { attributes, .. } => attributes,
1154 OwnedFrame::Push { attributes, .. } => attributes,
1155 OwnedFrame::BlobString { attributes, .. } => attributes,
1156 OwnedFrame::BlobError { attributes, .. } => attributes,
1157 OwnedFrame::BigNumber { attributes, .. } => attributes,
1158 OwnedFrame::Boolean { attributes, .. } => attributes,
1159 OwnedFrame::Number { attributes, .. } => attributes,
1160 OwnedFrame::Double { attributes, .. } => attributes,
1161 OwnedFrame::VerbatimString { attributes, .. } => attributes,
1162 OwnedFrame::SimpleError { attributes, .. } => attributes,
1163 OwnedFrame::SimpleString { attributes, .. } => attributes,
1164 OwnedFrame::Set { attributes, .. } => attributes,
1165 OwnedFrame::Map { attributes, .. } => attributes,
1166 OwnedFrame::Null | OwnedFrame::ChunkedString(_) | OwnedFrame::Hello { .. } => return None,
1167 };
1168
1169 attributes.as_mut()
1170 }
1171
1172 fn take_attributes(&mut self) -> Option<Self::Attributes> {
1173 let attributes = match self {
1174 OwnedFrame::Array { attributes, .. } => attributes,
1175 OwnedFrame::Push { attributes, .. } => attributes,
1176 OwnedFrame::BlobString { attributes, .. } => attributes,
1177 OwnedFrame::BlobError { attributes, .. } => attributes,
1178 OwnedFrame::BigNumber { attributes, .. } => attributes,
1179 OwnedFrame::Boolean { attributes, .. } => attributes,
1180 OwnedFrame::Number { attributes, .. } => attributes,
1181 OwnedFrame::Double { attributes, .. } => attributes,
1182 OwnedFrame::VerbatimString { attributes, .. } => attributes,
1183 OwnedFrame::SimpleError { attributes, .. } => attributes,
1184 OwnedFrame::SimpleString { attributes, .. } => attributes,
1185 OwnedFrame::Set { attributes, .. } => attributes,
1186 OwnedFrame::Map { attributes, .. } => attributes,
1187 OwnedFrame::Null | OwnedFrame::ChunkedString(_) | OwnedFrame::Hello { .. } => return None,
1188 };
1189
1190 attributes.take()
1191 }
1192
1193 fn new_end_stream() -> Self {
1194 OwnedFrame::ChunkedString(Vec::new())
1195 }
1196
1197 fn new_empty() -> Self {
1198 OwnedFrame::Number {
1199 data: 0,
1200 attributes: None,
1201 }
1202 }
1203
1204 fn len(&self) -> usize {
1205 match self {
1206 OwnedFrame::Array { data, .. } | OwnedFrame::Push { data, .. } => data.len(),
1207 OwnedFrame::BlobString { data, .. }
1208 | OwnedFrame::BlobError { data, .. }
1209 | OwnedFrame::BigNumber { data, .. }
1210 | OwnedFrame::ChunkedString(data) => data.len(),
1211 OwnedFrame::SimpleString { data, .. } => data.len(),
1212 OwnedFrame::SimpleError { data, .. } => data.len(),
1213 OwnedFrame::Number { .. } | OwnedFrame::Double { .. } | OwnedFrame::Boolean { .. } => 1,
1214 OwnedFrame::Null => 0,
1215 OwnedFrame::VerbatimString { data, .. } => data.len(),
1216 OwnedFrame::Map { data, .. } => data.len(),
1217 OwnedFrame::Set { data, .. } => data.len(),
1218 OwnedFrame::Hello { .. } => 1,
1219 }
1220 }
1221
1222 fn take(&mut self) -> Self {
1223 mem::replace(self, OwnedFrame::Null)
1224 }
1225
1226 fn kind(&self) -> FrameKind {
1227 match self {
1228 OwnedFrame::Array { .. } => FrameKind::Array,
1229 OwnedFrame::BlobString { .. } => FrameKind::BlobString,
1230 OwnedFrame::SimpleString { .. } => FrameKind::SimpleString,
1231 OwnedFrame::SimpleError { .. } => FrameKind::SimpleError,
1232 OwnedFrame::Number { .. } => FrameKind::Number,
1233 OwnedFrame::Null => FrameKind::Null,
1234 OwnedFrame::Double { .. } => FrameKind::Double,
1235 OwnedFrame::BlobError { .. } => FrameKind::BlobError,
1236 OwnedFrame::VerbatimString { .. } => FrameKind::VerbatimString,
1237 OwnedFrame::Boolean { .. } => FrameKind::Boolean,
1238 OwnedFrame::Map { .. } => FrameKind::Map,
1239 OwnedFrame::Set { .. } => FrameKind::Set,
1240 OwnedFrame::Push { .. } => FrameKind::Push,
1241 OwnedFrame::Hello { .. } => FrameKind::Hello,
1242 OwnedFrame::BigNumber { .. } => FrameKind::BigNumber,
1243 OwnedFrame::ChunkedString(inner) => {
1244 if inner.is_empty() {
1245 FrameKind::EndStream
1246 } else {
1247 FrameKind::ChunkedString
1248 }
1249 },
1250 }
1251 }
1252
1253 fn is_end_stream_frame(&self) -> bool {
1254 match self {
1255 OwnedFrame::ChunkedString(s) => s.is_empty(),
1256 _ => false,
1257 }
1258 }
1259
1260 fn verbatim_string_format(&self) -> Option<&VerbatimStringFormat> {
1261 match self {
1262 OwnedFrame::VerbatimString { format, .. } => Some(format),
1263 _ => None,
1264 }
1265 }
1266
1267 fn as_str(&self) -> Option<&str> {
1268 match self {
1269 OwnedFrame::SimpleError { data, .. } => Some(data),
1270 OwnedFrame::SimpleString { data, .. } => str::from_utf8(data).ok(),
1271 OwnedFrame::BlobError { data, .. }
1272 | OwnedFrame::BlobString { data, .. }
1273 | OwnedFrame::BigNumber { data, .. } => str::from_utf8(data).ok(),
1274 OwnedFrame::VerbatimString { data, .. } => str::from_utf8(data).ok(),
1275 OwnedFrame::ChunkedString(data) => str::from_utf8(data).ok(),
1276 _ => None,
1277 }
1278 }
1279
1280 fn as_bool(&self) -> Option<bool> {
1281 match self {
1282 OwnedFrame::SimpleString { data, .. }
1283 | OwnedFrame::BlobString { data, .. }
1284 | OwnedFrame::VerbatimString { data, .. } => utils::bytes_to_bool(data),
1285 OwnedFrame::ChunkedString(data) => utils::bytes_to_bool(data),
1286 OwnedFrame::Boolean { data, .. } => Some(*data),
1287 OwnedFrame::Number { data, .. } => match data {
1288 0 => Some(false),
1289 1 => Some(true),
1290 _ => None,
1291 },
1292 _ => None,
1293 }
1294 }
1295
1296 fn to_string(&self) -> Option<String> {
1297 match self {
1298 OwnedFrame::SimpleError { data, .. } => Some(data.to_string()),
1299 OwnedFrame::SimpleString { data, .. } => String::from_utf8(data.to_vec()).ok(),
1300 OwnedFrame::BlobError { data, .. }
1301 | OwnedFrame::BlobString { data, .. }
1302 | OwnedFrame::BigNumber { data, .. } => String::from_utf8(data.to_vec()).ok(),
1303 OwnedFrame::VerbatimString { data, .. } => String::from_utf8(data.to_vec()).ok(),
1304 OwnedFrame::ChunkedString(b) => String::from_utf8(b.to_vec()).ok(),
1305 OwnedFrame::Double { data, .. } => Some(data.to_string()),
1306 OwnedFrame::Number { data, .. } => Some(data.to_string()),
1307 _ => None,
1308 }
1309 }
1310
1311 fn as_bytes(&self) -> Option<&[u8]> {
1312 match self {
1313 OwnedFrame::SimpleError { data, .. } => Some(data.as_bytes()),
1314 OwnedFrame::SimpleString { data, .. } => Some(data),
1315 OwnedFrame::BlobError { data, .. }
1316 | OwnedFrame::BlobString { data, .. }
1317 | OwnedFrame::BigNumber { data, .. } => Some(data),
1318 OwnedFrame::VerbatimString { data, .. } => Some(data),
1319 OwnedFrame::ChunkedString(b) => Some(b),
1320 _ => None,
1321 }
1322 }
1323
1324 fn encode_len(&self, int_as_blobstring: bool) -> usize {
1325 resp3_utils::owned_encode_len(self, int_as_blobstring)
1326 }
1327
1328 fn is_normal_pubsub_message(&self) -> bool {
1329 match self {
1332 OwnedFrame::Array { data, .. } | OwnedFrame::Push { data, .. } => {
1333 (data.len() == 3 && data[0].as_str().map(|s| s == PUBSUB_PREFIX).unwrap_or(false))
1334 || (data.len() == 4
1335 && data[0].as_str().map(|s| s == PUBSUB_PUSH_PREFIX).unwrap_or(false)
1336 && data[1].as_str().map(|s| s == PUBSUB_PREFIX).unwrap_or(false))
1337 },
1338 _ => false,
1339 }
1340 }
1341
1342 fn is_pattern_pubsub_message(&self) -> bool {
1343 match self {
1346 OwnedFrame::Array { data, .. } | OwnedFrame::Push { data, .. } => {
1347 (data.len() == 4 && data[0].as_str().map(|s| s == PATTERN_PUBSUB_PREFIX).unwrap_or(false))
1348 || (data.len() == 5
1349 && data[0].as_str().map(|s| s == PUBSUB_PUSH_PREFIX).unwrap_or(false)
1350 && data[1].as_str().map(|s| s == PATTERN_PUBSUB_PREFIX).unwrap_or(false))
1351 },
1352 _ => false,
1353 }
1354 }
1355
1356 fn is_shard_pubsub_message(&self) -> bool {
1357 match self {
1360 OwnedFrame::Array { data, .. } | OwnedFrame::Push { data, .. } => {
1361 (data.len() == 3 && data[0].as_str().map(|s| s == SHARD_PUBSUB_PREFIX).unwrap_or(false))
1362 || (data.len() == 4
1363 && data[0].as_str().map(|s| s == PUBSUB_PUSH_PREFIX).unwrap_or(false)
1364 && data[1].as_str().map(|s| s == SHARD_PUBSUB_PREFIX).unwrap_or(false))
1365 },
1366 _ => false,
1367 }
1368 }
1369
/// Whether the frame is an `Array` or `Push` frame holding exactly one element.
#[cfg(feature = "convert")]
#[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
fn is_single_element_vec(&self) -> bool {
  matches!(
    self,
    OwnedFrame::Array { data, .. } | OwnedFrame::Push { data, .. } if data.len() == 1
  )
}
1378
/// Pop the last element out of an `Array` or `Push` frame, or return the frame itself.
///
/// Any frame that is not an `Array` or `Push` is returned unchanged. An empty `Array` or `Push` is also returned
/// unchanged instead of panicking, since there is nothing to pop. When an element is popped the remaining
/// elements and the outer frame are dropped.
#[cfg(feature = "convert")]
#[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
fn pop_or_take(mut self) -> Self {
  let popped = match &mut self {
    OwnedFrame::Array { data, .. } | OwnedFrame::Push { data, .. } => data.pop(),
    _ => None,
  };

  // Fall back to the original frame when there was nothing to pop.
  popped.unwrap_or(self)
}
1387}
1388
1389impl OwnedFrame {
1390 #[cfg(feature = "bytes")]
1392 #[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
1393 pub fn into_bytes_frame(self) -> BytesFrame {
1394 resp3_utils::owned_to_bytes_frame(self)
1395 }
1396}
1397
1398impl<B: Into<Vec<u8>>> TryFrom<(FrameKind, B)> for OwnedFrame {
1399 type Error = RedisProtocolError;
1400
1401 fn try_from((kind, buf): (FrameKind, B)) -> Result<Self, Self::Error> {
1402 Ok(match kind {
1403 FrameKind::SimpleString => OwnedFrame::SimpleString {
1404 data: buf.into(),
1405 attributes: None,
1406 },
1407 FrameKind::SimpleError => OwnedFrame::SimpleError {
1408 data: String::from_utf8(buf.into())?,
1409 attributes: None,
1410 },
1411 FrameKind::BlobString => OwnedFrame::BlobString {
1412 data: buf.into(),
1413 attributes: None,
1414 },
1415 FrameKind::BlobError => OwnedFrame::BlobError {
1416 data: buf.into(),
1417 attributes: None,
1418 },
1419 FrameKind::BigNumber => OwnedFrame::BigNumber {
1420 data: buf.into(),
1421 attributes: None,
1422 },
1423 FrameKind::ChunkedString => OwnedFrame::ChunkedString(buf.into()),
1424 FrameKind::Null => OwnedFrame::Null,
1425 _ => {
1426 return Err(RedisProtocolError::new(
1427 RedisProtocolErrorKind::Unknown,
1428 "Cannot convert to frame.",
1429 ))
1430 },
1431 })
1432 }
1433}
1434
1435impl From<i64> for OwnedFrame {
1436 fn from(value: i64) -> Self {
1437 OwnedFrame::Number {
1438 data: value,
1439 attributes: None,
1440 }
1441 }
1442}
1443
1444impl From<bool> for OwnedFrame {
1445 fn from(value: bool) -> Self {
1446 OwnedFrame::Boolean {
1447 data: value,
1448 attributes: None,
1449 }
1450 }
1451}
1452
1453impl From<f64> for OwnedFrame {
1454 fn from(value: f64) -> Self {
1455 OwnedFrame::Double {
1456 data: value,
1457 attributes: None,
1458 }
1459 }
1460}
1461
/// A RESP3 frame whose byte payloads are stored in `Bytes` containers.
///
/// Every variant other than `Null`, `Hello`, and `ChunkedString` carries an optional set of attributes.
#[cfg(feature = "bytes")]
#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
#[derive(Clone, Debug, PartialEq)]
pub enum BytesFrame {
  /// A blob string, stored as raw bytes.
  BlobString {
    data: Bytes,
    attributes: Option<BytesAttributes>,
  },
  /// A blob error, stored as raw bytes.
  BlobError {
    data: Bytes,
    attributes: Option<BytesAttributes>,
  },
  /// A simple string, stored as raw bytes (the payload is not required to be UTF-8 by this type).
  SimpleString {
    data: Bytes,
    attributes: Option<BytesAttributes>,
  },
  /// A simple error, stored as a UTF-8 `Str`.
  SimpleError {
    data: Str,
    attributes: Option<BytesAttributes>,
  },
  /// A boolean value.
  Boolean {
    data: bool,
    attributes: Option<BytesAttributes>,
  },
  /// A null value.
  Null,
  /// A signed 64-bit integer.
  Number {
    data: i64,
    attributes: Option<BytesAttributes>,
  },
  /// A 64-bit floating point value.
  Double {
    data: f64,
    attributes: Option<BytesAttributes>,
  },
  /// A big number, kept as its raw bytes rather than parsed into a numeric type.
  BigNumber {
    data: Bytes,
    attributes: Option<BytesAttributes>,
  },
  /// A verbatim string together with its format.
  VerbatimString {
    data: Bytes,
    format: VerbatimStringFormat,
    attributes: Option<BytesAttributes>,
  },
  /// An ordered list of frames.
  Array {
    data: Vec<BytesFrame>,
    attributes: Option<BytesAttributes>,
  },
  /// A map of frames keyed by frames.
  ///
  /// Keys are hashed with the `Hash` implementation for `BytesFrame`, which panics for aggregate and `Hello`
  /// frames.
  Map {
    data: FrameMap<BytesFrame, BytesFrame>,
    attributes: Option<BytesAttributes>,
  },
  /// A set of frames. The same hashing restrictions as `Map` keys apply to its members.
  Set {
    data: FrameSet<BytesFrame>,
    attributes: Option<BytesAttributes>,
  },
  /// An ordered list of frames tagged as a push message.
  Push {
    data: Vec<BytesFrame>,
    attributes: Option<BytesAttributes>,
  },
  /// A `HELLO` command frame.
  Hello {
    version: RespVersion,
    // NOTE(review): presumably `(username, password)` - confirm against the encoder.
    auth: Option<(Str, Str)>,
    setname: Option<Str>,
  },
  /// One chunk of a streamed string. An empty chunk marks the end of a stream.
  ChunkedString(Bytes),
}
1556
#[cfg(feature = "bytes")]
impl<B: Into<Bytes>> TryFrom<(FrameKind, B)> for BytesFrame {
  type Error = RedisProtocolError;

  /// Build a frame of the requested kind from a byte buffer, without attributes.
  ///
  /// Supported kinds are `SimpleString`, `SimpleError`, `BlobString`, `BlobError`, `BigNumber`, `ChunkedString`,
  /// and `Null` (which ignores the buffer).
  ///
  /// # Errors
  ///
  /// Returns an error if `Str::from_inner` rejects the buffer for `SimpleError`, or an `Unknown` error for any
  /// other frame kind.
  fn try_from((kind, buf): (FrameKind, B)) -> Result<Self, Self::Error> {
    let frame = match kind {
      FrameKind::Null => BytesFrame::Null,
      FrameKind::ChunkedString => BytesFrame::ChunkedString(buf.into()),
      FrameKind::SimpleString => BytesFrame::SimpleString {
        attributes: None,
        data: buf.into(),
      },
      FrameKind::BlobString => BytesFrame::BlobString {
        attributes: None,
        data: buf.into(),
      },
      FrameKind::BlobError => BytesFrame::BlobError {
        attributes: None,
        data: buf.into(),
      },
      FrameKind::BigNumber => BytesFrame::BigNumber {
        attributes: None,
        data: buf.into(),
      },
      FrameKind::SimpleError => {
        // Simple errors are stored as a `Str`, so the conversion can fail.
        let data = Str::from_inner(buf.into())?;
        BytesFrame::SimpleError { data, attributes: None }
      },
      _ => {
        return Err(RedisProtocolError::new(
          RedisProtocolErrorKind::Unknown,
          "Cannot convert to frame.",
        ))
      },
    };

    Ok(frame)
  }
}
1594
#[cfg(feature = "bytes")]
impl From<i64> for BytesFrame {
  /// Wrap an integer in a `Number` frame with no attributes.
  fn from(value: i64) -> Self {
    Self::Number {
      attributes: None,
      data: value,
    }
  }
}
1604
#[cfg(feature = "bytes")]
impl From<bool> for BytesFrame {
  /// Wrap a boolean in a `Boolean` frame with no attributes.
  fn from(value: bool) -> Self {
    Self::Boolean {
      attributes: None,
      data: value,
    }
  }
}
1614
#[cfg(feature = "bytes")]
impl From<f64> for BytesFrame {
  /// Wrap a float in a `Double` frame with no attributes.
  fn from(value: f64) -> Self {
    Self::Double {
      attributes: None,
      data: value,
    }
  }
}
1624
#[cfg(feature = "bytes")]
impl Hash for BytesFrame {
  /// Hash the frame so it can be used as a map key or set member.
  ///
  /// The hash prefix of the frame's kind is fed to the hasher first, followed by the payload. Attributes do not
  /// contribute to the hash.
  ///
  /// # Panics
  ///
  /// Panics for `Array`, `Map`, `Set`, `Push`, and `Hello` frames, which cannot be used as hash keys.
  fn hash<H: Hasher>(&self, state: &mut H) {
    use self::BytesFrame::*;
    self.kind().hash_prefix().hash(state);

    match self {
      BlobString { data, .. } => data.hash(state),
      SimpleString { data, .. } => data.hash(state),
      SimpleError { data, .. } => data.hash(state),
      Number { data, .. } => data.hash(state),
      Null => NULL.hash(state),
      Double { data, .. } => {
        // `0.0 == -0.0` under `PartialEq`, so both must feed the same bits to the hasher to uphold the
        // `Hash`/`Eq` contract. Hashing the bit pattern also avoids allocating a string per hash.
        let canonical = if *data == 0.0 { 0.0_f64 } else { *data };
        canonical.to_bits().hash(state);
      },
      Boolean { data, .. } => data.hash(state),
      BlobError { data, .. } => data.hash(state),
      VerbatimString { data, format, .. } => {
        format.hash(state);
        data.hash(state);
      },
      ChunkedString(data) => data.hash(state),
      BigNumber { data, .. } => data.hash(state),
      Array { .. } | Map { .. } | Set { .. } | Push { .. } | Hello { .. } => {
        panic!("Invalid RESP3 data type to use as hash key.")
      },
    };
  }
}
1650
// Marker impl layered on the derived `PartialEq`.
//
// NOTE(review): `Double` holds an `f64`, so a frame containing NaN does not compare equal to itself.
#[cfg(feature = "bytes")]
impl Eq for BytesFrame {}
1653
1654#[cfg(feature = "bytes")]
1655impl Resp3Frame for BytesFrame {
1656 type Attributes = BytesAttributes;
1657
1658 fn from_buffer(
1659 target: FrameKind,
1660 buf: impl IntoIterator<Item = Self>,
1661 attributes: Option<Self::Attributes>,
1662 ) -> Result<Self, RedisProtocolError> {
1663 let mut data: Vec<_> = buf.into_iter().collect();
1664
1665 Ok(match target {
1666 FrameKind::BlobString => {
1667 let total_len = data.iter().fold(0, |m, f| m + f.len());
1668 let mut buf = BytesMut::with_capacity(total_len);
1669 for frame in data.into_iter() {
1670 buf.extend(match frame {
1671 BytesFrame::ChunkedString(chunk) => chunk,
1672 BytesFrame::BlobString { data, .. } => data,
1673 _ => {
1674 return Err(RedisProtocolError::new(
1675 RedisProtocolErrorKind::DecodeError,
1676 "Expected chunked or blob string.",
1677 ));
1678 },
1679 });
1680 }
1681
1682 BytesFrame::BlobString {
1683 data: buf.freeze(),
1684 attributes,
1685 }
1686 },
1687 FrameKind::Map => BytesFrame::Map {
1688 attributes,
1689 data: data
1690 .chunks_exact_mut(2)
1691 .map(|chunk| (chunk[0].take(), chunk[1].take()))
1692 .collect(),
1693 },
1694 FrameKind::Set => BytesFrame::Set {
1695 attributes,
1696 data: data.into_iter().collect(),
1697 },
1698 FrameKind::Array => BytesFrame::Array { attributes, data },
1699 _ => {
1700 return Err(RedisProtocolError::new(
1701 RedisProtocolErrorKind::DecodeError,
1702 "Streaming frames only supported for blob strings, maps, sets, and arrays.",
1703 ))
1704 },
1705 })
1706 }
1707
1708 fn attributes(&self) -> Option<&Self::Attributes> {
1709 let attributes = match self {
1710 BytesFrame::Array { attributes, .. } => attributes,
1711 BytesFrame::Push { attributes, .. } => attributes,
1712 BytesFrame::BlobString { attributes, .. } => attributes,
1713 BytesFrame::BlobError { attributes, .. } => attributes,
1714 BytesFrame::BigNumber { attributes, .. } => attributes,
1715 BytesFrame::Boolean { attributes, .. } => attributes,
1716 BytesFrame::Number { attributes, .. } => attributes,
1717 BytesFrame::Double { attributes, .. } => attributes,
1718 BytesFrame::VerbatimString { attributes, .. } => attributes,
1719 BytesFrame::SimpleError { attributes, .. } => attributes,
1720 BytesFrame::SimpleString { attributes, .. } => attributes,
1721 BytesFrame::Set { attributes, .. } => attributes,
1722 BytesFrame::Map { attributes, .. } => attributes,
1723 BytesFrame::Null | BytesFrame::ChunkedString(_) | BytesFrame::Hello { .. } => return None,
1724 };
1725
1726 attributes.as_ref()
1727 }
1728
1729 fn take_attributes(&mut self) -> Option<Self::Attributes> {
1730 let attributes = match self {
1731 BytesFrame::Array { attributes, .. } => attributes,
1732 BytesFrame::Push { attributes, .. } => attributes,
1733 BytesFrame::BlobString { attributes, .. } => attributes,
1734 BytesFrame::BlobError { attributes, .. } => attributes,
1735 BytesFrame::BigNumber { attributes, .. } => attributes,
1736 BytesFrame::Boolean { attributes, .. } => attributes,
1737 BytesFrame::Number { attributes, .. } => attributes,
1738 BytesFrame::Double { attributes, .. } => attributes,
1739 BytesFrame::VerbatimString { attributes, .. } => attributes,
1740 BytesFrame::SimpleError { attributes, .. } => attributes,
1741 BytesFrame::SimpleString { attributes, .. } => attributes,
1742 BytesFrame::Set { attributes, .. } => attributes,
1743 BytesFrame::Map { attributes, .. } => attributes,
1744 BytesFrame::Null | BytesFrame::ChunkedString(_) | BytesFrame::Hello { .. } => return None,
1745 };
1746
1747 attributes.take()
1748 }
1749
1750 fn attributes_mut(&mut self) -> Option<&mut Self::Attributes> {
1751 let attributes = match self {
1752 BytesFrame::Array { attributes, .. } => attributes,
1753 BytesFrame::Push { attributes, .. } => attributes,
1754 BytesFrame::BlobString { attributes, .. } => attributes,
1755 BytesFrame::BlobError { attributes, .. } => attributes,
1756 BytesFrame::BigNumber { attributes, .. } => attributes,
1757 BytesFrame::Boolean { attributes, .. } => attributes,
1758 BytesFrame::Number { attributes, .. } => attributes,
1759 BytesFrame::Double { attributes, .. } => attributes,
1760 BytesFrame::VerbatimString { attributes, .. } => attributes,
1761 BytesFrame::SimpleError { attributes, .. } => attributes,
1762 BytesFrame::SimpleString { attributes, .. } => attributes,
1763 BytesFrame::Set { attributes, .. } => attributes,
1764 BytesFrame::Map { attributes, .. } => attributes,
1765 BytesFrame::Null | BytesFrame::ChunkedString(_) | BytesFrame::Hello { .. } => return None,
1766 };
1767
1768 attributes.as_mut()
1769 }
1770
1771 fn add_attributes(&mut self, attributes: Self::Attributes) -> Result<(), RedisProtocolError> {
1773 let _attributes = match self {
1774 BytesFrame::Array { attributes, .. } => attributes,
1775 BytesFrame::Push { attributes, .. } => attributes,
1776 BytesFrame::BlobString { attributes, .. } => attributes,
1777 BytesFrame::BlobError { attributes, .. } => attributes,
1778 BytesFrame::BigNumber { attributes, .. } => attributes,
1779 BytesFrame::Boolean { attributes, .. } => attributes,
1780 BytesFrame::Number { attributes, .. } => attributes,
1781 BytesFrame::Double { attributes, .. } => attributes,
1782 BytesFrame::VerbatimString { attributes, .. } => attributes,
1783 BytesFrame::SimpleError { attributes, .. } => attributes,
1784 BytesFrame::SimpleString { attributes, .. } => attributes,
1785 BytesFrame::Set { attributes, .. } => attributes,
1786 BytesFrame::Map { attributes, .. } => attributes,
1787 BytesFrame::Null | BytesFrame::ChunkedString(_) | BytesFrame::Hello { .. } => {
1788 return Err(RedisProtocolError::new(
1789 RedisProtocolErrorKind::Unknown,
1790 format!("{:?} cannot have attributes.", self.kind()),
1791 ))
1792 },
1793 };
1794
1795 if let Some(_attributes) = _attributes.as_mut() {
1796 _attributes.extend(attributes);
1797 } else {
1798 *_attributes = Some(attributes);
1799 }
1800
1801 Ok(())
1802 }
1803
1804 fn new_empty() -> Self {
1805 BytesFrame::Number {
1806 data: 0,
1807 attributes: None,
1808 }
1809 }
1810
1811 fn new_end_stream() -> Self {
1812 BytesFrame::ChunkedString(Bytes::new())
1813 }
1814
1815 fn len(&self) -> usize {
1816 match self {
1817 BytesFrame::Array { data, .. } | BytesFrame::Push { data, .. } => data.len(),
1818 BytesFrame::BlobString { data, .. }
1819 | BytesFrame::BlobError { data, .. }
1820 | BytesFrame::BigNumber { data, .. }
1821 | BytesFrame::ChunkedString(data) => data.len(),
1822 BytesFrame::SimpleString { data, .. } => data.len(),
1823 BytesFrame::SimpleError { data, .. } => data.len(),
1824 BytesFrame::Number { .. } | BytesFrame::Double { .. } | BytesFrame::Boolean { .. } => 1,
1825 BytesFrame::Null => 0,
1826 BytesFrame::VerbatimString { data, .. } => data.len(),
1827 BytesFrame::Map { data, .. } => data.len(),
1828 BytesFrame::Set { data, .. } => data.len(),
1829 BytesFrame::Hello { .. } => 1,
1830 }
1831 }
1832
1833 fn take(&mut self) -> BytesFrame {
1834 mem::replace(self, BytesFrame::Null)
1835 }
1836
1837 fn kind(&self) -> FrameKind {
1838 match self {
1839 BytesFrame::Array { .. } => FrameKind::Array,
1840 BytesFrame::BlobString { .. } => FrameKind::BlobString,
1841 BytesFrame::SimpleString { .. } => FrameKind::SimpleString,
1842 BytesFrame::SimpleError { .. } => FrameKind::SimpleError,
1843 BytesFrame::Number { .. } => FrameKind::Number,
1844 BytesFrame::Null => FrameKind::Null,
1845 BytesFrame::Double { .. } => FrameKind::Double,
1846 BytesFrame::BlobError { .. } => FrameKind::BlobError,
1847 BytesFrame::VerbatimString { .. } => FrameKind::VerbatimString,
1848 BytesFrame::Boolean { .. } => FrameKind::Boolean,
1849 BytesFrame::Map { .. } => FrameKind::Map,
1850 BytesFrame::Set { .. } => FrameKind::Set,
1851 BytesFrame::Push { .. } => FrameKind::Push,
1852 BytesFrame::Hello { .. } => FrameKind::Hello,
1853 BytesFrame::BigNumber { .. } => FrameKind::BigNumber,
1854 BytesFrame::ChunkedString(inner) => {
1855 if inner.is_empty() {
1856 FrameKind::EndStream
1857 } else {
1858 FrameKind::ChunkedString
1859 }
1860 },
1861 }
1862 }
1863
1864 fn is_end_stream_frame(&self) -> bool {
1865 match self {
1866 BytesFrame::ChunkedString(s) => s.is_empty(),
1867 _ => false,
1868 }
1869 }
1870
1871 fn verbatim_string_format(&self) -> Option<&VerbatimStringFormat> {
1872 match self {
1873 BytesFrame::VerbatimString { format, .. } => Some(format),
1874 _ => None,
1875 }
1876 }
1877
1878 fn as_str(&self) -> Option<&str> {
1879 match self {
1880 BytesFrame::SimpleError { data, .. } => Some(data),
1881 BytesFrame::SimpleString { data, .. }
1882 | BytesFrame::BlobError { data, .. }
1883 | BytesFrame::BlobString { data, .. }
1884 | BytesFrame::BigNumber { data, .. } => str::from_utf8(data).ok(),
1885 BytesFrame::VerbatimString { data, .. } => str::from_utf8(data).ok(),
1886 BytesFrame::ChunkedString(data) => str::from_utf8(data).ok(),
1887 _ => None,
1888 }
1889 }
1890
1891 fn as_bool(&self) -> Option<bool> {
1892 match self {
1893 BytesFrame::SimpleString { data, .. }
1894 | BytesFrame::BlobString { data, .. }
1895 | BytesFrame::VerbatimString { data, .. } => utils::bytes_to_bool(data),
1896 BytesFrame::ChunkedString(data) => utils::bytes_to_bool(data),
1897 BytesFrame::Boolean { data, .. } => Some(*data),
1898 BytesFrame::Number { data, .. } => match data {
1899 0 => Some(false),
1900 1 => Some(true),
1901 _ => None,
1902 },
1903 _ => None,
1904 }
1905 }
1906
1907 fn to_string(&self) -> Option<String> {
1908 match self {
1909 BytesFrame::SimpleError { data, .. } => Some(data.to_string()),
1910 BytesFrame::SimpleString { data, .. }
1911 | BytesFrame::BlobError { data, .. }
1912 | BytesFrame::BlobString { data, .. }
1913 | BytesFrame::BigNumber { data, .. } => String::from_utf8(data.to_vec()).ok(),
1914 BytesFrame::VerbatimString { data, .. } => String::from_utf8(data.to_vec()).ok(),
1915 BytesFrame::ChunkedString(b) => String::from_utf8(b.to_vec()).ok(),
1916 BytesFrame::Double { data, .. } => Some(data.to_string()),
1917 BytesFrame::Number { data, .. } => Some(data.to_string()),
1918 _ => None,
1919 }
1920 }
1921
1922 fn as_bytes(&self) -> Option<&[u8]> {
1923 match self {
1924 BytesFrame::SimpleError { data, .. } => Some(data.as_bytes()),
1925 BytesFrame::SimpleString { data, .. } => Some(data),
1926 BytesFrame::BlobError { data, .. }
1927 | BytesFrame::BlobString { data, .. }
1928 | BytesFrame::BigNumber { data, .. } => Some(data),
1929 BytesFrame::VerbatimString { data, .. } => Some(data),
1930 BytesFrame::ChunkedString(b) => Some(b),
1931 _ => None,
1932 }
1933 }
1934
1935 fn encode_len(&self, int_as_blobstring: bool) -> usize {
1936 resp3_utils::bytes_encode_len(self, int_as_blobstring)
1937 }
1938
1939 fn is_normal_pubsub_message(&self) -> bool {
1940 match self {
1949 BytesFrame::Array { data, .. } | BytesFrame::Push { data, .. } => {
1950 (data.len() == 3 && data[0].as_str().map(|s| s == PUBSUB_PREFIX).unwrap_or(false))
1951 || (data.len() == 4
1952 && data[0].as_str().map(|s| s == PUBSUB_PUSH_PREFIX).unwrap_or(false)
1953 && data[1].as_str().map(|s| s == PUBSUB_PREFIX).unwrap_or(false))
1954 },
1955 _ => false,
1956 }
1957 }
1958
1959 fn is_pattern_pubsub_message(&self) -> bool {
1960 match self {
1963 BytesFrame::Array { data, .. } | BytesFrame::Push { data, .. } => {
1964 (data.len() == 4 && data[0].as_str().map(|s| s == PATTERN_PUBSUB_PREFIX).unwrap_or(false))
1965 || (data.len() == 5
1966 && data[0].as_str().map(|s| s == PUBSUB_PUSH_PREFIX).unwrap_or(false)
1967 && data[1].as_str().map(|s| s == PATTERN_PUBSUB_PREFIX).unwrap_or(false))
1968 },
1969 _ => false,
1970 }
1971 }
1972
1973 fn is_shard_pubsub_message(&self) -> bool {
1974 match self {
1977 BytesFrame::Array { data, .. } | BytesFrame::Push { data, .. } => {
1978 (data.len() == 3 && data[0].as_str().map(|s| s == SHARD_PUBSUB_PREFIX).unwrap_or(false))
1979 || (data.len() == 4
1980 && data[0].as_str().map(|s| s == PUBSUB_PUSH_PREFIX).unwrap_or(false)
1981 && data[1].as_str().map(|s| s == SHARD_PUBSUB_PREFIX).unwrap_or(false))
1982 },
1983 _ => false,
1984 }
1985 }
1986
/// Whether the frame is an `Array` or `Push` frame holding exactly one element.
#[cfg(feature = "convert")]
#[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
fn is_single_element_vec(&self) -> bool {
  matches!(
    self,
    BytesFrame::Array { data, .. } | BytesFrame::Push { data, .. } if data.len() == 1
  )
}
1995
/// Pop the last element out of an `Array` or `Push` frame, or return the frame itself.
///
/// Any frame that is not an `Array` or `Push` is returned unchanged. An empty `Array` or `Push` is also returned
/// unchanged instead of panicking, since there is nothing to pop. When an element is popped the remaining
/// elements and the outer frame are dropped.
#[cfg(feature = "convert")]
#[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
fn pop_or_take(mut self) -> Self {
  let popped = match &mut self {
    BytesFrame::Array { data, .. } | BytesFrame::Push { data, .. } => data.pop(),
    _ => None,
  };

  // Fall back to the original frame when there was nothing to pop.
  popped.unwrap_or(self)
}
2004}
2005
#[cfg(feature = "bytes")]
impl BytesFrame {
  /// Produce an `OwnedFrame` equivalent of this frame without consuming it.
  ///
  /// Delegates to `resp3_utils::bytes_to_owned_frame`.
  pub fn to_owned_frame(&self) -> OwnedFrame {
    resp3_utils::bytes_to_owned_frame(self)
  }
}
2013
2014#[derive(Debug, Eq, PartialEq)]
2016pub enum DecodedFrame<T: Resp3Frame> {
2017 Streaming(StreamedFrame<T>),
2018 Complete(T),
2019}
2020
2021impl<T: Resp3Frame> DecodedFrame<T> {
2022 pub fn add_attributes(&mut self, attributes: T::Attributes) -> Result<(), RedisProtocolError> {
2024 match self {
2025 DecodedFrame::Streaming(inner) => inner.add_attributes(attributes),
2026 DecodedFrame::Complete(inner) => inner.add_attributes(attributes),
2027 }
2028 }
2029
2030 pub fn into_complete_frame(self) -> Result<T, RedisProtocolError> {
2032 match self {
2033 DecodedFrame::Complete(frame) => Ok(frame),
2034 DecodedFrame::Streaming(_) => Err(RedisProtocolError::new(
2035 RedisProtocolErrorKind::DecodeError,
2036 "Expected complete frame.",
2037 )),
2038 }
2039 }
2040
2041 pub fn into_streaming_frame(self) -> Result<StreamedFrame<T>, RedisProtocolError> {
2043 match self {
2044 DecodedFrame::Streaming(frame) => Ok(frame),
2045 DecodedFrame::Complete(_) => Err(RedisProtocolError::new(
2046 RedisProtocolErrorKind::DecodeError,
2047 "Expected streamed frame.",
2048 )),
2049 }
2050 }
2051
2052 pub fn is_streaming(&self) -> bool {
2054 matches!(self, DecodedFrame::Streaming(_))
2055 }
2056
2057 pub fn is_complete(&self) -> bool {
2059 matches!(self, DecodedFrame::Complete(_))
2060 }
2061}
2062
2063#[derive(Debug, Eq, PartialEq)]
2103pub struct StreamedFrame<T: Resp3Frame> {
2104 buffer: VecDeque<T>,
2106 attribute_frame: T,
2108 pub kind: FrameKind,
2110}
2111
2112impl<T: Resp3Frame> StreamedFrame<T> {
2113 pub fn new(kind: FrameKind) -> Self {
2115 let buffer = VecDeque::new();
2116 StreamedFrame {
2117 buffer,
2118 kind,
2119 attribute_frame: T::new_empty(),
2120 }
2121 }
2122
2123 pub fn add_attributes(&mut self, attributes: T::Attributes) -> Result<(), RedisProtocolError> {
2125 self.attribute_frame.add_attributes(attributes)
2126 }
2127
2128 pub fn take(&mut self) -> Result<T, RedisProtocolError> {
2130 if !self.kind.is_streaming_type() {
2131 return Err(RedisProtocolError::new(
2133 RedisProtocolErrorKind::DecodeError,
2134 "Only blob strings, sets, maps, and arrays can be streamed.",
2135 ));
2136 }
2137
2138 if self.is_finished() {
2139 self.buffer.pop_back();
2141 }
2142 let buffer = mem::take(&mut self.buffer);
2143 let attributes = self.attribute_frame.take_attributes();
2144 T::from_buffer(self.kind, buffer, attributes)
2145 }
2146
2147 pub fn add_frame(&mut self, frame: T) {
2149 self.buffer.push_back(frame);
2150 }
2151
2152 pub fn is_finished(&self) -> bool {
2154 self.buffer.back().map(|f| f.is_end_stream_frame()).unwrap_or(false)
2155 }
2156}
2157
#[cfg(test)]
#[cfg(feature = "bytes")]
mod tests {
  use super::*;
  use crate::resp3::utils::new_map;

  /// Add each chunk to the stream as a `ChunkedString` frame, then terminate the stream.
  fn fill_stream(stream: &mut StreamedFrame<BytesFrame>, chunks: &[&'static str]) {
    for chunk in chunks.iter() {
      stream.add_frame((FrameKind::ChunkedString, *chunk).try_into().unwrap());
    }
    stream.add_frame(BytesFrame::new_end_stream());
  }

  #[test]
  fn should_convert_basic_streaming_buffer_to_frame() {
    let mut streaming_buf = StreamedFrame::new(FrameKind::BlobString);
    fill_stream(&mut streaming_buf, &["foo", "bar", "baz"]);

    let frame = streaming_buf.take().expect("Failed to build frame from chunked stream");
    assert_eq!(frame.as_str(), Some("foobarbaz"));
  }

  #[test]
  fn should_convert_basic_streaming_buffer_to_frame_with_attributes() {
    let mut attributes = new_map(0);
    for (key, value) in [("a", 1_i64), ("b", 2_i64), ("c", 3_i64)] {
      attributes.insert((FrameKind::SimpleString, key).try_into().unwrap(), value.into());
    }

    let mut streaming_buf = StreamedFrame::new(FrameKind::BlobString);
    streaming_buf.add_attributes(attributes.clone()).unwrap();
    fill_stream(&mut streaming_buf, &["foo", "bar", "baz"]);

    let frame = streaming_buf.take().expect("Failed to build frame from chunked stream");
    assert_eq!(frame.as_str(), Some("foobarbaz"));
    assert_eq!(frame.attributes(), Some(&attributes));
  }
}