Module redis_protocol::resp3::decode::streaming [−][src]
Expand description
Decoding structs and functions that support streaming frames. The caller is responsible for managing any returned state for streaming frames.
Examples:
Implement a codec that supports decoding streams…
ⓘ
use redis_protocol::resp3::types::*;
use redis_protocol::types::{RedisProtocolError, RedisProtocolErrorKind};
use redis_protocol::resp3::decode::streaming::*;
use redis_protocol::resp3::encode::complete::*;
use bytes::BytesMut;
use tokio_util::codec::{Decoder, Encoder};
use std::collections::VecDeque;
/// Codec state for encoding complete RESP3 frames and decoding both complete and streamed frames.
pub struct RedisCodec {
    /// The streamed frame currently being buffered, if any.
    ///
    /// Set when the decoder sees the start of a streamed frame and taken again once a frame is
    /// handed back to the caller.
    decoder_stream: Option<StreamedFrame>,
}
impl Encoder<Frame> for RedisCodec {
    type Error = RedisProtocolError;

    /// Write `item` into `dst` via `encode_bytes`.
    ///
    /// This example only demonstrates encoding complete frames. The value returned by
    /// `encode_bytes` on success is not needed here and is discarded.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `encode_bytes`.
    fn encode(&mut self, item: Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
        let _written = encode_bytes(dst, &item)?;
        Ok(())
    }
}
impl Decoder for RedisCodec {
    type Item = Frame;
    type Error = RedisProtocolError;

    /// Decode the next frame from `src`, buffering streamed frames until they are finished.
    ///
    /// Chunks of a streamed frame are accumulated in `self.decoder_stream` and only the final,
    /// assembled frame is returned. A codec that wants to surface individual stream chunks up the
    /// stack would instead return right after calling `decode`.
    ///
    /// Returns `Ok(None)` when `src` is empty, when the parser does not yield a frame, or while a
    /// streamed frame is still being buffered.
    ///
    /// # Errors
    ///
    /// Returns a `DecodeError` if a new stream starts while another stream is still in progress,
    /// and propagates any error from parsing or from converting the decoded frame.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.is_empty() {
            return Ok(None);
        }

        let (decoded, consumed) = match decode(&src)? {
            Some(parsed) => parsed,
            None => return Ok(None),
        };
        // drop the bytes that were just parsed so they are never processed a second time
        let _ = src.split_to(consumed);

        let output = match self.decoder_stream.as_mut() {
            // a stream nested inside another stream is not meaningful
            Some(_) if decoded.is_streaming() => {
                return Err(RedisProtocolError::new(
                    RedisProtocolErrorKind::DecodeError,
                    "Cannot start a stream while already inside a stream."
                ));
            }
            // a stream was started earlier: append this chunk to it
            Some(pending) => {
                pending.add_frame(decoded.into_complete_frame()?);
                if pending.is_finished() {
                    // turn the buffered chunks into the final output frame
                    Some(pending.into_frame()?)
                } else {
                    // keep buffering in memory until the stream completes
                    None
                }
            }
            // no stream in progress and this frame begins one: remember it in the codec state and
            // hold back any output until the stream finishes (handled in the arm above)
            None if decoded.is_streaming() => {
                self.decoder_stream = Some(decoded.into_streaming_frame()?);
                None
            }
            // no stream in progress and the frame is already complete
            None => Some(decoded.into_complete_frame()?),
        };

        if output.is_some() {
            // a frame is going back to the caller (finished stream or plain complete frame), so
            // discard any saved stream state
            self.decoder_stream = None;
        }
        Ok(output)
    }
}
Functions
Attempt to parse the contents of `buf`, returning the first valid frame and the number of bytes consumed.