pub mod path;
use std::{
fmt::{Debug, Display, Formatter},
io::{self, ErrorKind},
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{Stream, TryStream, TryStreamExt};
use path::{LocalPath, node_from_path_rec};
use serde::{Deserialize, Serialize};
use tokio::{
fs::{self, File},
io::AsyncRead,
};
use tokio_util::io::{ReaderStream, StreamReader};
use crate::{
Error,
update::{IsModified, ModificationState},
vfs::{DirTree, FileMeta, Vfs, VfsNode, VirtualPath},
};
use super::{FSBackend, FsBackendError, FsInstanceDescription, Named};
#[derive(Error, Debug)]
pub enum LocalDirError {
#[error("io error with local fs, on path {path}")]
IoError {
path: String,
#[source]
source: io::Error,
},
#[error("could not parse path {0}")]
InvalidPath(String),
}
impl LocalDirError {
pub fn io<P: Debug>(path: &P, source: io::Error) -> Self {
Self::IoError {
path: format!("{:?}", path),
source,
}
}
pub fn invalid_path<P: Debug>(path: &P) -> Self {
Self::InvalidPath(format!("{:?}", path))
}
}
impl From<LocalDirError> for FsBackendError {
fn from(value: LocalDirError) -> Self {
Self(Arc::new(value))
}
}
pub struct LocalFileStream<R: AsyncRead> {
path: PathBuf,
stream: ReaderStream<R>,
}
impl<R: AsyncRead> LocalFileStream<R> {
fn stream_mut(self: Pin<&mut Self>) -> Pin<&mut ReaderStream<R>> {
unsafe { self.map_unchecked_mut(|s| &mut s.stream) }
}
fn new(reader: R, path: &Path) -> Self {
Self {
path: path.to_owned(),
stream: ReaderStream::new(reader),
}
}
}
impl<R: AsyncRead> Stream for LocalFileStream<R> {
type Item = Result<Bytes, LocalDirError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let path = self.path.to_owned();
self.stream_mut()
.poll_next(cx)
.map_err(|e| LocalDirError::io(&path, e))
}
}
#[derive(Debug)]
pub struct LocalDir {
path: PathBuf,
}
impl LocalDir {
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, LocalDirError> {
let path = path.as_ref();
Ok(Self {
path: path.to_path_buf(),
})
}
pub fn full_path(&self, path: &VirtualPath) -> PathBuf {
let path: &str = path.into();
let trimmed = path.trim_start_matches('/');
self.path.join(trimmed)
}
}
impl FSBackend for LocalDir {
type SyncInfo = LocalSyncInfo;
type IoError = LocalDirError;
type CreationInfo = LocalDirCreationInfo;
type Description = LocalDirDescription;
async fn validate(info: &Self::CreationInfo) -> Result<(), Self::IoError> {
if info.0.exists() {
Ok(())
} else {
Err(LocalDirError::invalid_path(&info.0))
}
}
fn description(&self) -> Self::Description {
LocalDirDescription::new(&self.path)
}
async fn get_sync_info(&self, path: &VirtualPath) -> Result<Self::SyncInfo, Self::IoError> {
let full_path = self.full_path(path);
Ok(LocalSyncInfo::new(
full_path
.modification_time()
.map_err(|e| LocalDirError::io(&self.path, e))?
.into(),
))
}
async fn load_virtual(&self) -> Result<Vfs<Self::SyncInfo>, Self::IoError> {
let sync = LocalSyncInfo::new(
self.path
.modification_time()
.map_err(|e| LocalDirError::io(&self.path, e))?
.into(),
);
let root = if self.path.is_file() {
VfsNode::File(FileMeta::new(
self.path
.file_name()
.and_then(|s| s.to_str())
.ok_or(LocalDirError::invalid_path(&self.path))?,
self.path
.file_size()
.map_err(|e| LocalDirError::io(&self.path, e))?,
sync,
))
} else if self.path.is_dir() {
let mut root = DirTree::new("", sync);
let children = self
.path
.read_dir()
.map_err(|e| LocalDirError::io(&self.path, e))?
.map(|entry| entry.unwrap())
.collect::<Vec<_>>();
node_from_path_rec(&mut root, &children)?;
VfsNode::Dir(root)
} else {
return Err(LocalDirError::invalid_path(&self.path));
};
Ok(Vfs::new(root))
}
async fn read_file(
&self,
path: &VirtualPath,
) -> Result<impl Stream<Item = Result<Bytes, Self::IoError>> + 'static, Self::IoError> {
let full_path = self.full_path(path);
File::open(&full_path)
.await
.map(|reader| LocalFileStream::new(reader, &full_path))
.map_err(|e| LocalDirError::io(&full_path, e))
}
async fn write_file<Data: TryStream + Send + 'static + Unpin>(
&self,
path: &VirtualPath,
data: Data,
) -> Result<Self::SyncInfo, Self::IoError>
where
Data::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Bytes: From<Data::Ok>,
{
let full_path = self.full_path(path);
let mut f = File::create(&full_path)
.await
.map_err(|e| LocalDirError::io(&self.path, e))?;
let mut reader = StreamReader::new(
data.map_ok(Bytes::from)
.map_err(|e| io::Error::new(ErrorKind::Other, e)),
);
tokio::io::copy(&mut reader, &mut f)
.await
.map_err(|e| LocalDirError::io(&self.path, e))?;
full_path
.modification_time()
.map(|time| LocalSyncInfo::new(time.into()))
.map_err(|e| LocalDirError::io(&self.path, e))
}
async fn rm(&self, path: &VirtualPath) -> Result<(), Self::IoError> {
let full_path = self.full_path(path);
fs::remove_file(&full_path)
.await
.map_err(|e| LocalDirError::io(&self.path, e))
}
async fn mkdir(&self, path: &VirtualPath) -> Result<Self::SyncInfo, Self::IoError> {
let full_path = self.full_path(path);
fs::create_dir(&full_path)
.await
.map_err(|e| LocalDirError::io(&self.path, e))?;
full_path
.modification_time()
.map(|time| LocalSyncInfo::new(time.into()))
.map_err(|e| LocalDirError::io(&self.path, e))
}
async fn rmdir(&self, path: &VirtualPath) -> Result<(), Self::IoError> {
let full_path = self.full_path(path);
fs::remove_dir_all(&full_path)
.await
.map_err(|e| LocalDirError::io(&self.path, e))
}
}
impl From<io::Error> for FsBackendError {
fn from(value: io::Error) -> Self {
Self(Arc::new(value))
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalSyncInfo {
last_modified: DateTime<Utc>,
}
impl LocalSyncInfo {
pub fn new(last_modified: DateTime<Utc>) -> Self {
Self { last_modified }
}
}
impl IsModified for LocalSyncInfo {
fn modification_state(&self, reference: &Self) -> ModificationState {
if self.last_modified != reference.last_modified {
ModificationState::Modified
} else {
ModificationState::ShallowUnmodified
}
}
}
impl Named for LocalSyncInfo {
const TYPE_NAME: &'static str = "local FileSystem";
}
impl<'a> From<&'a LocalSyncInfo> for LocalSyncInfo {
fn from(value: &'a LocalSyncInfo) -> Self {
value.to_owned()
}
}
impl<'a> From<&'a LocalSyncInfo> for () {
fn from(_value: &'a LocalSyncInfo) -> Self {}
}
#[derive(Clone, Hash, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub struct LocalDirDescription {
path: PathBuf,
name: String,
}
impl LocalDirDescription {
pub fn new<P: AsRef<Path>>(path: &P) -> Self {
Self {
path: path.as_ref().to_path_buf(),
name: path
.as_ref()
.canonicalize()
.map(|path| path.file_name().unwrap().to_string_lossy().into_owned())
.unwrap_or("<anonymous>".to_string()),
}
}
}
impl FsInstanceDescription for LocalDirDescription {
fn name(&self) -> &str {
&self.name
}
}
impl Display for LocalDirDescription {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.path.display())
}
}
impl From<LocalDirCreationInfo> for LocalDirDescription {
fn from(value: LocalDirCreationInfo) -> Self {
Self::new(&value.0)
}
}
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct LocalDirCreationInfo(PathBuf);
impl LocalDirCreationInfo {
pub fn new<P: AsRef<Path>>(path: P) -> Self {
Self(path.as_ref().to_path_buf())
}
}
impl TryFrom<LocalDirCreationInfo> for LocalDir {
type Error = <Self as FSBackend>::IoError;
fn try_from(value: LocalDirCreationInfo) -> Result<Self, Self::Error> {
Self::new(value.0)
}
}