use std::borrow::Cow;
use bytes::Bytes;
use chrono::Utc;
use http_cache_stream_reqwest::Cache;
use http_cache_stream_reqwest::storage::DefaultCacheStorage;
use reqwest::Response;
use reqwest::StatusCode;
use reqwest::header;
use tokio::sync::broadcast;
use tracing::debug;
use url::Url;
use crate::Config;
use crate::Error;
use crate::HttpClient;
use crate::Result;
use crate::TransferEvent;
use crate::USER_AGENT;
use crate::UrlExt;
use crate::backend::StorageBackend;
use crate::backend::Upload;
/// Helper trait for turning a value into this crate's [`Error`].
///
/// It is implemented in this file for [`Response`]; the method is `async`
/// because that implementation has to await the response body.
trait IntoError {
/// Consumes `self` and produces the [`Error`] that describes it.
async fn into_error(self) -> Error;
}
impl IntoError for Response {
async fn into_error(self) -> Error {
let status = self.status();
let text: String = match self.text().await {
Ok(text) => text,
Err(e) => return e.into(),
};
Error::Server {
status,
message: text,
}
}
}
/// The upload type associated with [`GenericStorageBackend`].
///
/// This is a placeholder: its [`Upload`] implementation in this file calls
/// `unimplemented!()` from every method.
pub struct GenericUpload;
/// Stub [`Upload`] implementation; none of its methods are implemented.
impl Upload for GenericUpload {
/// The unit type is used for parts, so a part carries no data.
type Part = ();
/// Not implemented for the generic upload; all arguments are ignored.
///
/// # Panics
///
/// Always panics via `unimplemented!()`.
async fn put(&self, _: u64, _: u64, _: Bytes) -> Result<Option<Self::Part>> {
unimplemented!()
}
/// Not implemented for the generic upload; the parts are ignored.
///
/// # Panics
///
/// Always panics via `unimplemented!()`.
async fn finalize(&self, _: &[Self::Part]) -> Result<()> {
unimplemented!()
}
}
/// A storage backend that issues plain HTTP requests through an [`HttpClient`].
///
/// Its [`StorageBackend`] implementation in this file accepts every URL.
pub struct GenericStorageBackend {
/// The configuration returned by `config()` and consulted for the block size.
config: Config,
/// The HTTP client used to send every request.
client: HttpClient,
/// Optional broadcast sender for [`TransferEvent`]s, exposed via `events()`.
events: Option<broadcast::Sender<TransferEvent>>,
}
impl GenericStorageBackend {
/// Creates a new generic storage backend.
///
/// The given configuration, HTTP client, and optional transfer event sender
/// are stored as-is; no validation or I/O is performed here.
pub fn new(
config: Config,
client: HttpClient,
events: Option<broadcast::Sender<TransferEvent>>,
) -> Self {
Self {
config,
client,
events,
}
}
}
impl StorageBackend for GenericStorageBackend {
type Upload = GenericUpload;
fn config(&self) -> &Config {
&self.config
}
fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
self.client.cache()
}
fn events(&self) -> &Option<broadcast::Sender<TransferEvent>> {
&self.events
}
fn block_size(&self, _: u64) -> Result<u64> {
if let Some(size) = self.config.block_size {
return Ok(size);
}
Ok(4 * 1024 * 1024)
}
fn is_supported_url(_: &Config, _: &Url) -> bool {
true
}
fn rewrite_url<'a>(_: &Config, url: &'a Url) -> Result<Cow<'a, Url>> {
Ok(Cow::Borrowed(url))
}
fn join_url<'a>(&self, mut url: Url, segments: impl Iterator<Item = &'a str>) -> Result<Url> {
{
let mut existing = url.path_segments_mut().expect("url should have path");
existing.pop_if_empty();
existing.extend(segments);
}
Ok(url)
}
async fn head(&self, url: Url, must_exist: bool) -> Result<Response> {
debug!("sending HEAD request for `{url}`", url = url.display());
let response = self
.client
.head(url)
.header(header::USER_AGENT, USER_AGENT)
.header(header::DATE, Utc::now().to_rfc2822())
.send()
.await?;
if !response.status().is_success() {
if !must_exist && response.status() == StatusCode::NOT_FOUND {
return Ok(response);
}
return Err(response.into_error().await);
}
Ok(response)
}
async fn get(&self, url: Url) -> Result<Response> {
debug!("sending GET request for `{url}`", url = url.display());
let response = self
.client
.get(url)
.header(header::USER_AGENT, USER_AGENT)
.header(header::DATE, Utc::now().to_rfc2822())
.send()
.await?;
if !response.status().is_success() {
return Err(response.into_error().await);
}
Ok(response)
}
async fn get_at_offset(&self, url: Url, etag: &str, offset: u64) -> Result<Response> {
debug!(
"sending GET request at offset {offset} for `{url}`",
url = url.display(),
);
let response = self
.client
.get(url)
.header(header::USER_AGENT, USER_AGENT)
.header(header::DATE, Utc::now().to_rfc2822())
.header(header::RANGE, format!("bytes={offset}-"))
.header(header::IF_MATCH, etag)
.send()
.await?;
let status = response.status();
if status == StatusCode::PRECONDITION_FAILED {
return Err(Error::RemoteContentModified);
}
if !status.is_success() {
return Err(response.into_error().await);
}
Ok(response)
}
async fn walk(&self, _: Url) -> Result<Vec<String>> {
Ok(Vec::default())
}
async fn new_upload(&self, _: Url) -> Result<Self::Upload> {
panic!("generic storage backend cannot be used for uploading");
}
}