use bytes::Bytes;
use tokio::{
io::{AsyncReadExt, DuplexStream},
sync::{mpsc, oneshot},
};
use tracing::{debug, warn};
/// One piece of a byte stream that has been split up by [`multipart`].
pub(crate) struct MultipartChunk {
/// Position of this chunk within the stream; `multipart` numbers chunks
/// consecutively starting from zero.
pub part_number: usize,
/// The bytes that make up this chunk.
pub data: Bytes,
}
/// Creates a byte sink whose contents are re-emitted as sized chunks.
///
/// Returns the writable end of an in-memory duplex pipe together with the
/// receiving end of a bounded channel. A spawned worker task drains the pipe,
/// accumulates the bytes into buffers until each holds `multipart_chunk_size`
/// bytes, and sends every buffer as a [`MultipartChunk`] numbered consecutively
/// from zero. Only the last chunk may be shorter; it is sent once the pipe
/// reports end-of-stream. An empty stream produces no chunks at all.
///
/// * `multipart_chunk_size` - number of bytes gathered per chunk; also used as
///   the buffer size of the duplex pipe.
/// * `chunks_channel_depth` - capacity of the channel carrying finished chunks.
///
/// If the chunk receiver is dropped, the worker logs a warning and exits,
/// which also drops its end of the pipe.
///
/// # Panics
///
/// Panics when called outside of a Tokio runtime (the worker is started with
/// `tokio::spawn`) or when `chunks_channel_depth` is zero (a zero-capacity
/// `mpsc::channel` is rejected by Tokio).
pub(crate) fn multipart(
    multipart_chunk_size: usize,
    chunks_channel_depth: usize,
) -> (DuplexStream, mpsc::Receiver<MultipartChunk>) {
    let (bytes_sender, mut bytes_receiver) = tokio::io::duplex(multipart_chunk_size);
    let (chunks_sender, chunks_receiver) = mpsc::channel(chunks_channel_depth);

    tokio::spawn(async move {
        let mut part_number = 0usize;
        // Remember that end-of-stream was seen so the worker can stop right
        // after the final (possibly short) chunk instead of allocating another
        // full-size buffer just to observe end-of-stream a second time.
        let mut reached_eof = false;

        while !reached_eof {
            let mut buffer = bytes::BytesMut::with_capacity(multipart_chunk_size);

            // A single read may return fewer bytes than requested, so keep
            // reading until the chunk is full or the stream ends.
            while buffer.len() < multipart_chunk_size {
                let bytes_read = bytes_receiver
                    .read_buf(&mut buffer)
                    .await
                    .expect("BUG: Reads from DuplexStream are infallible");
                if bytes_read == 0 {
                    reached_eof = true;
                    break;
                }
            }

            // Nothing was gathered: either the stream ended exactly on a chunk
            // boundary or the requested chunk size is zero.
            if buffer.is_empty() {
                break;
            }

            let chunk = MultipartChunk {
                part_number,
                data: Bytes::from(buffer),
            };
            part_number += 1;

            if chunks_sender.send(chunk).await.is_err() {
                warn!("chunks receiver was dropped; aborting the worker task");
                return;
            }
        }

        debug!("encountered end of duplex stream; worker task exiting");
    });

    (bytes_sender, chunks_receiver)
}
/// Creates a byte sink whose entire contents are delivered as a single value.
///
/// Returns the writable end of an in-memory duplex pipe and a one-shot
/// receiver. A spawned worker task reads the pipe until end-of-stream and then
/// sends everything it collected as one [`Bytes`] value. If the receiver has
/// been dropped by then, the data is discarded and a warning is logged.
///
/// `max_bytes` sets the buffer size of the duplex pipe and the initial capacity
/// of the collecting buffer. The worker reads until end-of-stream regardless,
/// so this function itself does not reject input larger than `max_bytes`.
pub(crate) fn unipart(max_bytes: usize) -> (DuplexStream, oneshot::Receiver<Bytes>) {
    let (writer_half, mut reader_half) = tokio::io::duplex(max_bytes);
    let (payload_tx, payload_rx) = oneshot::channel();

    tokio::spawn(async move {
        let mut collected = Vec::with_capacity(max_bytes);
        let read_outcome = reader_half.read_to_end(&mut collected).await;
        read_outcome.expect("BUG: Reads from DuplexStream are infallible");

        // A one-shot send only fails when the receiving half is already gone.
        let payload = Bytes::from(collected);
        if let Err(_undelivered) = payload_tx.send(payload) {
            warn!("chunks receiver was dropped; unipart chunk is lost");
        }
    });

    (writer_half, payload_rx)
}