Module streaming

Available on crate feature resp3 only.

Encoding functions for streaming blobs and aggregate types.

§Using Bytes and Tokio

Stream an array of frames via a Tokio unbounded channel.


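use std::time::Duration;

use bytes::BytesMut;
// NOTE: the `redis_protocol` import paths below are assumed; adjust them to the crate version in use.
use redis_protocol::{
  resp3::{
    encode::streaming::*,
    types::{BytesFrame, FrameKind, Resp3Frame},
  },
  zero_extend,
};
use tokio::{
  io::AsyncWriteExt,
  net::TcpStream,
  sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
  time::sleep,
};

/// Write the entire buffer to `socket`, drain it, and return the number of bytes written.
async fn write_all(socket: &mut TcpStream, buf: &mut BytesMut) -> usize {
  let len = buf.len();
  socket.write_all(&buf).await.expect("Failed to write to socket.");
  // Since `write_all` sends the whole buffer we could simply clear it here. Callers that use `write` instead (it is
  // common not to flush the socket on every call) should split the buffer by the byte count that `write` returns.
  let _ = buf.split_to(len);
  len
}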

/// Start a new array stream, sending frames received from `rx` out to `socket` and ending the stream when `rx` closes.
async fn stream_array(socket: &mut TcpStream, mut rx: UnboundedReceiver<BytesFrame>) {
  let mut buf = BytesMut::new();
  let mut written = 0;

  zero_extend(&mut buf, START_STREAM_ENCODE_LEN);
  encode_start_aggregate_type(&mut buf, 0, FrameKind::Array).unwrap();
  written += write_all(socket, &mut buf).await;

  while let Some(frame) = rx.recv().await {
    zero_extend(&mut buf, frame.encode_len(false));
    encode_bytes_aggregate_type_inner_value(&mut buf, 0, &frame, false).unwrap();
    written += write_all(socket, &mut buf).await;
  }

  zero_extend(&mut buf, END_STREAM_AGGREGATE_TYPE_ENCODE_LEN);
  encode_end_aggregate_type(&mut buf, 0).unwrap();
  written += write_all(socket, &mut buf).await;

  println!("Streamed {} bytes to the socket.", written);
}

async fn generate_frames(tx: UnboundedSender<BytesFrame>) {
  // read from another socket or somehow generate frames
  sleep(Duration::from_secs(1)).await;
  tx.send(BytesFrame::BlobString { data: "foo".into(), attributes: None }).unwrap();
  sleep(Duration::from_secs(1)).await;
  tx.send(BytesFrame::BlobString { data: "bar".into(), attributes: None }).unwrap();
  sleep(Duration::from_secs(1)).await;
  tx.send(BytesFrame::BlobString { data: "baz".into(), attributes: None }).unwrap();
}

#[tokio::main]
async fn main() {
  let (tx, rx) = unbounded_channel();
  let mut socket = TcpStream::connect("127.0.0.1:6379").await.unwrap();

  tokio::spawn(generate_frames(tx));
  stream_array(&mut socket, rx).await;
}
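
For reference, the bytes that the example above sends can be sketched by hand. The following is a minimal illustration of the RESP3 wire format for a streamed array, assuming the framing described in the RESP3 specification (`*?\r\n` to open, `.\r\n` to close). It uses only the standard library, and every helper name in it is hypothetical rather than part of this crate.

```rust
// Hand-rolled sketch of the RESP3 wire format for a streamed array.
// These helpers are hypothetical and are NOT part of `redis_protocol`.

/// Prefix that opens a streamed array: `*` followed by `?` in place of a length.
fn start_streamed_array() -> Vec<u8> {
  b"*?\r\n".to_vec()
}

/// A complete (non-streamed) blob string: `$<len>\r\n<data>\r\n`.
fn blob_string(data: &[u8]) -> Vec<u8> {
  let mut out = format!("${}\r\n", data.len()).into_bytes();
  out.extend_from_slice(data);
  out.extend_from_slice(b"\r\n");
  out
}

/// Terminator that closes a streamed aggregate type.
fn end_streamed_aggregate() -> Vec<u8> {
  b".\r\n".to_vec()
}

/// Concatenate prefix, inner blob strings, and terminator into one payload.
fn streamed_array(items: &[&str]) -> Vec<u8> {
  let mut out = start_streamed_array();
  for item in items {
    out.extend(blob_string(item.as_bytes()));
  }
  out.extend(end_streamed_aggregate());
  out
}

fn main() {
  let wire = streamed_array(&["foo", "bar", "baz"]);
  // Debug-print the payload with escaped control characters.
  println!("{:?}", String::from_utf8_lossy(&wire));
}
```

Because the length is never declared up front, the sender can keep appending inner frames until the channel closes, which is what `stream_array` relies on.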

Constants§

END_STREAM_AGGREGATE_TYPE_ENCODE_LEN
Number of bytes needed to encode the terminating bytes after an aggregate type.
END_STREAM_STRING_ENCODE_LEN
Number of bytes needed to encode the terminating bytes after a blob string.
START_STREAM_ENCODE_LEN
Number of bytes needed to encode the prefix when starting a stream.
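
These constants exist so that a caller can grow the buffer by exactly the right amount before encoding at an offset. The sketch below illustrates that reserve-then-write pattern with a plain `Vec<u8>`. The helper names are hypothetical, and the marker byte strings come from the RESP3 specification rather than from the crate's constants, so treat the lengths as an assumption about what those constants represent.

```rust
// Sketch of the "reserve zeroed space, then write at an offset" pattern.
// All names here are hypothetical stand-ins, not items from `redis_protocol`.

/// RESP3 markers as described in the protocol specification (assumed).
const START_ARRAY: &[u8] = b"*?\r\n";
const END_AGGREGATE: &[u8] = b".\r\n";
const END_STRING: &[u8] = b";0\r\n";

/// Grow `buf` by `amt` zeroed bytes so that later writes have room.
fn reserve_zeroed(buf: &mut Vec<u8>, amt: usize) {
  buf.resize(buf.len() + amt, 0);
}

/// Copy `bytes` into `buf` starting at `offset`.
/// Returns the offset just past the written bytes, or an error if `buf` is too small.
fn write_at(buf: &mut [u8], offset: usize, bytes: &[u8]) -> Result<usize, String> {
  let end = offset + bytes.len();
  if end > buf.len() {
    return Err(format!("need {} bytes but buffer holds {}", end, buf.len()));
  }
  buf[offset..end].copy_from_slice(bytes);
  Ok(end)
}

fn main() {
  let mut buf = Vec::new();

  // Reserve room for the stream prefix, then write it at offset 0.
  reserve_zeroed(&mut buf, START_ARRAY.len());
  let offset = write_at(&mut buf, 0, START_ARRAY).unwrap();

  // Reserve room for the terminator and write it right after the prefix.
  reserve_zeroed(&mut buf, END_AGGREGATE.len());
  let offset = write_at(&mut buf, offset, END_AGGREGATE).unwrap();

  println!("wrote {} bytes; string terminator needs {}", offset, END_STRING.len());
}
```

Writing without reserving first fails in this sketch, which is why the example above calls `zero_extend` before each encode call.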

Functions§

encode_borrowed_aggregate_type_inner_kv_pair
Encode the inner borrowed frames that make up a key-value pair in a streamed map.
encode_borrowed_aggregate_type_inner_value
Encode the inner borrowed frame inside a streamed array or set.
encode_bytes_aggregate_type_inner_kv_pair (crate feature `bytes`)
Encode the inner frames that make up a key-value pair in a streamed map.
encode_bytes_aggregate_type_inner_value (crate feature `bytes`)
Encode the inner frame inside a streamed array or set.
encode_end_aggregate_type
Encode the terminating bytes at the end of a streaming aggregate type (array, set, or map).
encode_end_string
Encode the terminating bytes at the end of a streaming blob string.
encode_owned_aggregate_type_inner_kv_pair
Encode the inner frames that make up a key-value pair in a streamed map.
encode_owned_aggregate_type_inner_value
Encode the inner frame inside a streamed array or set.
encode_start_aggregate_type
Encode the starting bytes for a streaming aggregate type (array, set, or map).
encode_start_string
Encode the starting bytes for a streaming blob string.
encode_string_chunk
Encode the bytes making up one chunk of a streaming blob string.
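
The three string functions (`encode_start_string`, `encode_string_chunk`, `encode_end_string`) correspond to the three parts of a streamed blob string. As a rough illustration, the sketch below builds that framing by hand, assuming the RESP3 specification's layout: `$?\r\n` to open, `;<len>\r\n<data>\r\n` per chunk, and a zero-length chunk `;0\r\n` to close. The helpers are hypothetical, use only the standard library, and do not show the crate's own signatures.

```rust
// Hand-rolled sketch of the RESP3 wire format for a streamed blob string.
// These helpers are hypothetical and are NOT part of `redis_protocol`.

/// Prefix that opens a streamed blob string: `$` followed by `?` in place of a length.
fn start_streamed_string() -> Vec<u8> {
  b"$?\r\n".to_vec()
}

/// One chunk of the string: `;<len>\r\n<data>\r\n`.
fn string_chunk(data: &[u8]) -> Vec<u8> {
  let mut out = format!(";{}\r\n", data.len()).into_bytes();
  out.extend_from_slice(data);
  out.extend_from_slice(b"\r\n");
  out
}

/// Terminator: a zero-length chunk with no data section.
fn end_streamed_string() -> Vec<u8> {
  b";0\r\n".to_vec()
}

/// Concatenate prefix, chunks, and terminator into one payload.
/// Empty chunks are skipped because a zero length marks the end of the stream.
fn streamed_string(chunks: &[&str]) -> Vec<u8> {
  let mut out = start_streamed_string();
  for chunk in chunks.iter().filter(|c| !c.is_empty()) {
    out.extend(string_chunk(chunk.as_bytes()));
  }
  out.extend(end_streamed_string());
  out
}

fn main() {
  let wire = streamed_string(&["Hell", "o wor", "d"]);
  // Debug-print the payload with escaped control characters.
  println!("{:?}", String::from_utf8_lossy(&wire));
}
```

Skipping empty chunks is a design choice in this sketch: emitting one mid-stream would look identical to the terminator and end the string early.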