use std::{
io::{self, BufReader},
sync::Arc,
};
use async_convert::async_trait;
use ecow::eco_vec;
use either::Either;
use futures::TryFutureExt;
use headers::ContentLength;
use http_uri::HttpUri;
use rdf_dynsyn::{
correspondence::SYNTAX_TO_MEDIA_TYPE_CORRESPONDENCE,
parser::DynSynParserFactorySet,
serializer::DynSynSerializerFactorySet,
syntax::invariant::{parsable::DynSynParsableSyntax, serializable::DynSynSerializableSyntax},
};
use sophia_api::prelude::{Dataset, MutableDataset};
use tokio::task::spawn_blocking;
use tower::BoxError;
use tracing::{error, info};
use super::{
binary::BinaryRepresentation,
common::data::{
bytes::BytesData,
bytes_inmem::BytesInmem,
bytes_stream::BytesStream,
quads::QuadsData,
quads_inmem::{EcoQuadsInmem, QuadsInmem},
quads_stream::QuadsStream,
},
};
use crate::representation::{
metadata::{KCompleteContentLength, KContentType, RepresentationMetadata},
Representation,
};
#[derive(Clone, Default)]
pub struct BasicRepresentation<D> {
pub data: D,
pub metadata: RepresentationMetadata,
pub base_uri: Option<HttpUri>,
}
impl<D> std::fmt::Debug for BasicRepresentation<D> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BasicRepresentation")
.field("metadata", &self.metadata)
.finish()
}
}
impl<D> Representation for BasicRepresentation<D> {
type Data = D;
fn data(&self) -> &Self::Data {
&self.data
}
fn metadata(&self) -> &RepresentationMetadata {
&self.metadata
}
fn into_parts(self) -> (Self::Data, RepresentationMetadata) {
(self.data, self.metadata)
}
}
impl From<BasicRepresentation<BytesStream>> for BasicRepresentation<BytesData> {
fn from(rep: BasicRepresentation<BytesStream>) -> Self {
Self {
data: rep.data.into(),
metadata: rep.metadata,
base_uri: rep.base_uri,
}
}
}
impl From<BasicRepresentation<BytesInmem>> for BasicRepresentation<BytesStream> {
fn from(rep: BasicRepresentation<BytesInmem>) -> Self {
Self {
data: rep.data.into(),
metadata: rep.metadata,
base_uri: rep.base_uri,
}
}
}
impl From<BasicRepresentation<BytesData>> for BasicRepresentation<BytesStream> {
fn from(rep: BasicRepresentation<BytesData>) -> Self {
Self {
data: rep.data.into(),
metadata: rep.metadata,
base_uri: rep.base_uri,
}
}
}
impl From<BasicRepresentation<BytesInmem>> for BasicRepresentation<BytesData> {
fn from(rep: BasicRepresentation<BytesInmem>) -> Self {
Self {
data: rep.data.into(),
metadata: rep.metadata,
base_uri: rep.base_uri,
}
}
}
impl From<BasicRepresentation<QuadsStream>> for BasicRepresentation<QuadsData> {
fn from(rep: BasicRepresentation<QuadsStream>) -> Self {
Self {
data: rep.data.into(),
metadata: rep.metadata,
base_uri: rep.base_uri,
}
}
}
impl From<BasicRepresentation<EcoQuadsInmem>> for BasicRepresentation<QuadsData> {
fn from(rep: BasicRepresentation<EcoQuadsInmem>) -> Self {
Self {
data: rep.data.into(),
metadata: rep.metadata,
base_uri: rep.base_uri,
}
}
}
#[async_trait]
impl async_convert::TryFrom<BasicRepresentation<BytesData>> for BasicRepresentation<BytesInmem> {
type Error = anyhow::Error;
async fn try_from(rep: BasicRepresentation<BytesData>) -> Result<Self, Self::Error> {
match rep.into_either() {
Either::Left(v) => async_convert::TryFrom::try_from(v).await,
Either::Right(v) => Ok(v),
}
}
}
#[async_trait]
impl async_convert::TryFrom<BasicRepresentation<BytesStream>> for BasicRepresentation<BytesInmem> {
type Error = anyhow::Error;
async fn try_from(rep: BasicRepresentation<BytesStream>) -> Result<Self, Self::Error> {
Ok(BasicRepresentation {
data: async_convert::TryFrom::try_from(rep.data)
.inspect_err(|e| error!("Error in collecting stream data. Error: {e}"))
.await?,
metadata: rep.metadata,
base_uri: rep.base_uri,
})
}
}
impl BasicRepresentation<BytesData> {
#[inline]
pub fn into_binary(self) -> BinaryRepresentation {
self.into()
}
#[inline]
pub fn into_streaming(self) -> BasicRepresentation<BytesStream> {
self.into()
}
#[inline]
pub fn into_stream_size_capped(mut self, size_limit: u64) -> Self {
self.data = match self.data {
BytesData::Stream(d) => BytesData::Stream(d.into_size_capped(size_limit)),
BytesData::Inmem(d) => BytesData::Inmem(d),
};
self
}
pub fn into_either(
self,
) -> Either<BasicRepresentation<BytesStream>, BasicRepresentation<BytesInmem>> {
match self.data {
BytesData::Stream(sd) => Either::Left(BasicRepresentation {
data: sd,
metadata: self.metadata,
base_uri: self.base_uri,
}),
BytesData::Inmem(imd) => Either::Right(BasicRepresentation {
data: imd,
metadata: self.metadata,
base_uri: self.base_uri,
}),
}
}
}
impl BasicRepresentation<BytesStream> {
#[inline]
pub fn into_binary_streaming(self) -> BinaryRepresentation<BytesStream> {
self.into()
}
#[inline]
pub fn into_binary(self) -> BinaryRepresentation {
self.into_binary_streaming().into()
}
#[inline]
pub fn into_size_capped(mut self, size_limit: u64) -> Self {
self.data = self.data.into_size_capped(size_limit);
self
}
}
impl BasicRepresentation<BytesInmem> {
#[inline]
pub fn into_binary_inmem(self) -> BinaryRepresentation<BytesInmem> {
self.into()
}
#[inline]
pub fn into_binary(self) -> BinaryRepresentation {
self.into_binary_inmem().into()
}
pub async fn try_parse_quads<D>(
&self,
parser_factories: Arc<DynSynParserFactorySet>,
) -> Option<Result<QuadsInmem<D>, BoxError>>
where
D: MutableDataset + Default + Send + 'static,
D::MutationError: Send + Sync + 'static,
{
let parsable_syntax = self
.metadata()
.rdf_syntax::<DynSynParsableSyntax>()
.or_else(|| {
info!("Representation content-type is not quadable.");
None
})?
.value;
let data = self.data().clone();
let base_uri = self.base_uri.clone();
spawn_blocking(move || {
let mut ds = D::default();
parser_factories
.parse_collect_quads(
BufReader::new(data.as_read()),
base_uri.as_ref().map(From::from),
parsable_syntax,
&mut ds,
)
.map(|_| QuadsInmem::new(ds))
.map_err(Into::into)
})
.await
.map_or_else(
|e| {
error!("Error in executing parse task. Error: {e}");
Some(Err(e.into()))
},
Some,
)
}
pub async fn try_from_wrap_serializing_quads<D: Dataset + Send + 'static>(
quads: QuadsInmem<D>,
serializer_factories: Arc<DynSynSerializerFactorySet>,
syntax: DynSynSerializableSyntax,
) -> Result<Self, io::Error> {
let dataset = quads.into_inner();
let serialized_bytes = spawn_blocking(move || {
let mut buf = Vec::new();
serializer_factories
.wrapping_serialize_quads(dataset.quads(), &mut buf, syntax)
.map(|_| buf)
.map_err(|e| e.unwrap_sink_error())
})
.await
.map_err(|e| {
error!("Error in spawning serialization task: {e}");
io::Error::new(io::ErrorKind::Other, e)
})??;
let content_type = SYNTAX_TO_MEDIA_TYPE_CORRESPONDENCE[&syntax.into_subject()]
.value
.clone()
.try_into()
.expect("Must be valid media type.");
Ok(Self {
metadata: RepresentationMetadata::new()
.with::<KContentType>(content_type)
.with::<KCompleteContentLength>(ContentLength(serialized_bytes.len() as u64)),
data: BytesInmem::from(eco_vec![serialized_bytes.into()]),
base_uri: None,
})
}
}