use std::io;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use derive_more::{Display, From};
use futures::stream::TryStreamExt;
use crate::sha256::{Sha256VerifyError, VerifyStream};
use super::{LFSObject, Storage, StorageKey, StorageStream};
/// Private error type for object streams that pass through `VerifyStream`.
///
/// `Display` and `From` are derived with `derive_more`; the `From` impls are
/// what let `Error::from` be handed directly to `map_err` on a stream.
#[derive(Debug, Display, From)]
enum Error {
/// The verifying stream rejected the data. `get` treats this as a
/// corrupted object.
Verify(Sha256VerifyError),
/// A plain I/O error; presumably raised by the underlying object stream
/// and converted via `Error::from` (confirm against the stream's error type).
Io(io::Error),
}
// Empty impl: no `source()` override is provided, so the trait's default applies.
impl std::error::Error for Error {}
/// Storage adaptor that wraps another backend `S`.
///
/// The wrapped backend is kept behind an [`Arc`] so that additional handles to
/// it can be created cheaply (for example, to move into a spawned task).
pub struct Backend<S> {
    /// Shared handle to the wrapped storage backend.
    storage: Arc<S>,
}

impl<S> Backend<S> {
    /// Creates a new `Backend` that takes ownership of `storage`.
    pub fn new(storage: S) -> Self {
        let storage = Arc::new(storage);
        Self { storage }
    }
}
#[async_trait]
impl<S> Storage for Backend<S>
where
S: Storage + Send + Sync + 'static,
S::Error: 'static,
{
type Error = S::Error;
async fn get(
&self,
key: &StorageKey,
) -> Result<Option<LFSObject>, Self::Error> {
match self.storage.get(key).await? {
Some(obj) => {
let (len, stream) = obj.into_parts();
let stream = VerifyStream::new(
stream.map_err(Error::from),
len,
*key.oid(),
);
let key = key.clone();
let storage = self.storage.clone();
let stream = stream.map_err(move |err| {
match err {
Error::Verify(err) => {
log::error!(
"Found corrupted object {} ({})",
key.oid(),
err
);
let storage = storage.clone();
let key = key.clone();
tokio::spawn(
async move { storage.delete(&key).await },
);
io::Error::new(
io::ErrorKind::Other,
"found corrupted object",
)
}
Error::Io(err) => err,
}
});
Ok(Some(LFSObject::new(len, Box::pin(stream))))
}
None => Ok(None),
}
}
async fn put(
&self,
key: StorageKey,
value: LFSObject,
) -> Result<(), Self::Error> {
let (len, stream) = value.into_parts();
let stream =
VerifyStream::new(stream.map_err(Error::from), len, *key.oid())
.map_err(move |err| match err {
Error::Verify(err) => {
io::Error::new(io::ErrorKind::Other, err)
}
Error::Io(err) => io::Error::new(io::ErrorKind::Other, err),
});
self.storage
.put(key, LFSObject::new(len, Box::pin(stream)))
.await
}
async fn size(&self, key: &StorageKey) -> Result<Option<u64>, Self::Error> {
self.storage.size(key).await
}
async fn delete(&self, key: &StorageKey) -> Result<(), Self::Error> {
self.storage.delete(key).await
}
fn list(&self) -> StorageStream<(StorageKey, u64), Self::Error> {
self.storage.list()
}
async fn total_size(&self) -> Option<u64> {
self.storage.total_size().await
}
async fn max_size(&self) -> Option<u64> {
self.storage.max_size().await
}
fn public_url(&self, key: &StorageKey) -> Option<String> {
self.storage.public_url(key)
}
async fn upload_url(
&self,
key: &StorageKey,
expires_in: Duration,
) -> Option<String> {
self.storage.upload_url(key, expires_in).await
}
}