use crate::Result;
use crate::model_ext::ObjectHighlights;
use crate::streaming_source::{Payload, StreamingSource};
#[cfg(feature = "unstable-stream")]
use futures::Stream;
#[derive(Debug)]
pub struct ReadObjectResponse {
inner: Box<dyn dynamic::ReadObjectResponse + Send + 'static>,
}
impl ReadObjectResponse {
pub(crate) fn new<T>(inner: Box<T>) -> Self
where
T: dynamic::ReadObjectResponse + Send + 'static,
{
Self { inner }
}
pub(crate) fn into_parts(self) -> Box<dyn dynamic::ReadObjectResponse + Send + 'static> {
self.inner
}
pub fn from_source<T, S>(object: ObjectHighlights, source: T) -> Self
where
T: Into<Payload<S>> + Send + Sync + 'static,
S: StreamingSource + Send + Sync + 'static,
{
Self {
inner: Box::new(FakeReadObjectResponse::<S> {
object,
source: source.into(),
}),
}
}
pub fn object(&self) -> ObjectHighlights {
self.inner.object()
}
pub async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
self.inner.next().await
}
#[cfg(feature = "unstable-stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
pub fn into_stream(self) -> impl Stream<Item = Result<bytes::Bytes>> + Unpin {
use futures::stream::unfold;
Box::pin(unfold(self, |mut stream| async move {
stream.next().await.map(|item| (item, stream))
}))
}
}
pub(crate) mod dynamic {
use crate::Result;
use crate::model_ext::ObjectHighlights;
#[async_trait::async_trait]
pub trait ReadObjectResponse: std::fmt::Debug {
fn object(&self) -> ObjectHighlights;
async fn next(&mut self) -> Option<Result<bytes::Bytes>>;
}
}
struct FakeReadObjectResponse<T>
where
T: StreamingSource + Send + Sync + 'static,
{
object: ObjectHighlights,
source: Payload<T>,
}
impl<T> std::fmt::Debug for FakeReadObjectResponse<T>
where
T: StreamingSource + Send + Sync + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FakeReadObjectResponse")
.field("object", &self.object)
.finish()
}
}
#[async_trait::async_trait]
impl<T> dynamic::ReadObjectResponse for FakeReadObjectResponse<T>
where
T: StreamingSource + Send + Sync + 'static,
{
fn object(&self) -> ObjectHighlights {
self.object.clone()
}
async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
self.source
.next()
.await
.map(|r| r.map_err(crate::Error::io))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn from_source() -> anyhow::Result<()> {
const LAZY: &str = "the quick brown fox jumps over the lazy dog";
let object = ObjectHighlights {
etag: "custom-etag".to_string(),
..Default::default()
};
let mut response = ReadObjectResponse::from_source(object.clone(), LAZY);
assert_eq!(&object, &response.object());
let mut contents = Vec::new();
while let Some(chunk) = response.next().await.transpose()? {
contents.extend_from_slice(&chunk);
}
let contents = bytes::Bytes::from_owner(contents);
assert_eq!(contents, LAZY);
Ok(())
}
}