Module streaming

Available on crate feature resp3 only.

Encoding functions for streaming blobs and aggregate types.

Using Bytes and Tokio

Stream an array of frames via a Tokio unbounded channel.


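// Imports this example relies on. The `redis_protocol` paths are assumed from the crate layout.
use bytes::BytesMut;
use redis_protocol::{
  resp3::{
    encode::streaming::*,
    types::{BytesFrame, FrameKind, Resp3Frame},
  },
  zero_extend,
};
use std::time::Duration;
use tokio::{
  io::AsyncWriteExt,
  net::TcpStream,
  sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
  time::sleep,
};

/// Write the entire buffer to `socket`, drain it, and return the number of bytes written.
async fn write_all(socket: &mut TcpStream, buf: &mut BytesMut) -> usize {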
  let len = buf.len();
  socket.write_all(&buf).await.expect("Failed to write to socket.");
  // We could simply clear the buffer here since `write_all` writes everything, but callers often use plain `write`
  // calls without flushing on each one. In those scenarios the caller should split the buffer based on the byte
  // count returned from `write`.
  let _ = buf.split_to(len);
  len
}

/// Start a new array stream, sending frames received from `rx` out to `socket` and ending the stream when `rx` closes.
async fn stream_array(socket: &mut TcpStream, mut rx: UnboundedReceiver<BytesFrame>) {
  let mut buf = BytesMut::new();
  let mut written = 0;

  zero_extend(&mut buf, START_STREAM_ENCODE_LEN);
  encode_start_aggregate_type(&mut buf, 0, FrameKind::Array).unwrap();
  written += write_all(socket, &mut buf).await;

  while let Some(frame) = rx.recv().await {
    zero_extend(&mut buf, frame.encode_len(false));
    encode_bytes_aggregate_type_inner_value(&mut buf, 0, &frame, false).unwrap();
    written += write_all(socket, &mut buf).await;
  }

  zero_extend(&mut buf, END_STREAM_AGGREGATE_TYPE_ENCODE_LEN);
  encode_end_aggregate_type(&mut buf, 0).unwrap();
  written += write_all(socket, &mut buf).await;

  println!("Streamed {} bytes to the socket.", written);
}

async fn generate_frames(tx: UnboundedSender<BytesFrame>) {
  // read from another socket or somehow generate frames
  sleep(Duration::from_secs(1)).await;
  tx.send(BytesFrame::BlobString { data: "foo".into(), attributes: None }).unwrap();
  sleep(Duration::from_secs(1)).await;
  tx.send(BytesFrame::BlobString { data: "bar".into(), attributes: None }).unwrap();
  sleep(Duration::from_secs(1)).await;
  tx.send(BytesFrame::BlobString { data: "baz".into(), attributes: None }).unwrap();
}

#[tokio::main]
async fn main() {
  let (tx, rx) = unbounded_channel();
  let mut socket = TcpStream::connect("127.0.0.1:6379").await.unwrap();

  tokio::spawn(generate_frames(tx));
  stream_array(&mut socket, rx).await;
}
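
For orientation, the bytes that the example above places on the socket can be reproduced by hand. The sketch below uses only the standard library and hypothetical helper names (`start_array_stream`, `blob_string`, `end_aggregate_stream`, `streamed_array`); none of them are this crate's functions. It assumes the framing in the RESP3 specification, where `*?\r\n` opens an array of unknown length and `.\r\n` closes it.

```rust
/// Hypothetical helper: the prefix that opens a streamed array (`*?\r\n` in RESP3).
fn start_array_stream(out: &mut Vec<u8>) {
    out.extend_from_slice(b"*?\r\n");
}

/// Hypothetical helper: one complete blob string element, `$<len>\r\n<data>\r\n`.
fn blob_string(out: &mut Vec<u8>, data: &[u8]) {
    out.extend_from_slice(format!("${}\r\n", data.len()).as_bytes());
    out.extend_from_slice(data);
    out.extend_from_slice(b"\r\n");
}

/// Hypothetical helper: the terminator that closes any streamed aggregate (`.\r\n`).
fn end_aggregate_stream(out: &mut Vec<u8>) {
    out.extend_from_slice(b".\r\n");
}

/// Build the full byte sequence for a streamed array of blob strings.
fn streamed_array(items: &[&str]) -> Vec<u8> {
    let mut out = Vec::new();
    start_array_stream(&mut out);
    for item in items {
        blob_string(&mut out, item.as_bytes());
    }
    end_aggregate_stream(&mut out);
    out
}
```

Calling `streamed_array(&["foo", "bar", "baz"])` yields `*?\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$3\r\nbaz\r\n.\r\n`, which is 34 bytes. If the crate encodes the three frames the same way, that is the total the example would report.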

Constants

END_STREAM_AGGREGATE_TYPE_ENCODE_LEN
Number of bytes needed to encode the terminating bytes after an aggregate type.
END_STREAM_STRING_ENCODE_LEN
Number of bytes needed to encode the terminating bytes after a blob string.
START_STREAM_ENCODE_LEN
Number of bytes needed to encode the prefix when starting a stream.
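
These lengths follow from the marker bytes themselves. As a sketch based on the RESP3 specification rather than on this crate's source, the hypothetical constants below spell out each marker so the sizes can be checked. The values actually exported by this module are assumed, not confirmed, to match.

```rust
/// Hypothetical marker constants, written out from the RESP3 specification.
/// The opening marker is a type byte (`*` array, `~` set, `%` map, `$` blob string),
/// followed by `?` and CRLF. The array form is shown here.
const START_ARRAY_STREAM: &[u8] = b"*?\r\n";
/// Closes a streamed array, set, or map.
const END_AGGREGATE_STREAM: &[u8] = b".\r\n";
/// Closes a streamed blob string: a chunk header with length zero.
const END_STRING_STREAM: &[u8] = b";0\r\n";

/// Lengths of the start, aggregate-end, and string-end markers, in that order.
fn marker_lengths() -> (usize, usize, usize) {
    (
        START_ARRAY_STREAM.len(),
        END_AGGREGATE_STREAM.len(),
        END_STRING_STREAM.len(),
    )
}

/// Bytes to reserve for a streamed array whose already-encoded elements
/// have the given lengths.
fn streamed_array_len(element_lens: &[usize]) -> usize {
    START_ARRAY_STREAM.len() + element_lens.iter().sum::<usize>() + END_AGGREGATE_STREAM.len()
}
```

Under these assumptions the markers occupy 4, 3, and 4 bytes, so a streamed array of three 9-byte elements needs `streamed_array_len(&[9, 9, 9])`, or 34 bytes in total.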

Functions

encode_borrowed_aggregate_type_inner_kv_pair
Encode the inner borrowed frames that make up a key-value pair in a streamed map.
encode_borrowed_aggregate_type_inner_value
Encode the inner borrowed frame inside a streamed array or set.
encode_bytes_aggregate_type_inner_kv_pair (crate feature bytes)
Encode the inner frames that make up a key-value pair in a streamed map.
encode_bytes_aggregate_type_inner_value (crate feature bytes)
Encode the inner frame inside a streamed array or set.
encode_end_aggregate_type
Encode the terminating bytes at the end of a streaming aggregate type (array, set, or map).
encode_end_string
Encode the terminating bytes at the end of a streaming blob string.
encode_owned_aggregate_type_inner_kv_pair
Encode the inner frames that make up a key-value pair in a streamed map.
encode_owned_aggregate_type_inner_value
Encode the inner frame inside a streamed array or set.
encode_start_aggregate_type
Encode the starting bytes for a streaming aggregate type (array, set, or map).
encode_start_string
Encode the starting bytes in a streaming blob string.
encode_string_chunk
Encode the bytes making up one chunk of a streaming blob string.
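
The string functions listed above (start, chunk, end) mirror the three parts of a streamed blob string on the wire. The sketch below is a standard-library illustration of that layout as described in the RESP3 specification. The helper names (`start_string_stream`, `string_chunk`, `end_string_stream`, `streamed_string`) are hypothetical and do not reflect this crate's signatures.

```rust
/// Hypothetical helper: open a blob string of unknown length (`$?\r\n`).
fn start_string_stream(out: &mut Vec<u8>) {
    out.extend_from_slice(b"$?\r\n");
}

/// Hypothetical helper: one chunk, encoded as `;<len>\r\n<data>\r\n`.
/// Empty chunks are skipped because a zero-length chunk header is the terminator.
fn string_chunk(out: &mut Vec<u8>, data: &[u8]) {
    if data.is_empty() {
        return;
    }
    out.extend_from_slice(format!(";{}\r\n", data.len()).as_bytes());
    out.extend_from_slice(data);
    out.extend_from_slice(b"\r\n");
}

/// Hypothetical helper: close the streamed string with a zero-length chunk (`;0\r\n`).
fn end_string_stream(out: &mut Vec<u8>) {
    out.extend_from_slice(b";0\r\n");
}

/// Encode a sequence of chunks as one streamed blob string.
fn streamed_string(chunks: &[&str]) -> Vec<u8> {
    let mut out = Vec::new();
    start_string_stream(&mut out);
    for chunk in chunks {
        string_chunk(&mut out, chunk.as_bytes());
    }
    end_string_stream(&mut out);
    out
}
```

For example, `streamed_string(&["Hell", "o wor", "d"])` produces `$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;1\r\nd\r\n;0\r\n`. Skipping empty chunks is a choice made in this sketch so that the stream cannot be ended early by accident.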