mod byte_counter;
pub mod local;
pub mod nextcloud;
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::hash::Hash;
use std::io::{self, ErrorKind};
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use byte_counter::ByteCounterExt;
use bytes::Bytes;
use futures::{Stream, TryStream, TryStreamExt};
use log::{error, info};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::io::AsyncReadExt;
use tokio_util::io::StreamReader;
use xxhash_rust::xxh3::xxh3_64;
use crate::update::IsModified;
use crate::vfs::{InvalidPathError, Vfs, VirtualPath};
#[derive(Error, Debug, Clone)]
#[error(transparent)]
pub struct FsBackendError(Arc<dyn std::error::Error + Send + Sync>);
#[derive(Error, Debug)]
pub enum ConcreteUpdateApplicationError {
#[error("invalid path provided for update")]
InvalidPath(#[from] InvalidPathError),
#[error("cannot apply an update to the root dir itself")]
PathIsRoot,
}
/// Description of one filesystem instance.
///
/// Implementors must also be [`Display`], so a description can always be rendered as text.
pub trait FsInstanceDescription
where
    Self: Display,
{
    /// Returns the name of the described instance.
    fn name(&self) -> &str;
}
/// Gives a type a static, human-readable name.
pub trait Named {
/// Name of the implementing type; in this file it is used in log messages and to tag
/// errors with the backend they originate from.
const TYPE_NAME: &'static str;
}
/// Operations that a filesystem backend must provide.
///
/// Besides the methods below, an implementor must be constructible from its
/// [`CreationInfo`](FSBackend::CreationInfo) through [`TryFrom`], failing with the same
/// [`IoError`](FSBackend::IoError) that every other operation of the trait reports.
pub trait FSBackend:
    Named + Sized + TryFrom<Self::CreationInfo, Error = <Self as FSBackend>::IoError>
{
    /// Metadata attached to the nodes of the [`Vfs`] returned by
    /// [`load_virtual`](FSBackend::load_virtual); presumably what [`IsModified`] compares to
    /// detect changes (confirm against `crate::update`).
    type SyncInfo: IsModified + Debug + Named + Clone;

    /// Error returned by the operations of this backend; convertible into the type-erased
    /// [`FsBackendError`].
    type IoError: Error + Send + Sync + 'static + Into<FsBackendError>;

    /// Serializable information from which a backend instance is created.
    type CreationInfo: Debug + Clone + Serialize + for<'a> Deserialize<'a>;

    /// Serializable, hashable description of a backend instance, derivable from its
    /// [`CreationInfo`](FSBackend::CreationInfo).
    type Description: FsInstanceDescription
        + From<Self::CreationInfo>
        + Clone
        + Hash
        + PartialEq
        + Debug
        + Serialize
        + for<'a> Deserialize<'a>;

    /// Asynchronously validates `info`, resolving to `Ok(())` or to the backend error.
    ///
    /// NOTE(review): presumably checks that a backend can actually be created from `info`;
    /// the exact checks are backend-specific.
    fn validate(info: &Self::CreationInfo) -> impl Future<Output = Result<(), Self::IoError>>;

    /// Returns the description of this backend instance.
    fn description(&self) -> Self::Description;

    /// Resolves to the [`SyncInfo`](FSBackend::SyncInfo) of the node at `path`.
    fn get_sync_info(
        &self,
        path: &VirtualPath,
    ) -> impl Future<Output = Result<Self::SyncInfo, Self::IoError>>;

    /// Resolves to a [`Vfs`] built from this backend, with nodes carrying
    /// [`SyncInfo`](FSBackend::SyncInfo).
    fn load_virtual(&self) -> impl Future<Output = Result<Vfs<Self::SyncInfo>, Self::IoError>>;

    /// Opens the file at `path` for reading.
    ///
    /// On success the future resolves to a stream of [`Bytes`] chunks. Both the opening of
    /// the file and each chunk of the stream fail with
    /// [`IoError`](FSBackend::IoError).
    fn read_file(
        &self,
        path: &VirtualPath,
    ) -> impl Future<
        Output = Result<
            impl Stream<Item = Result<Bytes, Self::IoError>> + Send + Unpin + 'static,
            Self::IoError,
        >,
    >;

    /// Writes the content of the `data` stream to the file at `path`.
    ///
    /// `data` yields items convertible into [`Bytes`]; its errors only need to be convertible
    /// into a boxed error. Resolves to a [`SyncInfo`](FSBackend::SyncInfo), presumably the
    /// one of the file that was just written.
    fn write_file<Data: TryStream + Send + 'static + Unpin>(
        &self,
        path: &VirtualPath,
        data: Data,
    ) -> impl Future<Output = Result<Self::SyncInfo, Self::IoError>>
    where
        Data::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
        Bytes: From<Data::Ok>;

    /// Removes the file at `path`.
    fn rm(&self, path: &VirtualPath) -> impl Future<Output = Result<(), Self::IoError>>;

