Available on crate feature `resp3` only.
Encoding functions for streaming blobs and aggregate types.
§Using Bytes and Tokio
Stream an array of frames via a Tokio unbounded channel.
/// Write the full contents of `buf` to `socket`, then remove the written bytes from `buf`.
///
/// Returns the number of bytes that were in `buf` when the call started.
///
/// # Panics
///
/// Panics with "Failed to write to socket." if the socket write returns an error.
async fn write_all(socket: &mut TcpStream, buf: &mut BytesMut) -> usize {
    let pending = buf.len();
    socket
        .write_all(&buf)
        .await
        .expect("Failed to write to socket.");

    // Because `write_all` sends everything, simply clearing the buffer would also work. Splitting
    // off the written prefix mirrors the more common pattern where the socket is not flushed on
    // every `write` call and the caller has to split the buffer by the count `write` returned.
    drop(buf.split_to(pending));
    pending
}
/// Stream an array to `socket`: write the start-of-stream prefix, forward every frame received
/// from `rx` as an inner value, and write the end-of-stream terminator once `rx` closes.
///
/// Prints the total number of bytes written when the stream has ended.
async fn stream_array(socket: &mut TcpStream, mut rx: UnboundedReceiver<BytesFrame>) {
    // `write_all` leaves the buffer empty after each call, so every encode step below first
    // grows the buffer by the size it needs.
    let mut scratch = BytesMut::new();

    // Prefix that opens the streamed array.
    zero_extend(&mut scratch, START_STREAM_ENCODE_LEN);
    encode_start_aggregate_type(&mut scratch, 0, FrameKind::Array).unwrap();
    let mut total = write_all(socket, &mut scratch).await;

    // One inner value per frame until the sending side goes away.
    while let Some(next) = rx.recv().await {
        zero_extend(&mut scratch, next.encode_len(false));
        encode_bytes_aggregate_type_inner_value(&mut scratch, 0, &next, false).unwrap();
        total += write_all(socket, &mut scratch).await;
    }

    // Terminator that closes the streamed array.
    zero_extend(&mut scratch, END_STREAM_AGGREGATE_TYPE_ENCODE_LEN);
    encode_end_aggregate_type(&mut scratch, 0).unwrap();
    total += write_all(socket, &mut scratch).await;

    println!("Streamed {} bytes to the socket.", total);
}
/// Send three blob-string frames ("foo", "bar", "baz") on `tx`, sleeping one second before each.
///
/// # Panics
///
/// Panics if a send on `tx` fails.
async fn generate_frames(tx: UnboundedSender<BytesFrame>) {
    // stand-in for reading from another socket or generating frames some other way
    for payload in ["foo", "bar", "baz"] {
        sleep(Duration::from_secs(1)).await;
        let frame = BytesFrame::BlobString {
            data: payload.into(),
            attributes: None,
        };
        tx.send(frame).unwrap();
    }
}
/// Connect to `127.0.0.1:6379`, start the frame producer on its own task, and stream the frames
/// it sends to the socket as an array.
#[tokio::main]
async fn main() {
    let (frame_tx, frame_rx) = unbounded_channel();
    let mut conn = TcpStream::connect("127.0.0.1:6379").await.unwrap();

    // The producer runs concurrently; `stream_array` keeps writing until the receiver closes.
    tokio::spawn(generate_frames(frame_tx));
    stream_array(&mut conn, frame_rx).await;
}
Constants§
- `END_STREAM_AGGREGATE_TYPE_ENCODE_LEN` - Number of bytes needed to encode the terminating bytes after an aggregate type.
- `END_STREAM_STRING_ENCODE_LEN` - Number of bytes needed to encode the terminating bytes after a blob string.
- `START_STREAM_ENCODE_LEN` - Number of bytes needed to encode the prefix when starting a stream.
Functions§
- `encode_borrowed_aggregate_type_inner_kv_pair` - Encode the inner borrowed frames that make up a key-value pair in a streamed map.
- `encode_borrowed_aggregate_type_inner_value` - Encode the inner borrowed frame inside a streamed array or set.
- `encode_bytes_aggregate_type_inner_kv_pair` (requires the `bytes` feature) - Encode the inner frames that make up a key-value pair in a streamed map.
- `encode_bytes_aggregate_type_inner_value` (requires the `bytes` feature) - Encode the inner frame inside a streamed array or set.
- `encode_end_aggregate_type` - Encode the terminating bytes at the end of a streaming aggregate type (array, set, or map).
- `encode_end_string` - Encode the terminating bytes at the end of a streaming blob string.
- `encode_owned_aggregate_type_inner_kv_pair` - Encode the inner frames that make up a key-value pair in a streamed map.
- `encode_owned_aggregate_type_inner_value` - Encode the inner frame inside a streamed array or set.
- `encode_start_aggregate_type` - Encode the starting bytes for a streaming aggregate type (array, set, or map).
- `encode_start_string` - Encode the starting bytes in a streaming blob string.
- `encode_string_chunk` - Encode the bytes making up one chunk of a streaming blob string.