use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures_util::{Stream, StreamExt};
use opendal::Reader;
/// A type-erased stream of byte chunks.
///
/// Every item is either a [`Bytes`] chunk or the [`std::io::Error`] produced
/// by the underlying stream.
pub struct BytesStream {
    /// The wrapped stream, boxed and pinned so that any concrete `Send`
    /// stream type can be stored behind a single field.
    inner: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>,
}
impl BytesStream {
    /// Builds a `BytesStream` over the full range (`..`) of `reader`.
    ///
    /// # Errors
    ///
    /// Returns a [`crate::storage::StorageError`] (converted via `From`) when
    /// the reader cannot be turned into a byte stream.
    pub(crate) async fn from_reader(reader: Reader) -> Result<Self, crate::storage::StorageError> {
        let stream = reader
            .into_bytes_stream(..)
            .await
            .map_err(crate::storage::StorageError::from)?;

        // Convert with `From` instead of re-wrapping as `ErrorKind::Other`:
        // when the stream already yields `std::io::Error` this is the identity
        // conversion, so the original error kind (e.g. `NotFound`) reaches the
        // caller instead of being overwritten.
        let mapped_stream = stream.map(|result| result.map_err(std::io::Error::from));

        Ok(Self {
            inner: Box::pin(mapped_stream),
        })
    }

    /// Drains the stream and concatenates every chunk into a single [`Bytes`].
    ///
    /// The whole payload is accumulated in memory before it is returned.
    ///
    /// # Errors
    ///
    /// Returns the first error yielded by the stream; chunks gathered before
    /// that point are dropped.
    pub async fn collect(mut self) -> Result<Bytes, std::io::Error> {
        let mut buffer = Vec::new();
        while let Some(chunk) = self.next().await {
            let chunk = chunk?;
            buffer.extend_from_slice(&chunk);
        }
        Ok(Bytes::from(buffer))
    }
}
impl Stream for BytesStream {
    type Item = Result<Bytes, std::io::Error>;

    /// Polls the wrapped stream for its next chunk.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.inner.as_mut().poll_next(cx)
    }

    /// Forwards the wrapped stream's size hint.
    ///
    /// Without this override the trait default `(0, None)` would be reported
    /// even when the wrapped stream knows its bounds.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
impl BytesStream {
    /// Consumes the stream and hands it to [`axum::body::Body::from_stream`],
    /// producing a body backed by this stream's chunks.
    pub fn into_body(self) -> axum::body::Body {
        use axum::body::Body;

        Body::from_stream(self)
    }

    /// Wraps any `Send + 'static` stream of `Bytes` results in a `BytesStream`.
    ///
    /// The stream is boxed and pinned; its items are passed through unchanged.
    pub fn from_body_stream<S: Stream<Item = std::io::Result<Bytes>> + Send + 'static>(
        stream: S,
    ) -> Self {
        let inner = Box::pin(stream);
        Self { inner }
    }
}