redis_protocol/resp3/
encode.rs

1//! Functions for encoding Frames into the RESP3 protocol.
2//!
3//! <https://github.com/antirez/RESP3/blob/master/spec.md>
4
5use crate::resp3::types::*;
6use crate::resp3::utils::{self as resp3_utils};
7use crate::types::{RedisProtocolError, RedisProtocolErrorKind, CRLF};
8use crate::utils;
9use crate::alloc::string::ToString;
10use alloc::vec::Vec;
11use bytes::BytesMut;
12use cookie_factory::GenError;
13
// Prepend the encoded attribute map to the output when the frame carries one.
//
// `$x` must name a mutable `(buffer, offset)` binding; it is reassigned to the cursor returned by
// `gen_attribute`. `$attributes` must name a `&Option<Attributes>`. Errors are propagated with `?`,
// so the macro can only be used inside functions returning a compatible `Result`.
macro_rules! encode_attributes {
  ($x:ident, $attributes:ident) => {
    if let Some(attrs) = $attributes.as_ref() {
      $x = gen_attribute($x, attrs)?;
    }
  };
}
21
22fn gen_simplestring<'a>(
23  mut x: (&'a mut [u8], usize),
24  data: &[u8],
25  attributes: &Option<Attributes>,
26) -> Result<(&'a mut [u8], usize), GenError> {
27  encode_attributes!(x, attributes);
28
29  do_gen!(
30    x,
31    gen_be_u8!(FrameKind::SimpleString.to_byte()) >> gen_slice!(data) >> gen_slice!(CRLF.as_bytes())
32  )
33}
34
35fn gen_simpleerror<'a>(
36  mut x: (&'a mut [u8], usize),
37  data: &str,
38  attributes: &Option<Attributes>,
39) -> Result<(&'a mut [u8], usize), GenError> {
40  encode_attributes!(x, attributes);
41
42  do_gen!(
43    x,
44    gen_be_u8!(FrameKind::SimpleError.to_byte()) >> gen_slice!(data.as_bytes()) >> gen_slice!(CRLF.as_bytes())
45  )
46}
47
48fn gen_number<'a>(
49  mut x: (&'a mut [u8], usize),
50  data: &i64,
51  attributes: &Option<Attributes>,
52) -> Result<(&'a mut [u8], usize), GenError> {
53  encode_attributes!(x, attributes);
54
55  do_gen!(
56    x,
57    gen_be_u8!(FrameKind::Number.to_byte()) >> gen_slice!(data.to_string().as_bytes()) >> gen_slice!(CRLF.as_bytes())
58  )
59}
60
61fn gen_null(x: (&mut [u8], usize)) -> Result<(&mut [u8], usize), GenError> {
62  do_gen!(x, gen_slice!(NULL.as_bytes()))
63}
64
65fn gen_double<'a>(
66  mut x: (&'a mut [u8], usize),
67  data: &f64,
68  attributes: &Option<Attributes>,
69) -> Result<(&'a mut [u8], usize), GenError> {
70  encode_attributes!(x, attributes);
71
72  let as_string = resp3_utils::f64_to_redis_string(data);
73  do_gen!(
74    x,
75    gen_be_u8!(FrameKind::Double.to_byte()) >> gen_slice!(as_string.as_bytes()) >> gen_slice!(CRLF.as_bytes())
76  )
77}
78
79fn gen_boolean<'a>(
80  mut x: (&'a mut [u8], usize),
81  data: &bool,
82  attributes: &Option<Attributes>,
83) -> Result<(&'a mut [u8], usize), GenError> {
84  encode_attributes!(x, attributes);
85
86  let data = if *data { BOOL_TRUE_BYTES } else { BOOL_FALSE_BYTES };
87  do_gen!(x, gen_slice!(data.as_bytes()))
88}
89
90fn gen_bignumber<'a>(
91  mut x: (&'a mut [u8], usize),
92  data: &[u8],
93  attributes: &Option<Attributes>,
94) -> Result<(&'a mut [u8], usize), GenError> {
95  encode_attributes!(x, attributes);
96
97  do_gen!(
98    x,
99    gen_be_u8!(FrameKind::BigNumber.to_byte()) >> gen_slice!(data) >> gen_slice!(CRLF.as_bytes())
100  )
101}
102
103fn gen_blobstring<'a>(
104  mut x: (&'a mut [u8], usize),
105  data: &[u8],
106  attributes: &Option<Attributes>,
107) -> Result<(&'a mut [u8], usize), GenError> {
108  encode_attributes!(x, attributes);
109
110  do_gen!(
111    x,
112    gen_be_u8!(FrameKind::BlobString.to_byte())
113      >> gen_slice!(data.len().to_string().as_bytes())
114      >> gen_slice!(CRLF.as_bytes())
115      >> gen_slice!(data)
116      >> gen_slice!(CRLF.as_bytes())
117  )
118}
119
120fn gen_bloberror<'a>(
121  mut x: (&'a mut [u8], usize),
122  data: &[u8],
123  attributes: &Option<Attributes>,
124) -> Result<(&'a mut [u8], usize), GenError> {
125  encode_attributes!(x, attributes);
126
127  do_gen!(
128    x,
129    gen_be_u8!(FrameKind::BlobError.to_byte())
130      >> gen_slice!(data.len().to_string().as_bytes())
131      >> gen_slice!(CRLF.as_bytes())
132      >> gen_slice!(data)
133      >> gen_slice!(CRLF.as_bytes())
134  )
135}
136
137fn gen_verbatimstring<'a>(
138  mut x: (&'a mut [u8], usize),
139  data: &[u8],
140  format: &VerbatimStringFormat,
141  attributes: &Option<Attributes>,
142) -> Result<(&'a mut [u8], usize), GenError> {
143  encode_attributes!(x, attributes);
144  let total_len = format.encode_len() + data.len();
145
146  do_gen!(
147    x,
148    gen_be_u8!(FrameKind::VerbatimString.to_byte())
149      >> gen_slice!(total_len.to_string().as_bytes())
150      >> gen_slice!(CRLF.as_bytes())
151      >> gen_slice!(format.to_str().as_bytes())
152      >> gen_be_u8!(VERBATIM_FORMAT_BYTE)
153      >> gen_slice!(data)
154      >> gen_slice!(CRLF.as_bytes())
155  )
156}
157
158fn gen_array<'a>(
159  mut x: (&'a mut [u8], usize),
160  data: &Vec<Frame>,
161  attributes: &Option<Attributes>,
162) -> Result<(&'a mut [u8], usize), GenError> {
163  encode_attributes!(x, attributes);
164
165  let mut x = do_gen!(
166    x,
167    gen_be_u8!(FrameKind::Array.to_byte())
168      >> gen_slice!(data.len().to_string().as_bytes())
169      >> gen_slice!(CRLF.as_bytes())
170  )?;
171
172  for frame in data.iter() {
173    x = attempt_encoding(x.0, x.1, frame)?;
174  }
175
176  Ok(x)
177}
178
179fn gen_map<'a>(
180  mut x: (&'a mut [u8], usize),
181  data: &FrameMap,
182  attributes: &Option<Attributes>,
183) -> Result<(&'a mut [u8], usize), GenError> {
184  encode_attributes!(x, attributes);
185
186  x = do_gen!(
187    x,
188    gen_be_u8!(FrameKind::Map.to_byte())
189      >> gen_slice!(data.len().to_string().as_bytes())
190      >> gen_slice!(CRLF.as_bytes())
191  )?;
192
193  for (key, value) in data.iter() {
194    x = attempt_encoding(x.0, x.1, key)?;
195    x = attempt_encoding(x.0, x.1, value)?;
196  }
197
198  Ok(x)
199}
200
201fn gen_set<'a>(
202  mut x: (&'a mut [u8], usize),
203  data: &FrameSet,
204  attributes: &Option<Attributes>,
205) -> Result<(&'a mut [u8], usize), GenError> {
206  encode_attributes!(x, attributes);
207
208  x = do_gen!(
209    x,
210    gen_be_u8!(FrameKind::Set.to_byte())
211      >> gen_slice!(data.len().to_string().as_bytes())
212      >> gen_slice!(CRLF.as_bytes())
213  )?;
214
215  for frame in data.iter() {
216    x = attempt_encoding(x.0, x.1, frame)?;
217  }
218
219  Ok(x)
220}
221
222fn gen_attribute<'a>(x: (&'a mut [u8], usize), data: &FrameMap) -> Result<(&'a mut [u8], usize), GenError> {
223  let mut x = do_gen!(
224    x,
225    gen_be_u8!(FrameKind::Attribute.to_byte())
226      >> gen_slice!(data.len().to_string().as_bytes())
227      >> gen_slice!(CRLF.as_bytes())
228  )?;
229
230  for (key, value) in data.iter() {
231    x = attempt_encoding(x.0, x.1, key)?;
232    x = attempt_encoding(x.0, x.1, value)?;
233  }
234
235  Ok(x)
236}
237
238fn gen_push<'a>(
239  mut x: (&'a mut [u8], usize),
240  data: &Vec<Frame>,
241  attributes: &Option<Attributes>,
242) -> Result<(&'a mut [u8], usize), GenError> {
243  encode_attributes!(x, attributes);
244
245  x = do_gen!(
246    x,
247    gen_be_u8!(FrameKind::Push.to_byte())
248      >> gen_slice!(data.len().to_string().as_bytes())
249      >> gen_slice!(CRLF.as_bytes())
250  )?;
251
252  for frame in data.iter() {
253    x = attempt_encoding(x.0, x.1, frame)?;
254  }
255
256  Ok(x)
257}
258
259fn gen_hello<'a>(
260  x: (&'a mut [u8], usize),
261  version: &RespVersion,
262  auth: &Option<Auth>,
263) -> Result<(&'a mut [u8], usize), GenError> {
264  let mut x = do_gen!(
265    x,
266    gen_slice!(HELLO.as_bytes()) >> gen_slice!(EMPTY_SPACE.as_bytes()) >> gen_be_u8!(version.to_byte())
267  )?;
268
269  if let Some(ref auth) = *auth {
270    x = do_gen!(
271      x,
272      gen_slice!(EMPTY_SPACE.as_bytes())
273        >> gen_slice!(AUTH.as_bytes())
274        >> gen_slice!(EMPTY_SPACE.as_bytes())
275        >> gen_slice!(auth.username.as_bytes())
276        >> gen_slice!(EMPTY_SPACE.as_bytes())
277        >> gen_slice!(auth.password.as_bytes())
278    )?;
279  }
280
281  let x = do_gen!(x, gen_slice!(CRLF.as_bytes()))?;
282  Ok(x)
283}
284
285fn gen_chunked_string<'a>(x: (&'a mut [u8], usize), data: &[u8]) -> Result<(&'a mut [u8], usize), GenError> {
286  if data.is_empty() {
287    // signal the end of the chunked stream
288    do_gen!(x, gen_slice!(END_STREAM_STRING_BYTES.as_bytes()))
289  } else {
290    do_gen!(
291      x,
292      gen_be_u8!(FrameKind::ChunkedString.to_byte())
293        >> gen_slice!(data.len().to_string().as_bytes())
294        >> gen_slice!(CRLF.as_bytes())
295        >> gen_slice!(data)
296        >> gen_slice!(CRLF.as_bytes())
297    )
298  }
299}
300
301fn attempt_encoding<'a>(buf: &'a mut [u8], offset: usize, frame: &Frame) -> Result<(&'a mut [u8], usize), GenError> {
302  use crate::resp3::types::Frame::*;
303
304  let x = (buf, offset);
305  let total_size = resp3_utils::encode_len(frame)?;
306  trace!("Attempting to encode {:?} with total size {}", frame.kind(), total_size);
307  encode_checks!(x, total_size);
308
309  match *frame {
310    Array {
311      ref data,
312      ref attributes,
313    } => gen_array(x, data, attributes),
314    BlobString {
315      ref data,
316      ref attributes,
317    } => gen_blobstring(x, data, attributes),
318    SimpleString {
319      ref data,
320      ref attributes,
321    } => gen_simplestring(x, data, attributes),
322    SimpleError {
323      ref data,
324      ref attributes,
325    } => gen_simpleerror(x, data, attributes),
326    Number {
327      ref data,
328      ref attributes,
329    } => gen_number(x, data, attributes),
330    Null => gen_null(x),
331    Double {
332      ref data,
333      ref attributes,
334    } => gen_double(x, data, attributes),
335    BlobError {
336      ref data,
337      ref attributes,
338    } => gen_bloberror(x, data, attributes),
339    VerbatimString {
340      ref data,
341      ref format,
342      ref attributes,
343    } => gen_verbatimstring(x, data, format, attributes),
344    Boolean {
345      ref data,
346      ref attributes,
347    } => gen_boolean(x, data, attributes),
348    Map {
349      ref data,
350      ref attributes,
351    } => gen_map(x, data, attributes),
352    Set {
353      ref data,
354      ref attributes,
355    } => gen_set(x, data, attributes),
356    Push {
357      ref data,
358      ref attributes,
359    } => gen_push(x, data, attributes),
360    Hello { ref version, ref auth } => gen_hello(x, version, auth),
361    BigNumber {
362      ref data,
363      ref attributes,
364    } => gen_bignumber(x, data, attributes),
365    ChunkedString(ref b) => gen_chunked_string(x, b),
366  }
367}
368
369/// Encoding functions for complete frames.
370///
371/// ```edition2018 no_run
372/// # extern crate bytes;
373/// # extern crate tokio;
374///
375/// # use redis_protocol::resp3::encode::complete::*;
376/// # use redis_protocol::resp3::types::{Frame, FrameKind};
377/// # use bytes::BytesMut;
378/// # use tokio::net::TcpStream;
379/// # use redis_protocol::types::RedisProtocolError;
380/// # use tokio::io::AsyncWriteExt;
381/// # use std::convert::TryInto;
382///
383/// async fn example(socket: &mut TcpStream, buf: &mut BytesMut) -> Result<(), RedisProtocolError> {
384///   // in many cases the starting buffer wont be empty, so this example shows how to track the offset as well
385///   let frame = Frame::BlobString { data: "foobarbaz".into(), attributes: None };
386///   let offset = encode_bytes(buf, &frame).expect("Failed to encode frame");
387///   
388///   let _ = socket.write_all(&buf).await.expect("Failed to write to socket");
389///   let _ = buf.split_to(offset);
390///   Ok(())
391/// }
392/// ```
393pub mod complete {
394  use super::*;
395
396  /// Attempt to encode a frame into `buf` at the provided `offset`.
397  ///
398  /// The caller is responsible for extending the buffer if a `RedisProtocolErrorKind::BufferTooSmall` is returned.
399  pub fn encode(buf: &mut [u8], offset: usize, frame: &Frame) -> Result<usize, RedisProtocolError> {
400    attempt_encoding(buf, offset, frame)
401      .map(|(_, amt)| amt)
402      .map_err(|e| e.into())
403  }
404
405  /// Attempt to encode a frame into `buf`, extending the buffer as needed.
406  ///
407  /// Returns the number of bytes encoded.
408  pub fn encode_bytes(buf: &mut BytesMut, frame: &Frame) -> Result<usize, RedisProtocolError> {
409    let offset = buf.len();
410
411    loop {
412      match attempt_encoding(buf, offset, frame) {
413        Ok((_, amt)) => return Ok(amt),
414        Err(GenError::BufferTooSmall(amt)) => utils::zero_extend(buf, amt),
415        Err(e) => return Err(e.into()),
416      }
417    }
418  }
419}
420
421/// Encoding functions for streaming blobs and aggregate types.
422///
423/// This interface is stateful due to the fact that the caller almost always needs to maintain some state outside of these functions. As a result it's up to the caller to manage the starting, middle, and ending state of data streams in order to call the correct functions in this module.
424///
425/// Examples:
426///
427/// ```rust
428/// # extern crate bytes;
429///
430/// use redis_protocol::resp3::encode::streaming::*;
431/// use redis_protocol::resp3::types::{Frame, FrameKind};
432/// use bytes::BytesMut;
433/// use redis_protocol::types::RedisProtocolError;
434///
435/// fn example(buf: &mut BytesMut) -> Result<usize, RedisProtocolError> {
///   // in many cases the starting buffer won't be empty, so this example shows how to track the offset as well
///   let mut offset = buf.len();
///   let frames: Vec<Frame> = vec![1.into(), 2.into()];
///
///   // each encoding function returns the offset just past the bytes it wrote, so assign instead of adding
///   offset = extend_while_encoding(buf, |buf| {
///     encode_start_aggregate_type(buf, offset, &FrameKind::Array)
///   })?;
///
///   for frame in frames.iter() {
///     offset = extend_while_encoding(buf, |buf| {
///       encode_aggregate_type_inner_value(buf, offset, frame)
///     })?;
///   }
///
///   offset = extend_while_encoding(buf, |buf| {
///     encode_end_aggregate_type(buf, offset)
///   })?;
453///
454///   println!("New buffer size: {}", offset);
455///   Ok(offset)
456/// }
457/// ```
458///
459/// Or a somewhat more practical example...
460///
461/// ```edition2018 no_run
462/// # #[macro_use]
463/// # extern crate tokio;
464/// # extern crate redis_protocol;
465/// # extern crate bytes;
466///
467/// use redis_protocol::resp3::encode::streaming::*;
468/// use redis_protocol::resp3::types::{Frame, FrameKind};
469/// use redis_protocol::types::RedisProtocolError;
470/// use bytes::BytesMut;
471/// use std::future::Future;
472/// use tokio::net::TcpStream;
473/// use tokio::io::{AsyncWrite, AsyncWriteExt};
474/// use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
475///
476/// async fn write_all(socket: &mut TcpStream, buf: &mut BytesMut) -> Result<usize, RedisProtocolError> {
477///   let len = buf.len();
478///   socket.write_all(&buf).await.expect("Failed to write to socket.");
479///   // we could just clear the buffer here since we use `write_all`, but in many cases callers don't flush the socket on each `write` call.
480///   // in those scenarios the caller should split the buffer based on the result from `write`.
481///   let _ = buf.split_to(len);
482///
483///   Ok(len)
484/// }
485///
486/// /// Start a new array stream, sending frames received from `rx` out to `socket` and ending the stream when `rx` closes.
487/// async fn stream_array(socket: &mut TcpStream, mut rx: UnboundedReceiver<Frame>) -> Result<(), RedisProtocolError> {
488///   let mut buf = BytesMut::new();
489///
490///   let _ = extend_while_encoding(&mut buf, |buf| {
491///     encode_start_aggregate_type(buf, 0, &FrameKind::Array)
492///   })?;
493///   let mut written = write_all(socket, &mut buf).await.expect("Failed to write to socket.");
494///
495///   loop {
496///     let frame = match rx.recv().await {
497///        Some(frame) => frame,
498///        // break out of the loop when the channel closes
499///        None => break
500///     };
501///
502///     let _ = extend_while_encoding(&mut buf, |buf| {
503///       encode_aggregate_type_inner_value(buf, 0, &frame)
504///     })?;
505///     written += write_all(socket, &mut buf).await.expect("Failed to write to socket.");
506///   }
507///
508///   let _ = extend_while_encoding(&mut buf, |buf| {
509///     encode_end_aggregate_type(buf, 0)
510///   })?;
511///   written += write_all(socket, &mut buf).await.expect("Failed to write to socket.");
512///
513///   println!("Streamed {} bytes to the socket.", written);
514///   Ok(())  
515/// }
516///
517/// fn generate_frames(tx: UnboundedSender<Frame>) -> Result<(), RedisProtocolError> {
518///   // read from another socket or somehow generate frames, writing them to `tx`
519///   unimplemented!()
520/// }
521///
522/// #[tokio::main]
523/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
524///   let (tx, rx) = unbounded_channel();
525///   let mut socket = TcpStream::connect("127.0.0.1:6379").await.expect("Failed to connect");
526///   
527///   generate_frames(tx);
528///   stream_array(&mut socket, rx).await.expect("Failed to stream array.");
529///
530///   Ok(())
531/// }
532///
533/// ```
534pub mod streaming {
535  use super::*;
536
537  fn gen_start_streaming_string(x: (&mut [u8], usize)) -> Result<(&mut [u8], usize), GenError> {
538    encode_checks!(x, 4);
539
540    do_gen!(
541      x,
542      gen_be_u8!(BLOB_STRING_BYTE) >> gen_be_u8!(STREAMED_LENGTH_BYTE) >> gen_slice!(CRLF.as_bytes())
543    )
544  }
545
546  fn gen_streaming_string_chunk<'a>(x: (&'a mut [u8], usize), data: &[u8]) -> Result<(&'a mut [u8], usize), GenError> {
547    encode_checks!(x, resp3_utils::blobstring_encode_len(data));
548
549    do_gen!(
550      x,
551      gen_be_u8!(CHUNKED_STRING_BYTE)
552        >> gen_slice!(data.len().to_string().as_bytes())
553        >> gen_slice!(CRLF.as_bytes())
554        >> gen_slice!(data)
555        >> gen_slice!(CRLF.as_bytes())
556    )
557  }
558
559  fn gen_end_streaming_string(x: (&mut [u8], usize)) -> Result<(&mut [u8], usize), GenError> {
560    encode_checks!(x, 4);
561
562    do_gen!(x, gen_slice!(END_STREAM_STRING_BYTES.as_bytes()))
563  }
564
565  fn gen_start_streaming_aggregate_type<'a>(
566    x: (&'a mut [u8], usize),
567    kind: &FrameKind,
568  ) -> Result<(&'a mut [u8], usize), GenError> {
569    if !kind.is_aggregate_type() {
570      return Err(GenError::CustomError(3));
571    }
572    encode_checks!(x, 4);
573
574    do_gen!(
575      x,
576      gen_be_u8!(kind.to_byte()) >> gen_be_u8!(STREAMED_LENGTH_BYTE) >> gen_slice!(CRLF.as_bytes())
577    )
578  }
579
580  fn gen_end_streaming_aggregate_type(x: (&mut [u8], usize)) -> Result<(&mut [u8], usize), GenError> {
581    encode_checks!(x, 3);
582
583    do_gen!(x, gen_slice!(END_STREAM_AGGREGATE_BYTES.as_bytes()))
584  }
585
586  fn gen_streaming_inner_value_frame<'a>(
587    x: (&'a mut [u8], usize),
588    data: &Frame,
589  ) -> Result<(&'a mut [u8], usize), GenError> {
590    attempt_encoding(x.0, x.1, data)
591  }
592
593  fn gen_streaming_inner_kv_pair_frames<'a>(
594    x: (&'a mut [u8], usize),
595    key: &Frame,
596    value: &Frame,
597  ) -> Result<(&'a mut [u8], usize), GenError> {
598    let x = attempt_encoding(x.0, x.1, key)?;
599    attempt_encoding(x.0, x.1, value)
600  }
601
602  /// Encode the starting bytes for a streaming blob string.
603  pub fn encode_start_string(buf: &mut [u8], offset: usize) -> Result<usize, RedisProtocolError> {
604    gen_start_streaming_string((buf, offset))
605      .map(|(_, l)| l)
606      .map_err(|e| e.into())
607  }
608
609  /// Encode the bytes making up one chunk of a streaming blob string.
610  ///
611  /// If `data` is empty this will do the same thing as [encode_end_string] to signal that the streamed string is finished.
612  pub fn encode_string_chunk(buf: &mut [u8], offset: usize, data: &[u8]) -> Result<usize, RedisProtocolError> {
613    gen_streaming_string_chunk((buf, offset), data)
614      .map(|(_, l)| l)
615      .map_err(|e| e.into())
616  }
617
618  /// Encode the terminating bytes at the end of a streaming blob string.
619  pub fn encode_end_string(buf: &mut [u8], offset: usize) -> Result<usize, RedisProtocolError> {
620    gen_end_streaming_string((buf, offset))
621      .map(|(_, l)| l)
622      .map_err(|e| e.into())
623  }
624
625  /// Encode the starting bytes for a streaming aggregate type (array, set, or map).
626  pub fn encode_start_aggregate_type(
627    buf: &mut [u8],
628    offset: usize,
629    kind: &FrameKind,
630  ) -> Result<usize, RedisProtocolError> {
631    gen_start_streaming_aggregate_type((buf, offset), kind)
632      .map(|(_, l)| l)
633      .map_err(|e| e.into())
634  }
635
636  /// Encode the inner frame inside a streamed array or set.
637  ///
638  /// Use [encode_aggregate_type_inner_kv_pair] to encode a key-value pair inside a streaming map.
639  pub fn encode_aggregate_type_inner_value(
640    buf: &mut [u8],
641    offset: usize,
642    data: &Frame,
643  ) -> Result<usize, RedisProtocolError> {
644    gen_streaming_inner_value_frame((buf, offset), data)
645      .map(|(_, l)| l)
646      .map_err(|e| e.into())
647  }
648
649  /// Encode the inner frames that make up a key-value pair in a streamed map.
650  pub fn encode_aggregate_type_inner_kv_pair<'a>(
651    buf: &'a mut [u8],
652    offset: usize,
653    key: &Frame,
654    value: &Frame,
655  ) -> Result<usize, RedisProtocolError> {
656    gen_streaming_inner_kv_pair_frames((buf, offset), key, value)
657      .map(|(_, l)| l)
658      .map_err(|e| e.into())
659  }
660
661  /// Encode the terminating bytes at the end of a streaming aggregate type (array, set, or map).
662  pub fn encode_end_aggregate_type(buf: &mut [u8], offset: usize) -> Result<usize, RedisProtocolError> {
663    gen_end_streaming_aggregate_type((buf, offset))
664      .map(|(_, l)| l)
665      .map_err(|e| e.into())
666  }
667
668  /// A wrapper function for automatically extending the input buffer while encoding frames with a different encoding function.
669  pub fn extend_while_encoding<F>(buf: &mut BytesMut, func: F) -> Result<usize, RedisProtocolError>
670  where
671    F: Fn(&mut BytesMut) -> Result<usize, RedisProtocolError>,
672  {
673    loop {
674      match func(buf) {
675        Ok(amt) => return Ok(amt),
676        Err(err) => match err.kind() {
677          RedisProtocolErrorKind::BufferTooSmall(amt) => utils::zero_extend(buf, *amt),
678          _ => return Err(err),
679        },
680      }
681    }
682  }
683}
684
685#[cfg(test)]
686mod tests {
687  use super::*;
688  use crate::utils::ZEROED_KB;
689  use itertools::Itertools;
690  use std::convert::TryInto;
691  use std::str;
692
693  const PADDING: &'static str = "foobar";
694
695  fn empty_bytes() -> BytesMut {
696    BytesMut::new()
697  }
698
699  fn create_attributes() -> (FrameMap, Vec<u8>) {
700    let mut out = resp3_utils::new_map(None);
701    let key = Frame::SimpleString {
702      data: "foo".into(),
703      attributes: None,
704    };
705    let value = Frame::Number {
706      data: 42,
707      attributes: None,
708    };
709    out.insert(key, value);
710    let encoded = "|1\r\n+foo\r\n:42\r\n".to_owned().into_bytes();
711
712    (out, encoded)
713  }
714
715  fn blobstring_array(data: Vec<&'static str>) -> Frame {
716    let inner: Vec<Frame> = data
717      .into_iter()
718      .map(|s| (FrameKind::BlobString, s).try_into().unwrap())
719      .collect();
720
721    Frame::Array {
722      data: inner,
723      attributes: None,
724    }
725  }
726
727  fn push_frame_to_array(frame: &mut Frame, inner: Frame) {
728    if let Frame::Array { ref mut data, .. } = frame {
729      data.push(inner);
730    }
731  }
732
733  fn unordered_assert_eq(data: BytesMut, expected_start: BytesMut, expected_middle: &[&str]) {
734    let mut exptected_permutations = vec![];
735    for middle_permutation in expected_middle.iter().permutations(expected_middle.len()) {
736      let mut expected = expected_start.clone();
737      for middle in middle_permutation {
738        expected.extend_from_slice(middle.as_bytes())
739      }
740      exptected_permutations.push(expected);
741    }
742
743    assert!(
744      exptected_permutations.contains(&data),
745      "No middle permutations matched: data {:?} needs to match with one of the following {:#?}",
746      data,
747      exptected_permutations
748    );
749  }
750
751  fn encode_and_verify_empty(input: &Frame, expected: &str) {
752    let mut buf = empty_bytes();
753
754    let len = match complete::encode_bytes(&mut buf, input) {
755      Ok(l) => l,
756      Err(e) => panic!("{:?}", e),
757    };
758
759    assert_eq!(
760      buf,
761      expected.as_bytes(),
762      "empty buf contents match {:?} == {:?}",
763      str::from_utf8(&buf),
764      expected
765    );
766    assert_eq!(len, expected.as_bytes().len(), "empty expected len is correct");
767  }
768
769  fn encode_and_verify_empty_unordered(input: &Frame, expected_start: &str, expected_middle: &[&str]) {
770    let mut buf = empty_bytes();
771
772    let len = complete::encode_bytes(&mut buf, input).unwrap();
773
774    unordered_assert_eq(buf, BytesMut::from(expected_start.as_bytes()), expected_middle);
775
776    let expected_middle_len: usize = expected_middle.iter().map(|x| x.as_bytes().len()).sum();
777    assert_eq!(
778      len,
779      expected_start.as_bytes().len() + expected_middle_len,
780      "empty expected len is correct"
781    );
782  }
783
784  fn encode_and_verify_non_empty(input: &Frame, expected: &str) {
785    let mut buf = empty_bytes();
786    buf.extend_from_slice(PADDING.as_bytes());
787
788    let len = complete::encode_bytes(&mut buf, input).unwrap();
789    let padded = vec![PADDING, expected].join("");
790
791    assert_eq!(
792      buf,
793      padded.as_bytes(),
794      "padded buf contents match {:?} == {:?}",
795      str::from_utf8(&buf),
796      expected
797    );
798    assert_eq!(len, padded.as_bytes().len(), "padded expected len is correct");
799  }
800
801  fn encode_and_verify_non_empty_unordered(input: &Frame, expected_start: &str, expected_middle: &[&str]) {
802    let mut buf = empty_bytes();
803    buf.extend_from_slice(PADDING.as_bytes());
804
805    let len = complete::encode_bytes(&mut buf, input).unwrap();
806    let expected_start_padded = vec![PADDING, expected_start].join("");
807
808    unordered_assert_eq(buf, BytesMut::from(expected_start_padded.as_bytes()), expected_middle);
809
810    let expected_middle_len: usize = expected_middle.iter().map(|x| x.as_bytes().len()).sum();
811    assert_eq!(
812      len,
813      expected_start_padded.as_bytes().len() + expected_middle_len,
814      "padded expected len is correct"
815    );
816  }
817
818  fn encode_raw_and_verify_empty(input: &Frame, expected: &str) {
819    let mut buf = Vec::from(&ZEROED_KB[0..expected.as_bytes().len()]);
820
821    let len = match complete::encode(&mut buf, 0, input) {
822      Ok(l) => l,
823      Err(e) => panic!("{:?}", e),
824    };
825
826    assert_eq!(
827      buf,
828      expected.as_bytes(),
829      "empty buf contents match {:?} == {:?}",
830      str::from_utf8(&buf),
831      expected
832    );
833    assert_eq!(len, expected.as_bytes().len(), "empty expected len is correct");
834  }
835
836  fn encode_and_verify_empty_with_attributes(input: &Frame, expected: &str) {
837    let (attributes, encoded_attributes) = create_attributes();
838    let mut frame = input.clone();
839    let _ = frame.add_attributes(attributes).unwrap();
840    let mut buf = empty_bytes();
841
842    let len = complete::encode_bytes(&mut buf, &frame).unwrap();
843
844    let mut expected_bytes = empty_bytes();
845    expected_bytes.extend_from_slice(&encoded_attributes);
846    expected_bytes.extend_from_slice(expected.as_bytes());
847
848    assert_eq!(buf, expected_bytes, "non empty buf contents match with attrs");
849    assert_eq!(
850      len,
851      expected.as_bytes().len() + encoded_attributes.len(),
852      "non empty expected len is correct with attrs"
853    );
854  }
855
856  fn encode_and_verify_empty_with_attributes_unordered(input: &Frame, expected_start: &str, expected_middle: &[&str]) {
857    let (attributes, encoded_attributes) = create_attributes();
858    let mut frame = input.clone();
859    let _ = frame.add_attributes(attributes).unwrap();
860    let mut buf = empty_bytes();
861
862    let len = complete::encode_bytes(&mut buf, &frame).unwrap();
863
864    let mut expected_start_bytes = empty_bytes();
865    expected_start_bytes.extend_from_slice(&encoded_attributes);
866    expected_start_bytes.extend_from_slice(expected_start.as_bytes());
867    unordered_assert_eq(buf, expected_start_bytes, expected_middle);
868
869    let expected_middle_len: usize = expected_middle.iter().map(|x| x.as_bytes().len()).sum();
870    assert_eq!(
871      len,
872      expected_start.as_bytes().len() + expected_middle_len + encoded_attributes.len(),
873      "non empty expected len is correct with attrs"
874    );
875  }
876
877  fn encode_and_verify_non_empty_with_attributes(input: &Frame, expected: &str) {
878    let (attributes, encoded_attributes) = create_attributes();
879    let mut frame = input.clone();
880    let _ = frame.add_attributes(attributes).unwrap();
881
882    let mut buf = empty_bytes();
883    buf.extend_from_slice(PADDING.as_bytes());
884
885    let len = complete::encode_bytes(&mut buf, &frame).unwrap();
886    let mut expected_bytes = empty_bytes();
887    expected_bytes.extend_from_slice(PADDING.as_bytes());
888    expected_bytes.extend_from_slice(&encoded_attributes);
889    expected_bytes.extend_from_slice(expected.as_bytes());
890
891    assert_eq!(buf, expected_bytes, "empty buf contents match with attrs");
892    assert_eq!(
893      len,
894      expected.as_bytes().len() + encoded_attributes.len() + PADDING.as_bytes().len(),
895      "empty expected len is correct with attrs"
896    );
897  }
898
899  fn encode_and_verify_non_empty_with_attributes_unordered(
900    input: &Frame,
901    expected_start: &str,
902    expected_middle: &[&str],
903  ) {
904    let (attributes, encoded_attributes) = create_attributes();
905    let mut frame = input.clone();
906    let _ = frame.add_attributes(attributes).unwrap();
907
908    let mut buf = empty_bytes();
909    buf.extend_from_slice(PADDING.as_bytes());
910
911    let len = complete::encode_bytes(&mut buf, &frame).unwrap();
912    let mut expected_start_bytes = empty_bytes();
913    expected_start_bytes.extend_from_slice(PADDING.as_bytes());
914    expected_start_bytes.extend_from_slice(&encoded_attributes);
915    expected_start_bytes.extend_from_slice(expected_start.as_bytes());
916    unordered_assert_eq(buf, expected_start_bytes, expected_middle);
917
918    let expected_middle_len: usize = expected_middle.iter().map(|x| x.as_bytes().len()).sum();
919    assert_eq!(
920      len,
921      expected_start.as_bytes().len() + expected_middle_len + encoded_attributes.len() + PADDING.as_bytes().len(),
922      "empty expected len is correct with attrs"
923    );
924  }
925
926  // ------------- tests adapted from RESP2 --------------------------
927
928  #[test]
929  fn should_encode_llen_req_example() {
930    let expected = "*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n";
931    let input = blobstring_array(vec!["LLEN", "mylist"]);
932
933    encode_and_verify_empty(&input, expected);
934    encode_and_verify_non_empty(&input, expected);
935    encode_and_verify_empty_with_attributes(&input, expected);
936    encode_and_verify_non_empty_with_attributes(&input, expected);
937  }
938
939  #[test]
940  fn should_encode_incr_req_example() {
941    let expected = "*2\r\n$4\r\nINCR\r\n$5\r\nmykey\r\n";
942    let input = blobstring_array(vec!["INCR", "mykey"]);
943
944    encode_and_verify_empty(&input, expected);
945    encode_and_verify_non_empty(&input, expected);
946    encode_and_verify_empty_with_attributes(&input, expected);
947    encode_and_verify_non_empty_with_attributes(&input, expected);
948  }
949
950  #[test]
951  fn should_encode_bitcount_req_example() {
952    let expected = "*2\r\n$8\r\nBITCOUNT\r\n$5\r\nmykey\r\n";
953    let input = blobstring_array(vec!["BITCOUNT", "mykey"]);
954
955    encode_and_verify_empty(&input, expected);
956    encode_and_verify_non_empty(&input, expected);
957    encode_and_verify_empty_with_attributes(&input, expected);
958    encode_and_verify_non_empty_with_attributes(&input, expected);
959  }
960
961  #[test]
962  fn should_encode_array_bulk_string_test() {
963    let expected = "*3\r\n$5\r\nWATCH\r\n$6\r\nWIBBLE\r\n$9\r\nfooBARbaz\r\n";
964    let input = blobstring_array(vec!["WATCH", "WIBBLE", "fooBARbaz"]);
965
966    encode_and_verify_empty(&input, expected);
967    encode_and_verify_non_empty(&input, expected);
968    encode_and_verify_empty_with_attributes(&input, expected);
969    encode_and_verify_non_empty_with_attributes(&input, expected);
970  }
971
972  #[test]
973  fn should_encode_array_null_test() {
974    let expected = "*3\r\n$4\r\nHSET\r\n$3\r\nfoo\r\n_\r\n";
975    let mut input = blobstring_array(vec!["HSET", "foo"]);
976    push_frame_to_array(&mut input, Frame::Null);
977
978    encode_and_verify_empty(&input, expected);
979    encode_and_verify_non_empty(&input, expected);
980  }
981
982  #[test]
983  fn should_encode_raw_llen_req_example() {
984    let expected = "*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n";
985    let input = blobstring_array(vec!["LLEN", "mylist"]);
986
987    encode_raw_and_verify_empty(&input, expected);
988  }
989
990  #[test]
991  fn should_encode_raw_incr_req_example() {
992    let expected = "*2\r\n$4\r\nINCR\r\n$5\r\nmykey\r\n";
993    let input = blobstring_array(vec!["INCR", "mykey"]);
994
995    encode_raw_and_verify_empty(&input, expected);
996  }
997
998  #[test]
999  fn should_encode_raw_bitcount_req_example() {
1000    let expected = "*2\r\n$8\r\nBITCOUNT\r\n$5\r\nmykey\r\n";
1001    let input = blobstring_array(vec!["BITCOUNT", "mykey"]);
1002
1003    encode_raw_and_verify_empty(&input, expected);
1004  }
1005
1006  #[test]
1007  fn should_encode_raw_array_bulk_string_test() {
1008    let expected = "*3\r\n$5\r\nWATCH\r\n$6\r\nWIBBLE\r\n$9\r\nfooBARbaz\r\n";
1009    let input = blobstring_array(vec!["WATCH", "WIBBLE", "fooBARbaz"]);
1010
1011    encode_raw_and_verify_empty(&input, expected);
1012  }
1013
1014  #[test]
1015  fn should_encode_raw_array_null_test() {
1016    let expected = "*3\r\n$4\r\nHSET\r\n$3\r\nfoo\r\n_\r\n";
1017    let mut input = blobstring_array(vec!["HSET", "foo"]);
1018    push_frame_to_array(&mut input, Frame::Null);
1019
1020    encode_raw_and_verify_empty(&input, expected);
1021  }
1022
1023  #[test]
1024  fn should_encode_moved_error() {
1025    let expected = "-MOVED 3999 127.0.0.1:6381\r\n";
1026    let input = (FrameKind::SimpleError, "MOVED 3999 127.0.0.1:6381")
1027      .try_into()
1028      .unwrap();
1029
1030    encode_and_verify_empty(&input, expected);
1031    encode_and_verify_non_empty(&input, expected);
1032    encode_and_verify_empty_with_attributes(&input, expected);
1033    encode_and_verify_non_empty_with_attributes(&input, expected);
1034  }
1035
1036  #[test]
1037  fn should_encode_ask_error() {
1038    let expected = "-ASK 3999 127.0.0.1:6381\r\n";
1039    let input = (FrameKind::SimpleError, "ASK 3999 127.0.0.1:6381").try_into().unwrap();
1040
1041    encode_and_verify_empty(&input, expected);
1042    encode_and_verify_non_empty(&input, expected);
1043    encode_and_verify_empty_with_attributes(&input, expected);
1044    encode_and_verify_non_empty_with_attributes(&input, expected);
1045  }
1046
1047  #[test]
1048  fn should_encode_error() {
1049    let expected = "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n";
1050    let input = (
1051      FrameKind::SimpleError,
1052      "WRONGTYPE Operation against a key holding the wrong kind of value",
1053    )
1054      .try_into()
1055      .unwrap();
1056
1057    encode_and_verify_empty(&input, expected);
1058    encode_and_verify_non_empty(&input, expected);
1059    encode_and_verify_empty_with_attributes(&input, expected);
1060    encode_and_verify_non_empty_with_attributes(&input, expected);
1061  }
1062
1063  #[test]
1064  fn should_encode_simplestring() {
1065    let expected = "+OK\r\n";
1066    let input = (FrameKind::SimpleString, "OK").try_into().unwrap();
1067
1068    encode_and_verify_empty(&input, expected);
1069    encode_and_verify_non_empty(&input, expected);
1070    encode_and_verify_empty_with_attributes(&input, expected);
1071    encode_and_verify_non_empty_with_attributes(&input, expected);
1072  }
1073
1074  #[test]
1075  fn should_encode_number() {
1076    let expected = ":1000\r\n";
1077    let input: Frame = 1000.into();
1078
1079    encode_and_verify_empty(&input, expected);
1080    encode_and_verify_non_empty(&input, expected);
1081    encode_and_verify_empty_with_attributes(&input, expected);
1082    encode_and_verify_non_empty_with_attributes(&input, expected);
1083  }
1084
1085  #[test]
1086  fn should_encode_negative_number() {
1087    let expected = ":-1000\r\n";
1088    let input: Frame = (-1000 as i64).into();
1089
1090    encode_and_verify_empty(&input, expected);
1091    encode_and_verify_non_empty(&input, expected);
1092    encode_and_verify_empty_with_attributes(&input, expected);
1093    encode_and_verify_non_empty_with_attributes(&input, expected);
1094  }
1095
1096  // ------------- end tests adapted from RESP2 --------------------------
1097
1098  #[test]
1099  fn should_encode_bool_true() {
1100    let expected = BOOL_TRUE_BYTES;
1101    let input: Frame = true.into();
1102
1103    encode_and_verify_empty(&input, expected);
1104    encode_and_verify_non_empty(&input, expected);
1105    encode_and_verify_empty_with_attributes(&input, expected);
1106    encode_and_verify_non_empty_with_attributes(&input, expected);
1107  }
1108
1109  #[test]
1110  fn should_encode_bool_false() {
1111    let expected = BOOL_FALSE_BYTES;
1112    let input: Frame = false.into();
1113
1114    encode_and_verify_empty(&input, expected);
1115    encode_and_verify_non_empty(&input, expected);
1116    encode_and_verify_empty_with_attributes(&input, expected);
1117    encode_and_verify_non_empty_with_attributes(&input, expected);
1118  }
1119
1120  #[test]
1121  fn should_encode_double_positive() {
1122    let expected = ",12.34567\r\n";
1123    let input: Frame = 12.34567.try_into().unwrap();
1124
1125    encode_and_verify_empty(&input, expected);
1126    encode_and_verify_non_empty(&input, expected);
1127    encode_and_verify_empty_with_attributes(&input, expected);
1128    encode_and_verify_non_empty_with_attributes(&input, expected);
1129  }
1130
1131  #[test]
1132  fn should_encode_double_negative() {
1133    let expected = ",-12.34567\r\n";
1134    let input: Frame = (-12.34567).try_into().unwrap();
1135
1136    encode_and_verify_empty(&input, expected);
1137    encode_and_verify_non_empty(&input, expected);
1138    encode_and_verify_empty_with_attributes(&input, expected);
1139    encode_and_verify_non_empty_with_attributes(&input, expected);
1140  }
1141
1142  #[test]
1143  #[should_panic]
1144  fn should_not_encode_double_nan() {
1145    // force NaN into the frame avoiding try_into, which also checks for NaN
1146    let input = Frame::Double {
1147      data: f64::NAN,
1148      attributes: None,
1149    };
1150    let mut buf = empty_bytes();
1151    let _ = complete::encode_bytes(&mut buf, &input).unwrap();
1152  }
1153
1154  #[test]
1155  fn should_encode_double_inf() {
1156    let expected = ",inf\r\n";
1157    let input: Frame = f64::INFINITY.try_into().unwrap();
1158
1159    encode_and_verify_empty(&input, expected);
1160    encode_and_verify_non_empty(&input, expected);
1161    encode_and_verify_empty_with_attributes(&input, expected);
1162    encode_and_verify_non_empty_with_attributes(&input, expected);
1163  }
1164
1165  #[test]
1166  fn should_encode_double_neg_inf() {
1167    let expected = ",-inf\r\n";
1168    let input: Frame = f64::NEG_INFINITY.try_into().unwrap();
1169
1170    encode_and_verify_empty(&input, expected);
1171    encode_and_verify_non_empty(&input, expected);
1172    encode_and_verify_empty_with_attributes(&input, expected);
1173    encode_and_verify_non_empty_with_attributes(&input, expected);
1174  }
1175
1176  #[test]
1177  fn should_encode_bignumber() {
1178    let expected = "(3492890328409238509324850943850943825024385\r\n";
1179    let input: Frame = (
1180      FrameKind::BigNumber,
1181      "3492890328409238509324850943850943825024385".as_bytes().to_vec(),
1182    )
1183      .try_into()
1184      .unwrap();
1185
1186    encode_and_verify_empty(&input, expected);
1187    encode_and_verify_non_empty(&input, expected);
1188    encode_and_verify_empty_with_attributes(&input, expected);
1189    encode_and_verify_non_empty_with_attributes(&input, expected);
1190  }
1191
1192  #[test]
1193  fn should_encode_null() {
1194    let expected = "_\r\n";
1195    let input = Frame::Null;
1196
1197    encode_and_verify_empty(&input, expected);
1198    encode_and_verify_non_empty(&input, expected);
1199  }
1200
1201  #[test]
1202  fn should_encode_blobstring() {
1203    let expected = "$9\r\nfoobarbaz\r\n";
1204    let input: Frame = (FrameKind::BlobString, "foobarbaz").try_into().unwrap();
1205
1206    encode_and_verify_empty(&input, expected);
1207    encode_and_verify_non_empty(&input, expected);
1208    encode_and_verify_empty_with_attributes(&input, expected);
1209    encode_and_verify_non_empty_with_attributes(&input, expected);
1210  }
1211
1212  #[test]
1213  fn should_encode_bloberror() {
1214    let expected = "!21\r\nSYNTAX invalid syntax\r\n";
1215    let input: Frame = (FrameKind::BlobError, "SYNTAX invalid syntax").try_into().unwrap();
1216
1217    encode_and_verify_empty(&input, expected);
1218    encode_and_verify_non_empty(&input, expected);
1219    encode_and_verify_empty_with_attributes(&input, expected);
1220    encode_and_verify_non_empty_with_attributes(&input, expected);
1221  }
1222
1223  #[test]
1224  fn should_encode_verbatimstring_txt() {
1225    let expected = "=15\r\ntxt:Some string\r\n";
1226    let input = Frame::VerbatimString {
1227      format: VerbatimStringFormat::Text,
1228      data: "Some string".as_bytes().into(),
1229      attributes: None,
1230    };
1231
1232    encode_and_verify_empty(&input, expected);
1233    encode_and_verify_non_empty(&input, expected);
1234    encode_and_verify_empty_with_attributes(&input, expected);
1235    encode_and_verify_non_empty_with_attributes(&input, expected);
1236  }
1237
1238  #[test]
1239  fn should_encode_verbatimstring_mkd() {
1240    let expected = "=15\r\nmkd:Some string\r\n";
1241    let input = Frame::VerbatimString {
1242      format: VerbatimStringFormat::Markdown,
1243      data: "Some string".as_bytes().into(),
1244      attributes: None,
1245    };
1246
1247    encode_and_verify_empty(&input, expected);
1248    encode_and_verify_non_empty(&input, expected);
1249    encode_and_verify_empty_with_attributes(&input, expected);
1250    encode_and_verify_non_empty_with_attributes(&input, expected);
1251  }
1252
1253  #[test]
1254  fn should_encode_push_pubsub() {
1255    let expected = ">4\r\n+pubsub\r\n+message\r\n+somechannel\r\n+this is the message\r\n";
1256    let input = Frame::Push {
1257      data: vec![
1258        (FrameKind::SimpleString, "pubsub").try_into().unwrap(),
1259        (FrameKind::SimpleString, "message").try_into().unwrap(),
1260        (FrameKind::SimpleString, "somechannel").try_into().unwrap(),
1261        (FrameKind::SimpleString, "this is the message").try_into().unwrap(),
1262      ],
1263      attributes: None,
1264    };
1265
1266    assert!(input.is_pubsub_message());
1267    encode_and_verify_empty(&input, expected);
1268    encode_and_verify_non_empty(&input, expected);
1269    encode_and_verify_empty_with_attributes(&input, expected);
1270    encode_and_verify_non_empty_with_attributes(&input, expected);
1271  }
1272
1273  #[test]
1274  fn should_encode_push_keyspace_event() {
1275    let expected = ">4\r\n+pubsub\r\n+message\r\n+__keyspace@0__:mykey\r\n+del\r\n";
1276    let input = Frame::Push {
1277      data: vec![
1278        (FrameKind::SimpleString, "pubsub").try_into().unwrap(),
1279        (FrameKind::SimpleString, "message").try_into().unwrap(),
1280        (FrameKind::SimpleString, "__keyspace@0__:mykey").try_into().unwrap(),
1281        (FrameKind::SimpleString, "del").try_into().unwrap(),
1282      ],
1283      attributes: None,
1284    };
1285
1286    assert!(input.is_pubsub_message());
1287    encode_and_verify_empty(&input, expected);
1288    encode_and_verify_non_empty(&input, expected);
1289    encode_and_verify_empty_with_attributes(&input, expected);
1290    encode_and_verify_non_empty_with_attributes(&input, expected);
1291  }
1292
1293  #[test]
1294  fn should_encode_simple_set() {
1295    let expected_start = "~5\r\n";
1296    let expected_middle = ["+orange\r\n", "+apple\r\n", "#t\r\n", ":100\r\n", ":999\r\n"];
1297    let mut inner = resp3_utils::new_set(None);
1298    let v1: Frame = (FrameKind::SimpleString, "orange").try_into().unwrap();
1299    let v2: Frame = (FrameKind::SimpleString, "apple").try_into().unwrap();
1300    let v3: Frame = true.into();
1301    let v4: Frame = 100.into();
1302    let v5: Frame = 999.into();
1303
1304    inner.insert(v1);
1305    inner.insert(v2);
1306    inner.insert(v3);
1307    inner.insert(v4);
1308    inner.insert(v5);
1309    let input = Frame::Set {
1310      data: inner,
1311      attributes: None,
1312    };
1313
1314    encode_and_verify_empty_unordered(&input, expected_start, &expected_middle);
1315    encode_and_verify_non_empty_unordered(&input, expected_start, &expected_middle);
1316    encode_and_verify_empty_with_attributes_unordered(&input, expected_start, &expected_middle);
1317    encode_and_verify_non_empty_with_attributes_unordered(&input, expected_start, &expected_middle);
1318  }
1319
1320  #[test]
1321  fn should_encode_simple_map() {
1322    let expected_start = "%2\r\n";
1323    let expected_middle = ["+first\r\n:1\r\n", "+second\r\n:2\r\n"];
1324    let mut inner = resp3_utils::new_map(None);
1325    let k1: Frame = (FrameKind::SimpleString, "first").try_into().unwrap();
1326    let v1: Frame = 1.into();
1327    let k2: Frame = (FrameKind::SimpleString, "second").try_into().unwrap();
1328    let v2: Frame = 2.into();
1329
1330    inner.insert(k1, v1);
1331    inner.insert(k2, v2);
1332    let input = Frame::Map {
1333      data: inner,
1334      attributes: None,
1335    };
1336
1337    encode_and_verify_empty_unordered(&input, expected_start, &expected_middle);
1338    encode_and_verify_non_empty_unordered(&input, expected_start, &expected_middle);
1339    encode_and_verify_empty_with_attributes_unordered(&input, expected_start, &expected_middle);
1340    encode_and_verify_non_empty_with_attributes_unordered(&input, expected_start, &expected_middle);
1341  }
1342
1343  #[test]
1344  fn should_encode_nested_map() {
1345    let expected_start = "%2\r\n";
1346    let expected_middle = ["+first\r\n:1\r\n", "+second\r\n%1\r\n+third\r\n:3\r\n"];
1347    let mut inner = resp3_utils::new_map(None);
1348    let k1: Frame = (FrameKind::SimpleString, "first").try_into().unwrap();
1349    let v1: Frame = 1.into();
1350    let k2: Frame = (FrameKind::SimpleString, "second").try_into().unwrap();
1351    let k3: Frame = (FrameKind::SimpleString, "third").try_into().unwrap();
1352    let v3: Frame = 3.into();
1353
1354    let mut v2_inner = resp3_utils::new_map(None);
1355    v2_inner.insert(k3, v3);
1356    let v2 = Frame::Map {
1357      data: v2_inner,
1358      attributes: None,
1359    };
1360
1361    inner.insert(k1, v1);
1362    inner.insert(k2, v2);
1363    let input = Frame::Map {
1364      data: inner,
1365      attributes: None,
1366    };
1367
1368    encode_and_verify_empty_unordered(&input, expected_start, &expected_middle);
1369    encode_and_verify_non_empty_unordered(&input, expected_start, &expected_middle);
1370    encode_and_verify_empty_with_attributes_unordered(&input, expected_start, &expected_middle);
1371    encode_and_verify_non_empty_with_attributes_unordered(&input, expected_start, &expected_middle);
1372  }
1373
1374  #[test]
1375  fn should_encode_hello() {
1376    let expected = "HELLO 3\r\n";
1377    let input = Frame::Hello {
1378      version: RespVersion::RESP3,
1379      auth: None,
1380    };
1381
1382    encode_and_verify_empty(&input, expected);
1383    encode_and_verify_non_empty(&input, expected);
1384
1385    let expected = "HELLO 2\r\n";
1386    let input = Frame::Hello {
1387      version: RespVersion::RESP2,
1388      auth: None,
1389    };
1390
1391    encode_and_verify_empty(&input, expected);
1392    encode_and_verify_non_empty(&input, expected);
1393  }
1394
1395  #[test]
1396  fn should_encode_hello_with_auth() {
1397    let expected = "HELLO 3 AUTH default mypassword\r\n";
1398    let input = Frame::Hello {
1399      version: RespVersion::RESP3,
1400      auth: Some(Auth {
1401        username: "default".into(),
1402        password: "mypassword".into(),
1403      }),
1404    };
1405
1406    encode_and_verify_empty(&input, expected);
1407    encode_and_verify_non_empty(&input, expected);
1408  }
1409
1410  #[test]
1411  fn should_encode_streaming_blobstring() {
1412    let expected = "$?\r\n;2\r\nhe\r\n;4\r\nllow\r\n;1\r\no\r\n;3\r\nrld\r\n;0\r\n";
1413    let chunk1 = "he";
1414    let chunk2 = "llow";
1415    let chunk3 = "o";
1416    let chunk4 = "rld";
1417
1418    let mut buf = BytesMut::new();
1419    let mut offset = 0;
1420
1421    offset = streaming::extend_while_encoding(&mut buf, |buf| streaming::encode_start_string(buf, offset)).unwrap();
1422    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1423      streaming::encode_string_chunk(buf, offset, chunk1.as_bytes())
1424    })
1425    .unwrap();
1426    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1427      streaming::encode_string_chunk(buf, offset, chunk2.as_bytes())
1428    })
1429    .unwrap();
1430    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1431      streaming::encode_string_chunk(buf, offset, chunk3.as_bytes())
1432    })
1433    .unwrap();
1434    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1435      streaming::encode_string_chunk(buf, offset, chunk4.as_bytes())
1436    })
1437    .unwrap();
1438    let _ = streaming::extend_while_encoding(&mut buf, |buf| streaming::encode_end_string(buf, offset)).unwrap();
1439
1440    assert_eq!(buf, expected);
1441  }
1442
1443  #[test]
1444  fn should_encode_streaming_array() {
1445    let expected = "*?\r\n:1\r\n+foo\r\n#f\r\n$9\r\nfoobarbaz\r\n.\r\n";
1446    let chunk1 = Frame::Number {
1447      data: 1,
1448      attributes: None,
1449    };
1450    let chunk2 = Frame::SimpleString {
1451      data: "foo".into(),
1452      attributes: None,
1453    };
1454    let chunk3 = Frame::Boolean {
1455      data: false,
1456      attributes: None,
1457    };
1458    let chunk4 = Frame::BlobString {
1459      data: "foobarbaz".as_bytes().into(),
1460      attributes: None,
1461    };
1462
1463    let mut buf = BytesMut::new();
1464    let mut offset = 0;
1465
1466    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1467      streaming::encode_start_aggregate_type(buf, offset, &FrameKind::Array)
1468    })
1469    .unwrap();
1470    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1471      streaming::encode_aggregate_type_inner_value(buf, offset, &chunk1)
1472    })
1473    .unwrap();
1474    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1475      streaming::encode_aggregate_type_inner_value(buf, offset, &chunk2)
1476    })
1477    .unwrap();
1478    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1479      streaming::encode_aggregate_type_inner_value(buf, offset, &chunk3)
1480    })
1481    .unwrap();
1482    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1483      streaming::encode_aggregate_type_inner_value(buf, offset, &chunk4)
1484    })
1485    .unwrap();
1486    let _ =
1487      streaming::extend_while_encoding(&mut buf, |buf| streaming::encode_end_aggregate_type(buf, offset)).unwrap();
1488
1489    assert_eq!(buf, expected);
1490  }
1491
1492  #[test]
1493  fn should_encode_streaming_set() {
1494    let expected = "~?\r\n:1\r\n+foo\r\n#f\r\n$9\r\nfoobarbaz\r\n.\r\n";
1495    let chunk1 = Frame::Number {
1496      data: 1,
1497      attributes: None,
1498    };
1499    let chunk2 = Frame::SimpleString {
1500      data: "foo".into(),
1501      attributes: None,
1502    };
1503    let chunk3 = Frame::Boolean {
1504      data: false,
1505      attributes: None,
1506    };
1507    let chunk4 = Frame::BlobString {
1508      data: "foobarbaz".as_bytes().into(),
1509      attributes: None,
1510    };
1511
1512    let mut buf = BytesMut::new();
1513    let mut offset = 0;
1514
1515    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1516      streaming::encode_start_aggregate_type(buf, offset, &FrameKind::Set)
1517    })
1518    .unwrap();
1519    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1520      streaming::encode_aggregate_type_inner_value(buf, offset, &chunk1)
1521    })
1522    .unwrap();
1523    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1524      streaming::encode_aggregate_type_inner_value(buf, offset, &chunk2)
1525    })
1526    .unwrap();
1527    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1528      streaming::encode_aggregate_type_inner_value(buf, offset, &chunk3)
1529    })
1530    .unwrap();
1531    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1532      streaming::encode_aggregate_type_inner_value(buf, offset, &chunk4)
1533    })
1534    .unwrap();
1535    let _ =
1536      streaming::extend_while_encoding(&mut buf, |buf| streaming::encode_end_aggregate_type(buf, offset)).unwrap();
1537
1538    assert_eq!(buf, expected);
1539  }
1540
1541  #[test]
1542  fn should_encode_streaming_map() {
1543    let expected = "%?\r\n+a\r\n:1\r\n+b\r\n:2\r\n.\r\n";
1544    let k1 = Frame::SimpleString {
1545      data: "a".into(),
1546      attributes: None,
1547    };
1548    let v1 = Frame::Number {
1549      data: 1,
1550      attributes: None,
1551    };
1552    let k2 = Frame::SimpleString {
1553      data: "b".into(),
1554      attributes: None,
1555    };
1556    let v2 = Frame::Number {
1557      data: 2,
1558      attributes: None,
1559    };
1560
1561    let mut buf = BytesMut::new();
1562    let mut offset = 0;
1563
1564    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1565      streaming::encode_start_aggregate_type(buf, offset, &FrameKind::Map)
1566    })
1567    .unwrap();
1568    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1569      streaming::encode_aggregate_type_inner_kv_pair(buf, offset, &k1, &v1)
1570    })
1571    .unwrap();
1572    offset = streaming::extend_while_encoding(&mut buf, |buf| {
1573      streaming::encode_aggregate_type_inner_kv_pair(buf, offset, &k2, &v2)
1574    })
1575    .unwrap();
1576    let _ =
1577      streaming::extend_while_encoding(&mut buf, |buf| streaming::encode_end_aggregate_type(buf, offset)).unwrap();
1578
1579    assert_eq!(buf, expected);
1580  }
1581}