use super::active_read::ActiveRead;
use crate::Error;
use crate::model::Object;
use crate::read_object::dynamic::ReadObjectResponse;
use crate::{error::ReadError, model_ext::ObjectHighlights};
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
/// Streams the payload of one ranged read and exposes the object's metadata.
///
/// Chunks (or a terminal error) arrive through a tokio `mpsc` channel; the
/// [`ReadObjectResponse`] implementation in this file forwards them to the
/// caller one message at a time.
#[derive(Debug)]
pub struct RangeReader {
// Receiving half of the channel that carries the downloaded chunks, or the
// `ReadError` that interrupted the read.
inner: Receiver<Result<bytes::Bytes, ReadError>>,
// Shared metadata of the object being read; copied field by field into
// `ObjectHighlights` on request.
object: Arc<Object>,
// Never sent on in this file (hence the underscore). It is stored so that
// the sender is dropped together with the reader; the unit test in this file
// checks that the matching receiver then observes a closed channel.
// NOTE(review): presumably this is how the owner of the receiving half learns
// that the reader is gone — confirm against the receiving side.
_tx: Sender<ActiveRead>,
}
impl RangeReader {
    /// Assembles a reader from its three parts.
    ///
    /// * `inner` - receiving half of the channel delivering the payload
    ///   chunks, or the error that ended the read.
    /// * `object` - shared metadata of the object being read.
    /// * `tx` - sender that is stored, but never used, by the reader; it is
    ///   released when the reader is dropped.
    pub fn new(
        inner: Receiver<Result<bytes::Bytes, ReadError>>,
        object: Arc<Object>,
        tx: Sender<ActiveRead>,
    ) -> Self {
        // Field order mirrors the struct declaration.
        Self { inner, object, _tx: tx }
    }
}
#[async_trait::async_trait]
impl ReadObjectResponse for RangeReader {
fn object(&self) -> ObjectHighlights {
ObjectHighlights {
generation: self.object.generation,
metageneration: self.object.metageneration,
size: self.object.size,
content_encoding: self.object.content_encoding.clone(),
checksums: self.object.checksums.clone(),
storage_class: self.object.storage_class.clone(),
content_language: self.object.content_language.clone(),
content_type: self.object.content_type.clone(),
content_disposition: self.object.content_disposition.clone(),
etag: self.object.etag.clone(),
}
}
async fn next(&mut self) -> Option<crate::Result<bytes::Bytes>> {
let msg = self.inner.recv().await?;
Some(msg.map_err(Error::io))
}
}
#[cfg(test)]
mod tests {
    use super::super::tests::permanent_error;
    use super::*;
    use crate::model::{Object, ObjectChecksums};
    use std::error::Error as _;

    /// Walks a `RangeReader` through its whole life cycle: metadata
    /// snapshot, one data chunk, one error, end of stream, and finally the
    /// release of the stored sender when the reader is dropped.
    #[tokio::test]
    async fn object() -> anyhow::Result<()> {
        // Fixture: object metadata plus both channels the reader needs.
        let metadata = Arc::new(
            Object::new()
                .set_generation(123456)
                .set_metageneration(234567)
                .set_size(1024)
                .set_checksums(ObjectChecksums::new().set_crc32c(456789_u32))
                .set_etag("test-etag")
                .set_storage_class("STANDARD")
                .set_content_encoding("content-encoding")
                .set_content_language("content-language")
                .set_content_type("content-type")
                .set_content_disposition("content-disposition"),
        );
        let (data_tx, data_rx) = tokio::sync::mpsc::channel(1);
        let (active_tx, mut active_rx) = tokio::sync::mpsc::channel(100);
        let mut reader = RangeReader::new(data_rx, Arc::clone(&metadata), active_tx);

        // The highlights must mirror every field set on the object.
        let want = ObjectHighlights {
            generation: 123456,
            metageneration: 234567,
            size: 1024,
            checksums: Some(ObjectChecksums::new().set_crc32c(456789_u32)),
            etag: "test-etag".into(),
            storage_class: "STANDARD".into(),
            content_encoding: "content-encoding".into(),
            content_language: "content-language".into(),
            content_type: "content-type".into(),
            content_disposition: "content-disposition".into(),
        };
        let got = reader.object();
        assert_eq!(got, want);

        // A successful chunk is handed through unchanged.
        let payload = bytes::Bytes::from_static(b"the quick brown fox jumps over the lazy dog");
        data_tx.send(Ok(payload.clone())).await?;
        let got = reader.next().await;
        assert!(matches!(got, Some(Ok(ref d)) if *d == payload), "{got:?}");

        // A read error surfaces as an I/O error whose source is the original
        // `ReadError`.
        let interrupt = ReadError::UnrecoverableBidiReadInterrupt(Arc::new(permanent_error()));
        data_tx.send(Err(interrupt)).await?;
        let got = reader.next().await;
        assert!(matches!(got, Some(Err(_))), "{got:?}");
        let got = got.unwrap().unwrap_err();
        assert!(got.is_io(), "{got:?}");
        let source = got
            .source()
            .and_then(|cause| cause.downcast_ref::<ReadError>());
        assert!(
            matches!(source, Some(ReadError::UnrecoverableBidiReadInterrupt(_))),
            "{source:?}"
        );

        // Closing the data channel ends the stream.
        drop(data_tx);
        let got = reader.next().await;
        assert!(got.is_none(), "{got:?}");

        // Dropping the reader releases the sender it was holding, so the
        // other channel closes as well.
        drop(reader);
        let done = active_rx.recv().await;
        assert!(done.is_none(), "{done:?}");

        Ok(())
    }
}