    /// Creates a directory at `path` and resolves to its
    /// [`SyncInfo`](FSBackend::SyncInfo).
    fn mkdir(
        &self,
        path: &VirtualPath,
    ) -> impl Future<Output = Result<Self::SyncInfo, Self::IoError>>;

    /// Removes the directory at `path`.
    fn rmdir(&self, path: &VirtualPath) -> impl Future<Output = Result<(), Self::IoError>>;
}
/// Every backend is named after its `SyncInfo` type.
impl<T> Named for T
where
    T: FSBackend,
{
    const TYPE_NAME: &'static str = <T::SyncInfo as Named>::TYPE_NAME;
}
/// Outcome of a successful file clone between two filesystems.
///
/// Holds the sync info of the file on the source and destination sides, together with a
/// size in bytes supplied by the caller of [`ConcreteFileCloneResult::new`].
#[derive(Debug)]
pub struct ConcreteFileCloneResult<SrcSyncInfo, DstSyncInfo> {
    /// Sync info of the file on the source side.
    src_file_info: SrcSyncInfo,
    /// Sync info of the file on the destination side.
    dst_file_info: DstSyncInfo,
    /// Size of the file, in bytes.
    file_size: u64,
}

impl<SrcSyncInfo, DstSyncInfo> ConcreteFileCloneResult<SrcSyncInfo, DstSyncInfo> {
    /// Builds a result from the file size and the sync info of both sides.
    pub fn new(file_size: u64, src_file_info: SrcSyncInfo, dst_file_info: DstSyncInfo) -> Self {
        Self {
            src_file_info,
            dst_file_info,
            file_size,
        }
    }

    /// Returns the recorded file size, in bytes.
    pub fn file_size(&self) -> u64 {
        self.file_size
    }
}

/// Splits the result into `(source info, destination info)`, dropping the size.
impl<SrcSyncInfo, DstSyncInfo> From<ConcreteFileCloneResult<SrcSyncInfo, DstSyncInfo>>
    for (SrcSyncInfo, DstSyncInfo)
{
    fn from(value: ConcreteFileCloneResult<SrcSyncInfo, DstSyncInfo>) -> Self {
        let ConcreteFileCloneResult {
            src_file_info,
            dst_file_info,
            ..
        } = value;
        (src_file_info, dst_file_info)
    }
}
/// A filesystem accessed through a [`FSBackend`].
#[derive(Debug)]
pub struct ConcreteFS<Backend>
where
    Backend: FSBackend,
{
    /// Backend that performs the actual filesystem operations.
    backend: Backend,
}
impl<Backend: FSBackend> ConcreteFS<Backend> {
pub fn new(backend: Backend) -> Self {
Self { backend }
}
pub fn backend(&self) -> &Backend {
&self.backend
}
async fn hash_file(&self, path: &VirtualPath) -> Result<u64, FsBackendError> {
let stream = self.backend.read_file(path).await.map_err(|e| e.into())?;
let mut reader = StreamReader::new(stream.map_err(|e| io::Error::new(ErrorKind::Other, e)));
let mut data = Vec::new();
reader.read_to_end(&mut data).await?;
Ok(xxh3_64(&data))
}
pub async fn eq_file<OtherBackend: FSBackend>(
&self,
other: &ConcreteFS<OtherBackend>,
path: &VirtualPath,
) -> Result<bool, (FsBackendError, &'static str)> {
let (local_hash, remote_hash) = tokio::join!(self.hash_file(path), other.hash_file(path));
Ok(local_hash.map_err(|e| (e, Backend::TYPE_NAME))?
== remote_hash.map_err(|e| (e, OtherBackend::TYPE_NAME))?)
}
pub async fn clone_file<RefBackend: FSBackend>(
&self,
ref_concrete: &ConcreteFS<RefBackend>,
path: &VirtualPath,
) -> Result<ConcreteFileCloneResult<RefBackend::SyncInfo, Backend::SyncInfo>, FsBackendError>
{
info!(
"Cloning file {:?} from {} to {}",
path,
RefBackend::TYPE_NAME,
Backend::TYPE_NAME
);
let counter = Arc::new(AtomicU64::new(0));
let stream = ref_concrete
.backend
.read_file(path)
.await
.map_err(|e| {
error!("Failed to read file {path:?}: {e:?}");
e.into()
})?
.count_bytes(counter.clone());
let dst_info = self.backend().write_file(path, stream).await.map_err(|e| {
error!("Failed to clone file {path:?}: {e:?}");
e.into()
})?;
let src_info = ref_concrete
.backend
.get_sync_info(path)
.await
.map_err(|e| {
error!("Failed to read src sync info {path:?}: {e:?}");
e.into()
})?;
let size = counter.load(std::sync::atomic::Ordering::SeqCst);
info!("File {path:?} successfully cloned");
Ok(ConcreteFileCloneResult::new(size, src_info, dst_info))
}
}