use crate::SafeUrlPath;
#[cfg(feature = "http")]
use crate::{HttpTransport, HttpTransportBuilder};
use async_trait::async_trait;
use bytes::Bytes;
use dyn_clone::DynClone;
use futures::TryStreamExt;
use futures_core::Stream;
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
use std::io::{self, ErrorKind};
use std::path::Path;
use std::pin::Pin;
use tokio_util::io::ReaderStream;
use url::Url;
pub type TransportStream = Pin<Box<dyn Stream<Item = Result<Bytes, TransportError>> + Send + Sync>>;
#[async_trait]
pub trait IntoVec<E> {
async fn into_vec(self) -> Result<Vec<u8>, E>;
}
#[async_trait]
impl<S: Stream<Item = Result<Bytes, E>> + Send, E: Send> IntoVec<E> for S {
async fn into_vec(self) -> Result<Vec<u8>, E> {
self.try_fold(Vec::new(), |mut acc, bytes| {
acc.extend(bytes.as_ref());
std::future::ready(Ok(acc))
})
.await
}
}
#[async_trait]
pub trait Transport: Debug + DynClone + Send + Sync {
async fn fetch(&self, url: Url) -> Result<TransportStream, TransportError>;
}
dyn_clone::clone_trait_object!(Transport);
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[non_exhaustive]
pub enum TransportErrorKind {
UnsupportedUrlScheme,
FileNotFound,
Other,
}
impl Display for TransportErrorKind {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
TransportErrorKind::UnsupportedUrlScheme => "unsupported URL scheme",
TransportErrorKind::FileNotFound => "file not found",
TransportErrorKind::Other => "other",
}
)
}
}
#[derive(Debug)]
pub struct TransportError {
kind: TransportErrorKind,
url: String,
source: Option<Box<dyn Error + Send + Sync>>,
}
impl TransportError {
pub fn new<S>(kind: TransportErrorKind, url: S) -> Self
where
S: AsRef<str>,
{
Self {
kind,
url: url.as_ref().into(),
source: None,
}
}
pub fn new_with_cause<S, E>(kind: TransportErrorKind, url: S, source: E) -> Self
where
E: Into<Box<dyn Error + Send + Sync>>,
S: AsRef<str>,
{
Self {
kind,
url: url.as_ref().into(),
source: Some(source.into()),
}
}
pub fn kind(&self) -> TransportErrorKind {
self.kind
}
pub fn url(&self) -> &str {
self.url.as_str()
}
}
impl Display for TransportError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if let Some(e) = self.source.as_ref() {
write!(
f,
"Transport '{}' error fetching '{}': {e}",
self.kind, self.url
)
} else {
write!(f, "Transport '{}' error fetching '{}'", self.kind, self.url)
}
}
}
impl Error for TransportError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
self.source.as_ref().map(|e| e.as_ref() as &dyn Error)
}
}
#[derive(Debug, Clone, Copy)]
pub struct FilesystemTransport;
impl FilesystemTransport {
async fn open(
file_path: impl AsRef<Path>,
) -> Result<impl Stream<Item = Result<Bytes, io::Error>> + Send, io::Error> {
let f = tokio::fs::File::open(file_path).await?;
let reader = tokio::io::BufReader::new(f);
let stream = ReaderStream::new(reader);
Ok(stream)
}
}
#[async_trait]
impl Transport for FilesystemTransport {
async fn fetch(&self, url: Url) -> Result<TransportStream, TransportError> {
if url.scheme() != "file" {
return Err(TransportError::new(
TransportErrorKind::UnsupportedUrlScheme,
url,
));
}
let file_path = url.safe_url_filepath();
let stream = Self::open(file_path).await;
let map_io_err = move |e: io::Error| -> TransportError {
let kind = match e.kind() {
ErrorKind::NotFound => TransportErrorKind::FileNotFound,
_ => TransportErrorKind::Other,
};
TransportError::new_with_cause(kind, url.clone(), e)
};
Ok(Box::pin(
stream.map_err(map_io_err.clone())?.map_err(map_io_err),
))
}
}
#[derive(Debug, Clone, Copy)]
pub struct DefaultTransport {
file: FilesystemTransport,
#[cfg(feature = "http")]
http: HttpTransport,
}
impl Default for DefaultTransport {
fn default() -> Self {
Self {
file: FilesystemTransport,
#[cfg(feature = "http")]
http: HttpTransport::default(),
}
}
}
impl DefaultTransport {
pub fn new() -> Self {
Self::default()
}
}
#[cfg(feature = "http")]
impl DefaultTransport {
pub fn new_with_http_settings(builder: HttpTransportBuilder) -> Self {
Self {
file: FilesystemTransport,
http: builder.build(),
}
}
}
#[async_trait]
impl Transport for DefaultTransport {
async fn fetch(&self, url: Url) -> Result<TransportStream, TransportError> {
match url.scheme() {
"file" => self.file.fetch(url).await,
"http" | "https" => self.handle_http(url).await,
_ => Err(TransportError::new(
TransportErrorKind::UnsupportedUrlScheme,
url,
)),
}
}
}
impl DefaultTransport {
#[cfg(not(feature = "http"))]
#[allow(clippy::trivially_copy_pass_by_ref, clippy::unused_self)]
async fn handle_http(&self, url: Url) -> Result<TransportStream, TransportError> {
Err(TransportError::new_with_cause(
TransportErrorKind::UnsupportedUrlScheme,
url,
"The library was not compiled with the http feature enabled.",
))
}
#[cfg(feature = "http")]
async fn handle_http(&self, url: Url) -> Result<TransportStream, TransportError> {
self.http.fetch(url).await
}
}