Module streaming

Encoding functions for streaming blobs and aggregate types.

This interface is stateful because the caller almost always needs to maintain some state outside of these functions, such as the current buffer offset. It is therefore up to the caller to track whether a data stream is at its start, middle, or end, and to call the corresponding functions in this module in that order.
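
One way a caller might track that state is a small state machine that rejects out-of-order calls. The sketch below is illustrative only: `StreamState`, `StreamStep`, and `advance` are hypothetical names that do not exist in this crate, and the code does not call any function from this module.

```rust
/// Hypothetical caller-side state for one outgoing stream (not part of the crate).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    NotStarted,
    InProgress,
    Finished,
}

/// Hypothetical steps a caller can take, mirroring the start / inner / end functions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamStep {
    Start,
    Inner,
    End,
}

/// Return the next state, or an error message if the step is not valid in the current state.
fn advance(state: StreamState, step: StreamStep) -> Result<StreamState, &'static str> {
    match (state, step) {
        (StreamState::NotStarted, StreamStep::Start) => Ok(StreamState::InProgress),
        (StreamState::InProgress, StreamStep::Inner) => Ok(StreamState::InProgress),
        (StreamState::InProgress, StreamStep::End) => Ok(StreamState::Finished),
        (StreamState::NotStarted, _) => Err("stream has not been started"),
        (StreamState::InProgress, StreamStep::Start) => Err("stream is already in progress"),
        (StreamState::Finished, _) => Err("stream has already ended"),
    }
}
```

A caller could check `advance` before each encoding call and only invoke the matching start, inner, or end function when it returns `Ok`.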

Examples:


use redis_protocol::resp3::encode::streaming::*;
use redis_protocol::resp3::types::{Frame, FrameKind};
use bytes::BytesMut;
use redis_protocol::types::RedisProtocolError;

fn example(buf: &mut BytesMut) -> Result<usize, RedisProtocolError> {
  // in many cases the starting buffer won't be empty, so this example shows how to track the offset as well
  let mut offset = buf.len();
  let frames: Vec<Frame> = vec![1.into(), 2.into()];

  offset += extend_while_encoding(buf, |buf| {
    encode_start_aggregate_type(buf, offset, &FrameKind::Array)
  })?;

  for frame in frames.iter() {
    offset += extend_while_encoding(buf, |buf| {
      encode_aggregate_type_inner_value(buf, offset, frame)
    })?;
  }

  offset += extend_while_encoding(buf, |buf| {
    encode_end_aggregate_type(buf, offset)
  })?;

  println!("New buffer size: {}", offset);
  Ok(offset)
}

Or a somewhat more practical example…


use redis_protocol::resp3::encode::streaming::*;
use redis_protocol::resp3::types::{Frame, FrameKind};
use redis_protocol::types::RedisProtocolError;
use bytes::BytesMut;
use tokio::net::TcpStream;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

async fn write_all(socket: &mut TcpStream, buf: &mut BytesMut) -> Result<usize, RedisProtocolError> {
  let len = buf.len();
  socket.write_all(&buf).await.expect("Failed to write to socket.");
  // we could just clear the buffer here since we use `write_all`, but in many cases callers don't flush the socket on each `write` call.
  // in those scenarios the caller should split the buffer based on the result from `write`.
  let _ = buf.split_to(len);

  Ok(len)
}

/// Start a new array stream, sending frames received from `rx` out to `socket` and ending the stream when `rx` closes.
async fn stream_array(socket: &mut TcpStream, mut rx: UnboundedReceiver<Frame>) -> Result<(), RedisProtocolError> {
  let mut buf = BytesMut::new();

  let _ = extend_while_encoding(&mut buf, |buf| {
    encode_start_aggregate_type(buf, 0, &FrameKind::Array)
  })?;
  let mut written = write_all(socket, &mut buf).await.expect("Failed to write to socket.");

  loop {
    let frame = match rx.recv().await {
       Some(frame) => frame,
       // break out of the loop when the channel closes
       None => break
    };

    let _ = extend_while_encoding(&mut buf, |buf| {
      encode_aggregate_type_inner_value(buf, 0, &frame)
    })?;
    written += write_all(socket, &mut buf).await.expect("Failed to write to socket.");
  }

  let _ = extend_while_encoding(&mut buf, |buf| {
    encode_end_aggregate_type(buf, 0)
  })?;
  written += write_all(socket, &mut buf).await.expect("Failed to write to socket.");

  println!("Streamed {} bytes to the socket.", written);
  Ok(())  
}

fn generate_frames(tx: UnboundedSender<Frame>) -> Result<(), RedisProtocolError> {
  // read from another socket or somehow generate frames, writing them to `tx`
  unimplemented!()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  let (tx, rx) = unbounded_channel();
  let mut socket = TcpStream::connect("127.0.0.1:6379").await.expect("Failed to connect");
   
  generate_frames(tx);
  stream_array(&mut socket, rx).await.expect("Failed to stream array.");

  Ok(())
}
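
To make the examples above easier to follow, it can help to see the bytes that a streamed aggregate type consists of on the wire. The following is a minimal, standard-library-only sketch based on the RESP3 specification: a streamed array, set, or map starts with its type byte followed by `?\r\n`, carries ordinary frames as inner values, and ends with `.\r\n`. The names `StreamedKind`, `start_aggregate`, `inner_number`, `end_aggregate`, and `streamed_number_array` are hypothetical and are not part of this crate; the crate's encoders are expected to produce equivalent bytes, but this sketch does not call them.

```rust
/// Hypothetical helper: the aggregate kinds that RESP3 allows to be streamed.
#[derive(Clone, Copy)]
enum StreamedKind {
    Array,
    Set,
    Map,
}

/// Append the header for a streamed aggregate: the type byte followed by `?\r\n`.
fn start_aggregate(out: &mut Vec<u8>, kind: StreamedKind) {
    let prefix = match kind {
        StreamedKind::Array => b'*',
        StreamedKind::Set => b'~',
        StreamedKind::Map => b'%',
    };
    out.push(prefix);
    out.extend_from_slice(b"?\r\n");
}

/// Append one inner value, limited here to RESP3 numbers (`:<n>\r\n`) for brevity.
fn inner_number(out: &mut Vec<u8>, value: i64) {
    out.extend_from_slice(format!(":{}\r\n", value).as_bytes());
}

/// Append the terminator shared by all streamed aggregates.
fn end_aggregate(out: &mut Vec<u8>) {
    out.extend_from_slice(b".\r\n");
}

/// Build the bytes for a streamed array of numbers.
fn streamed_number_array(values: &[i64]) -> Vec<u8> {
    let mut out = Vec::new();
    start_aggregate(&mut out, StreamedKind::Array);
    for value in values {
        inner_number(&mut out, *value);
    }
    end_aggregate(&mut out);
    out
}
```

In a streamed map the inner frames alternate between keys and values, so each key-value pair contributes two frames before the terminator.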

Functions

encode_aggregate_type_inner_kv_pair
Encode the inner frames that make up a key-value pair in a streamed map.
encode_aggregate_type_inner_value
Encode the inner frame inside a streamed array or set.
encode_end_aggregate_type
Encode the terminating bytes at the end of a streaming aggregate type (array, set, or map).
encode_end_string
Encode the terminating bytes at the end of a streaming blob string.
encode_start_aggregate_type
Encode the starting bytes for a streaming aggregate type (array, set, or map).
encode_start_string
Encode the starting bytes for a streaming blob string.
encode_string_chunk
Encode the bytes making up one chunk of a streaming blob string.
extend_while_encoding
A wrapper function that automatically extends the input buffer while encoding frames with another encoding function.
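
The examples on this page cover streamed arrays only, so here is a rough sketch of the bytes involved in a streamed blob string. Per the RESP3 specification, the stream opens with `$?\r\n`, each chunk is written as `;<length>\r\n<data>\r\n`, and a zero-length chunk `;0\r\n` closes it. The helper names `start_string`, `string_chunk`, `end_string`, and `streamed_string` are hypothetical, use only the standard library, and do not call `encode_start_string`, `encode_string_chunk`, or `encode_end_string`.

```rust
/// Append the header that opens a streamed blob string.
fn start_string(out: &mut Vec<u8>) {
    out.extend_from_slice(b"$?\r\n");
}

/// Append one chunk as `;<length>\r\n<data>\r\n`.
/// Empty chunks are skipped because a zero length marks the end of the stream.
fn string_chunk(out: &mut Vec<u8>, data: &[u8]) {
    if data.is_empty() {
        return;
    }
    out.extend_from_slice(format!(";{}\r\n", data.len()).as_bytes());
    out.extend_from_slice(data);
    out.extend_from_slice(b"\r\n");
}

/// Append the zero-length chunk that closes the stream.
fn end_string(out: &mut Vec<u8>) {
    out.extend_from_slice(b";0\r\n");
}

/// Build the bytes for a blob string streamed as the given chunks.
fn streamed_string(chunks: &[&[u8]]) -> Vec<u8> {
    let mut out = Vec::new();
    start_string(&mut out);
    for chunk in chunks {
        string_chunk(&mut out, chunk);
    }
    end_string(&mut out);
    out
}
```

Skipping empty chunks is a design choice in this sketch: it keeps a caller from ending the stream by accident when a read returns no data.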