Encoding functions for streaming blobs and aggregate types.
This interface is stateful because the caller almost always needs to maintain some state outside of these functions. It is therefore up to the caller to track whether a data stream is starting, in progress, or ending, and to call the matching function in this module.
Examples:
use redis_protocol::resp3::encode::streaming::*;
use redis_protocol::resp3::types::{Frame, FrameKind};
use bytes::BytesMut;
use redis_protocol::types::RedisProtocolError;
fn example(buf: &mut BytesMut) -> Result<usize, RedisProtocolError> {
    // in many cases the starting buffer won't be empty, so this example shows how to track the offset as well
    let mut offset = buf.len();
    let frames: Vec<Frame> = vec![1.into(), 2.into()];

    // write the bytes that open the streamed array
    offset += extend_while_encoding(buf, |buf| {
        encode_start_aggregate_type(buf, offset, &FrameKind::Array)
    })?;

    // write each inner frame, advancing the offset as we go
    for frame in frames.iter() {
        offset += extend_while_encoding(buf, |buf| {
            encode_aggregate_type_inner_value(buf, offset, frame)
        })?;
    }

    // write the bytes that terminate the streamed array
    offset += extend_while_encoding(buf, |buf| {
        encode_end_aggregate_type(buf, offset)
    })?;

    println!("New buffer size: {}", offset);
    Ok(offset)
}
Or a somewhat more practical example…
use redis_protocol::resp3::encode::streaming::*;
use redis_protocol::resp3::types::{Frame, FrameKind};
use redis_protocol::types::RedisProtocolError;
use bytes::BytesMut;
use std::future::Future;
use tokio::net::TcpStream;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
async fn write_all(socket: &mut TcpStream, buf: &mut BytesMut) -> Result<usize, RedisProtocolError> {
    let len = buf.len();
    socket.write_all(&buf).await.expect("Failed to write to socket.");
    // we could just clear the buffer here since we use `write_all`, but in many cases callers don't flush the socket on each `write` call.
    // in those scenarios the caller should split the buffer based on the result from `write`.
    let _ = buf.split_to(len);
    Ok(len)
}

/// Start a new array stream, sending frames received from `rx` out to `socket` and ending the stream when `rx` closes.
async fn stream_array(socket: &mut TcpStream, mut rx: UnboundedReceiver<Frame>) -> Result<(), RedisProtocolError> {
    let mut buf = BytesMut::new();

    let _ = extend_while_encoding(&mut buf, |buf| {
        encode_start_aggregate_type(buf, 0, &FrameKind::Array)
    })?;
    let mut written = write_all(socket, &mut buf).await.expect("Failed to write to socket.");

    loop {
        let frame = match rx.recv().await {
            Some(frame) => frame,
            // break out of the loop when the channel closes
            None => break
        };

        let _ = extend_while_encoding(&mut buf, |buf| {
            encode_aggregate_type_inner_value(buf, 0, &frame)
        })?;
        written += write_all(socket, &mut buf).await.expect("Failed to write to socket.");
    }

    let _ = extend_while_encoding(&mut buf, |buf| {
        encode_end_aggregate_type(buf, 0)
    })?;
    written += write_all(socket, &mut buf).await.expect("Failed to write to socket.");

    println!("Streamed {} bytes to the socket.", written);
    Ok(())
}

fn generate_frames(tx: UnboundedSender<Frame>) -> Result<(), RedisProtocolError> {
    // read from another socket or somehow generate frames, writing them to `tx`
    unimplemented!()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = unbounded_channel();
    let mut socket = TcpStream::connect("127.0.0.1:6379").await.expect("Failed to connect");

    generate_frames(tx);
    stream_array(&mut socket, rx).await.expect("Failed to stream array.");
    Ok(())
}
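The start, middle, and end phases that the caller has to track can be made explicit with a small state type. The following is only a sketch that uses the standard library: `StreamState` and `ArrayStream` are hypothetical names that are not part of `redis_protocol`, and the bytes are written by hand following the RESP3 streamed array layout (`*?\r\n`, then the elements, then `.\r\n`) rather than through this module's functions.

```rust
/// Hypothetical helper, not part of `redis_protocol`: where a streamed array is in its lifecycle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    NotStarted,
    InProgress,
    Ended,
}

/// Hypothetical wrapper that owns the output buffer and rejects out-of-order calls.
#[derive(Debug)]
struct ArrayStream {
    state: StreamState,
    buf: Vec<u8>,
}

impl ArrayStream {
    fn new() -> Self {
        ArrayStream { state: StreamState::NotStarted, buf: Vec::new() }
    }

    /// Write the streamed array header `*?\r\n`. Returns the new buffer length.
    fn start(&mut self) -> Result<usize, &'static str> {
        if self.state != StreamState::NotStarted {
            return Err("stream already started");
        }
        self.buf.extend_from_slice(b"*?\r\n");
        self.state = StreamState::InProgress;
        Ok(self.buf.len())
    }

    /// Append one element, encoded here as a RESP3 number (`:<n>\r\n`).
    fn push_number(&mut self, n: i64) -> Result<usize, &'static str> {
        if self.state != StreamState::InProgress {
            return Err("stream is not in progress");
        }
        self.buf.extend_from_slice(format!(":{}\r\n", n).as_bytes());
        Ok(self.buf.len())
    }

    /// Write the terminator `.\r\n` that closes the streamed array.
    fn end(&mut self) -> Result<usize, &'static str> {
        if self.state != StreamState::InProgress {
            return Err("stream is not in progress");
        }
        self.buf.extend_from_slice(b".\r\n");
        self.state = StreamState::Ended;
        Ok(self.buf.len())
    }
}
```

Calling `start`, then `push_number` for each value, then `end` mirrors the order of the encoding calls in the examples above; any other order returns an error instead of producing a malformed stream.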
Functions
- encode_aggregate_type_inner_kv_pair - Encode the inner frames that make up a key-value pair in a streamed map.
- encode_aggregate_type_inner_value - Encode the inner frame inside a streamed array or set.
- encode_end_aggregate_type - Encode the terminating bytes at the end of a streaming aggregate type (array, set, or map).
- encode_end_string - Encode the terminating bytes at the end of a streaming blob string.
- encode_start_aggregate_type - Encode the starting bytes for a streaming aggregate type (array, set, or map).
- encode_start_string - Encode the starting bytes for a streaming blob string.
- encode_string_chunk - Encode the bytes making up one chunk of a streaming blob string.
- extend_while_encoding - A wrapper function for automatically extending the input buffer while encoding frames with a different encoding function.
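The three blob string functions listed above follow the same start, chunk, end pattern as the aggregate functions. As a rough illustration of the bytes involved, here is a standard-library-only sketch of the RESP3 streamed blob string layout (`$?\r\n`, then `;<len>\r\n<data>\r\n` per chunk, then `;0\r\n`). The names `start_string`, `string_chunk`, and `end_string` are hypothetical stand-ins; they are not the crate's functions and their signatures are assumptions made for this example.

```rust
/// Hypothetical: write the streamed blob string header `$?\r\n`. Returns the new buffer length.
fn start_string(buf: &mut Vec<u8>) -> usize {
    buf.extend_from_slice(b"$?\r\n");
    buf.len()
}

/// Hypothetical: write one chunk as `;<len>\r\n<data>\r\n`. Returns the new buffer length.
fn string_chunk(buf: &mut Vec<u8>, data: &[u8]) -> usize {
    // A zero-length chunk is indistinguishable from the end marker `;0\r\n`,
    // so this sketch skips empty input instead of writing it.
    if data.is_empty() {
        return buf.len();
    }
    buf.extend_from_slice(format!(";{}\r\n", data.len()).as_bytes());
    buf.extend_from_slice(data);
    buf.extend_from_slice(b"\r\n");
    buf.len()
}

/// Hypothetical: write the end marker `;0\r\n`. Returns the new buffer length.
fn end_string(buf: &mut Vec<u8>) -> usize {
    buf.extend_from_slice(b";0\r\n");
    buf.len()
}
```

A caller would invoke `start_string` once, `string_chunk` for each piece of data as it arrives, and `end_string` once the source is exhausted, which is the state the caller is expected to track when using the real functions in this module.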