use async_convert::async_trait;
use ecow::EcoVec;
use futures::{stream::BoxStream, TryStreamExt};
use hyper::body::SizeHint;
use rdf_utils::model::{dataset::CompatDataset, quad::ArcQuad};
use super::quads_inmem::{EcoQuadsInmem, QuadsInmem};
/// A boxed, `'static` stream of quads in which every item is either an
/// [`ArcQuad`] or the [`anyhow::Error`] raised while producing it.
pub type BoxQuadsStream = BoxStream<'static, Result<ArcQuad, anyhow::Error>>;
/// A fallible stream of quads bundled with a size hint describing it.
pub struct QuadsStream {
/// The underlying stream yielding `Result<ArcQuad, anyhow::Error>` items.
pub stream: BoxQuadsStream,
/// Size hint accompanying `stream`. The `From<EcoQuadsInmem>` conversion
/// sets it to the exact number of quads in the source collection.
// NOTE(review): whether other producers also count quads (rather than,
// e.g., bytes) is not visible from this file — confirm against callers.
pub size_hint: SizeHint,
}
/// Debug formatting for [`QuadsStream`].
///
/// Only `size_hint` is printed. The `stream` field is a boxed trait object
/// with no `Debug` implementation of its own, so it is left out and the
/// output is marked as non-exhaustive.
impl std::fmt::Debug for QuadsStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("QuadsStream")
            .field("size_hint", &self.size_hint)
            // `stream` is deliberately omitted; the trailing `..` tells the
            // reader that the struct has more fields than are shown here.
            .finish_non_exhaustive()
    }
}
/// Turns an in-memory quad collection into a [`QuadsStream`].
///
/// The resulting stream yields every stored quad wrapped in `Ok`, and the
/// size hint is set to the exact number of quads in the collection.
impl From<EcoQuadsInmem> for QuadsStream {
    fn from(value: EcoQuadsInmem) -> Self {
        // The length must be read before `value` is consumed below.
        let quad_count = value.len() as u64;
        let quads = value.into_inner().0.into_iter().map(Ok);

        Self {
            stream: Box::pin(futures::stream::iter(quads)),
            size_hint: SizeHint::with_exact(quad_count),
        }
    }
}
#[async_trait]
impl async_convert::TryFrom<QuadsStream> for EcoQuadsInmem {
type Error = anyhow::Error;
async fn try_from(data: QuadsStream) -> Result<Self, Self::Error> {
Ok(QuadsInmem::new(CompatDataset(
data.stream.try_collect::<EcoVec<_>>().await?,
)))
}
}