use crate::{body::Chunk, error::Error};
use http::{HeaderMap, HeaderName, HeaderValue};
use tokio::sync::mpsc;
/// Capacity handed to `mpsc::channel` when `StreamingBody::new` creates its channel.
///
/// The limit counts queued `StreamingBodyItem`s, not the number of bytes inside them.
const STREAMING_CHANNEL_SIZE: usize = 8;
/// The sending side of a streaming body.
///
/// Created by `StreamingBody::new`, which also returns the matching
/// `mpsc::Receiver<StreamingBodyItem>` for the consuming side.
#[derive(Debug)]
pub struct StreamingBody {
/// Channel over which chunks and the final `Finished` item are delivered.
sender: mpsc::Sender<StreamingBodyItem>,
/// Trailers collected via `append_trailer`; they are moved into the
/// `StreamingBodyItem::Finished` item when `finish` is called.
pub(crate) trailers: HeaderMap,
}
/// A message travelling over the channel created by `StreamingBody::new`.
#[derive(Debug)]
pub enum StreamingBodyItem {
/// A piece of body data, produced by `StreamingBody::send_chunk`.
Chunk(Chunk),
/// Signals that the body was finished; carries the trailers accumulated on the
/// `StreamingBody`. Produced by `StreamingBody::finish`.
Finished(HeaderMap),
}
impl StreamingBody {
pub fn new() -> (StreamingBody, mpsc::Receiver<StreamingBodyItem>) {
let (sender, receiver) = mpsc::channel(STREAMING_CHANNEL_SIZE);
(
StreamingBody {
sender,
trailers: HeaderMap::new(),
},
receiver,
)
}
pub async fn send_chunk(&mut self, chunk: impl Into<Chunk>) -> Result<(), Error> {
self.sender
.send(StreamingBodyItem::Chunk(chunk.into()))
.await
.map_err(|_| Error::StreamingChunkSend)
}
pub fn append_trailer(&mut self, name: HeaderName, value: HeaderValue) {
self.trailers.append(name, value);
}
pub async fn await_ready(&mut self) {
let _ = self.sender.reserve().await;
}
pub fn finish(self) -> Result<(), Error> {
match self
.sender
.try_send(StreamingBodyItem::Finished(self.trailers))
{
Ok(()) => Ok(()),
Err(mpsc::error::TrySendError::Closed(_)) => Ok(()),
Err(mpsc::error::TrySendError::Full(StreamingBodyItem::Finished(trailers))) => {
tokio::task::spawn(async move {
let _ = self
.sender
.send(StreamingBodyItem::Finished(trailers))
.await;
});
Ok(())
}
Err(mpsc::error::TrySendError::Full(_)) => {
unreachable!("Only a StreamingBodyItem::Finished should be reachable")
}
}
}
}