Module redis_protocol::resp3::decode::streaming[][src]

Expand description

Decoding structs and functions that support streaming frames. The caller is responsible for managing any returned state for streaming frames.

Examples:

Implement a codec that supports decoding streams…

use redis_protocol::resp3::types::*;
use redis_protocol::types::{RedisProtocolError, RedisProtocolErrorKind};
use redis_protocol::resp3::decode::streaming::*;
use redis_protocol::resp3::encode::complete::*;
use bytes::BytesMut;
use tokio_util::codec::{Decoder, Encoder};
use std::collections::VecDeque;

pub struct RedisCodec {
  decoder_stream: Option<StreamedFrame>
}

impl Encoder<Frame> for RedisCodec {
  type Error = RedisProtocolError;

  fn encode(&mut self, item: Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
    // in this example we only show support for encoding complete frames
    let _ = encode_bytes(dst, &item)?;
    Ok(())
  }
}

impl Decoder for RedisCodec {
  type Item = Frame;
  type Error = RedisProtocolError;

  // Buffer the results of streamed frame before returning the complete frame to the caller.
  // Callers that want to surface streaming frame chunks up the stack would simply return after calling `decode` here.
  fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
    if src.is_empty() {
      return Ok(None);
    }

    if let Some((frame, amt)) = decode(&src)? {
      // clear the buffer up to the amount decoded so the same bytes aren't repeatedly processed
      let _ = src.split_to(amt);

      if self.decoder_stream.is_some() && frame.is_streaming() {
        // it doesn't make sense to start a stream while inside another stream
        return Err(RedisProtocolError::new(
          RedisProtocolErrorKind::DecodeError,
          "Cannot start a stream while already inside a stream."
        ));
      }

      let result = if let Some(ref mut streamed_frame) = self.decoder_stream {
        // we started receiving streamed data earlier

        // we already checked for streams within streams above
        let frame = frame.into_complete_frame()?;
        streamed_frame.add_frame(frame);

        if streamed_frame.is_finished() {
           // convert the inner stream buffer into the final output frame
           Some(streamed_frame.into_frame()?)
        }else{
          // buffer the stream in memory until it completes
          None
        }
      }else{
        // we're processing a complete frame or starting a new streamed frame
        if frame.is_streaming() {
          // start a new stream, saving the internal buffer to the codec state
          self.decoder_stream = Some(frame.into_streaming_frame()?);
          // don't return anything to the caller until the stream finishes (shown above)
          None
        }else{
          // we're not in the middle of a stream and we found a complete frame
          Some(frame.into_complete_frame()?)
        }
      };

      if result.is_some() {
        // we're either done with the stream or we found a complete frame. either way clear the buffer.
        let _ = self.decoder_stream.take();
      }

      Ok(result)
    }else{
      Ok(None)
    }
  }
}

Functions

Attempt to parse the contents of buf, returning the first valid frame and the number of bytes consumed.