Available on crate feature resp3 only.
Expand description

Encoding functions for streaming blobs and aggregate types.

§Using Bytes and Tokio

Stream an array of frames via a Tokio unbounded channel.


/// Write everything currently held in `buf` to `socket`, then remove those bytes from `buf`.
///
/// Returns the number of bytes that were in `buf` when the call started. Panics if the socket write fails.
async fn write_all(socket: &mut TcpStream, buf: &mut BytesMut) -> usize {
  let pending = buf.len();
  socket.write_all(&buf[..]).await.expect("Failed to write to socket.");
  // Everything was written, so clearing the buffer would work just as well here. Splitting is shown instead since
  // callers that use plain `write` (without flushing every time) need to split off only the amount actually written.
  drop(buf.split_to(pending));
  pending
}

/// Stream an array to `socket`: write the stream header, forward each frame received from `rx` as it arrives, and
/// write the stream terminator once `rx` is closed.
///
/// Prints the total number of bytes written when finished. Panics if any encoding call returns an error.
async fn stream_array(socket: &mut TcpStream, mut rx: UnboundedReceiver<BytesFrame>) {
  let mut buf = BytesMut::new();

  // Header: make room in the buffer, encode the start of the streamed array, and send it.
  // NOTE(review): the `0` passed to the encoders is presumably the write offset into `buf` - confirm.
  zero_extend(&mut buf, START_STREAM_ENCODE_LEN);
  encode_start_aggregate_type(&mut buf, 0, FrameKind::Array).unwrap();
  let mut total = write_all(socket, &mut buf).await;

  // Body: one encode + write per received frame until the channel closes.
  loop {
    let frame = match rx.recv().await {
      Some(frame) => frame,
      None => break,
    };

    zero_extend(&mut buf, frame.encode_len());
    encode_bytes_aggregate_type_inner_value(&mut buf, 0, &frame).unwrap();
    total += write_all(socket, &mut buf).await;
  }

  // Trailer: encode and send the end-of-stream marker.
  zero_extend(&mut buf, END_STREAM_AGGREGATE_TYPE_ENCODE_LEN);
  encode_end_aggregate_type(&mut buf, 0).unwrap();
  total += write_all(socket, &mut buf).await;

  println!("Streamed {} bytes to the socket.", total);
}

/// Produce three blob string frames ("foo", "bar", "baz") on `tx`, waiting one second before each send.
///
/// Stands in for reading from another socket or otherwise generating frames. Panics if a send fails.
async fn generate_frames(tx: UnboundedSender<BytesFrame>) {
  for payload in ["foo", "bar", "baz"] {
    sleep(Duration::from_secs(1)).await;

    let frame = BytesFrame::BlobString { data: payload.into(), attributes: None };
    tx.send(frame).unwrap();
  }
}

#[tokio::main]
async fn main() {
  let mut socket = TcpStream::connect("127.0.0.1:6379").await.unwrap();
  let (tx, rx) = unbounded_channel();

  // The producer runs as its own task; it is never joined, the stream simply ends when `tx` is dropped.
  let _ = tokio::spawn(generate_frames(tx));

  // Forward frames to the socket on this task until the producer is done.
  stream_array(&mut socket, rx).await;
}

Constants§

Functions§