use std::convert::TryInto;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{fmt, io};
use async_trait::async_trait;
use futures::{join, Future, TryFutureExt};
use safecast::AsType;
use tokio::fs;
use tokio::sync::{
OwnedRwLockMappedWriteGuard, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock,
RwLockMappedWriteGuard, RwLockReadGuard, RwLockWriteGuard,
};
use super::cache::Cache;
use super::Result;
const TMP: &'static str = "_freqfs";
pub type FileReadGuard<'a, F> = RwLockReadGuard<'a, F>;
pub type FileReadGuardOwned<FE, F> = OwnedRwLockReadGuard<Option<FE>, F>;
pub type FileWriteGuard<'a, F> = RwLockMappedWriteGuard<'a, F>;
pub type FileWriteGuardOwned<FE, F> = OwnedRwLockMappedWriteGuard<Option<FE>, F>;
pub trait FileDeref {
type File;
fn as_file(&self) -> &Self::File;
}
impl<'a, F> FileDeref for FileReadGuard<'a, F> {
type File = F;
fn as_file(&self) -> &F {
self.deref()
}
}
impl<'a, F> FileDeref for Arc<FileReadGuard<'a, F>> {
type File = F;
fn as_file(&self) -> &F {
self.deref().as_file()
}
}
impl<FE, F> FileDeref for FileReadGuardOwned<FE, F> {
type File = F;
fn as_file(&self) -> &F {
self.deref()
}
}
impl<FE, F> FileDeref for Arc<FileReadGuardOwned<FE, F>> {
type File = F;
fn as_file(&self) -> &F {
self.deref().as_file()
}
}
impl<'a, F> FileDeref for FileWriteGuard<'a, F> {
type File = F;
fn as_file(&self) -> &F {
self.deref()
}
}
impl<FE, F> FileDeref for FileWriteGuardOwned<FE, F> {
type File = F;
fn as_file(&self) -> &F {
self.deref()
}
}
#[async_trait]
pub trait FileLoad: Send + Sync + Sized + 'static {
async fn load(path: &Path, file: fs::File, metadata: std::fs::Metadata) -> Result<Self>;
}
#[async_trait]
pub trait FileSave<'en>: Send + Sync + Sized + 'static {
async fn save(&'en self, file: &mut fs::File) -> Result<u64>;
}
#[cfg(feature = "stream")]
#[async_trait]
impl<'en, T> FileLoad for T
where
T: destream::de::FromStream<Context = ()> + Send + Sync + 'static,
{
async fn load(_path: &Path, file: fs::File, _metadata: std::fs::Metadata) -> Result<Self> {
tbon::de::read_from((), file)
.map_err(|cause| io::Error::new(io::ErrorKind::InvalidData, cause))
.await
}
}
#[cfg(feature = "stream")]
#[async_trait]
impl<'en, T> FileSave<'en> for T
where
T: destream::en::ToStream<'en> + Send + Sync + 'static,
{
async fn save(&'en self, file: &mut fs::File) -> Result<u64> {
use futures::TryStreamExt;
let encoded = tbon::en::encode(self)
.map_err(|cause| io::Error::new(io::ErrorKind::InvalidData, cause))?;
let mut reader = tokio_util::io::StreamReader::new(
encoded
.map_ok(bytes::Bytes::from)
.map_err(|cause| io::Error::new(io::ErrorKind::InvalidData, cause)),
);
tokio::io::copy(&mut reader, file).await
}
}
#[derive(Copy, Clone)]
enum FileLockState {
Pending,
Read(usize),
Modified(usize),
Deleted(bool),
}
impl FileLockState {
fn is_deleted(&self) -> bool {
match self {
Self::Deleted(_) => true,
_ => false,
}
}
fn is_loaded(&self) -> bool {
match self {
Self::Read(_) | Self::Modified(_) => true,
_ => false,
}
}
fn is_pending(&self) -> bool {
match self {
Self::Pending => true,
_ => false,
}
}
fn upgrade(&mut self) {
let size = match self {
Self::Read(size) | Self::Modified(size) => *size,
_ => unreachable!("upgrade a file not in the cache"),
};
*self = Self::Modified(size);
}
}
pub struct FileLock<FE> {
cache: Arc<Cache<FE>>,
path: Arc<PathBuf>,
state: Arc<RwLock<FileLockState>>,
contents: Arc<RwLock<Option<FE>>>,
}
impl<FE> Clone for FileLock<FE> {
fn clone(&self) -> Self {
Self {
cache: self.cache.clone(),
path: self.path.clone(),
state: self.state.clone(),
contents: self.contents.clone(),
}
}
}
impl<FE> FileLock<FE> {
pub fn new<F>(cache: Arc<Cache<FE>>, path: PathBuf, contents: F, size: usize) -> Self
where
FE: From<F>,
{
Self {
cache,
path: Arc::new(path),
state: Arc::new(RwLock::new(FileLockState::Modified(size))),
contents: Arc::new(RwLock::new(Some(contents.into()))),
}
}
pub fn path(&self) -> &Path {
self.path.as_path()
}
pub fn load<F>(cache: Arc<Cache<FE>>, path: PathBuf) -> Self
where
FE: From<F>,
{
Self {
cache,
path: Arc::new(path),
state: Arc::new(RwLock::new(FileLockState::Pending)),
contents: Arc::new(RwLock::new(None)),
}
}
pub async fn overwrite(&self, other: &Self) -> Result<()>
where
FE: Clone,
{
let (mut this, that) = join!(self.state.write(), other.state.read());
let old_size = match &*this {
FileLockState::Pending | FileLockState::Deleted(_) => 0,
FileLockState::Read(size) | FileLockState::Modified(size) => *size,
};
let new_size = match &*that {
FileLockState::Pending => {
debug_assert!(other.path.exists());
create_dir(self.path.parent().expect("file parent dir")).await?;
match fs::copy(other.path.as_path(), self.path.as_path()).await {
Ok(_) => {}
Err(cause) if cause.kind() == io::ErrorKind::NotFound => {
#[cfg(debug_assertions)]
let message = format!(
"tried to copy a file from a nonexistent source: {}",
other.path.display()
);
#[cfg(not(debug_assertions))]
let message = "tried to copy a file from a nonexistent source";
return Err(io::Error::new(io::ErrorKind::NotFound, message));
}
Err(cause) => return Err(cause),
}
*this = FileLockState::Pending;
0
}
FileLockState::Deleted(_sync) => {
*this = FileLockState::Deleted(true);
0
}
FileLockState::Read(size) | FileLockState::Modified(size) => {
*this = FileLockState::Modified(*size);
*size
}
};
if this.is_loaded() {
let (mut this_data, that_data) = join!(self.contents.write(), other.contents.read());
let that_data = that_data.as_ref().expect("file");
*this_data = Some(FE::clone(&*that_data));
}
self.cache.resize(old_size, new_size);
Ok(())
}
pub async fn read<F>(&self) -> Result<FileReadGuard<F>>
where
F: FileLoad,
FE: AsType<F>,
{
let mut state = self.state.write().await;
if state.is_deleted() {
return Err(deleted());
}
let guard = if state.is_pending() {
let mut contents = self.contents.try_write().expect("file contents");
let (size, entry) = load(&**self.path).await?;
self.cache.bump(&self.path, Some(size));
*state = FileLockState::Read(size);
*contents = Some(entry);
contents.downgrade()
} else {
self.cache.bump(&self.path, None);
self.contents.read().await
};
read_type(guard)
}
pub fn try_read<F>(&self) -> Result<FileReadGuard<F>>
where
F: FileLoad,
FE: AsType<F>,
{
let state = self.state.try_read().map_err(would_block)?;
match &*state {
FileLockState::Pending => Err(would_block("this file is not in the cache")),
FileLockState::Deleted(_sync) => Err(deleted()),
FileLockState::Read(_size) | FileLockState::Modified(_size) => {
self.cache.bump(&self.path, None);
let guard = self.contents.try_read().map_err(would_block)?;
read_type(guard)
}
}
}
pub async fn read_owned<F>(&self) -> Result<FileReadGuardOwned<FE, F>>
where
F: FileLoad,
FE: AsType<F>,
{
let mut state = self.state.write().await;
if state.is_deleted() {
return Err(deleted());
}
let guard = if state.is_pending() {
let mut contents = self
.contents
.clone()
.try_write_owned()
.expect("file contents");
let (size, entry) = load(&**self.path).await?;
self.cache.bump(&self.path, Some(size));
*state = FileLockState::Read(size);
*contents = Some(entry);
contents.downgrade()
} else {
self.cache.bump(&self.path, None);
self.contents.clone().read_owned().await
};
read_type_owned(guard)
}
pub fn try_read_owned<F>(&self) -> Result<FileReadGuardOwned<FE, F>>
where
F: FileLoad,
FE: AsType<F>,
{
let state = self.state.try_read().map_err(would_block)?;
match &*state {
FileLockState::Pending => Err(would_block("this file is not in the cache")),
FileLockState::Deleted(_sync) => Err(deleted()),
FileLockState::Read(_size) | FileLockState::Modified(_size) => {
self.cache.bump(&self.path, None);
let guard = self
.contents
.clone()
.try_read_owned()
.map_err(would_block)?;
read_type_owned(guard)
}
}
}
pub async fn into_read<F>(self) -> Result<FileReadGuardOwned<FE, F>>
where
F: FileLoad,
FE: AsType<F>,
{
let mut state = self.state.write().await;
if state.is_deleted() {
return Err(deleted());
}
let guard = if state.is_pending() {
let mut contents = self.contents.try_write_owned().expect("file contents");
let (size, entry) = load(&**self.path).await?;
self.cache.bump(&self.path, Some(size));
*state = FileLockState::Read(size);
*contents = Some(entry);
contents.downgrade()
} else {
self.cache.bump(&self.path, None);
self.contents.read_owned().await
};
read_type_owned(guard)
}
pub async fn write<F>(&self) -> Result<FileWriteGuard<F>>
where
F: FileLoad,
FE: AsType<F>,
{
let mut state = self.state.write().await;
if state.is_deleted() {
return Err(deleted());
}
let guard = if state.is_pending() {
let mut contents = self.contents.try_write().expect("file contents");
let (size, entry) = load(&**self.path).await?;
self.cache.bump(&self.path, Some(size));
*state = FileLockState::Modified(size);
*contents = Some(entry);
self.cache.bump(&self.path, Some(size));
contents
} else {
state.upgrade();
self.cache.bump(&self.path, None);
self.contents.write().await
};
write_type(guard)
}
pub fn try_write<F>(&self) -> Result<FileWriteGuard<F>>
where
F: FileLoad,
FE: AsType<F>,
{
let mut state = self.state.try_write().map_err(would_block)?;
if state.is_pending() {
Err(would_block("this file is not in the cache"))
} else if state.is_deleted() {
Err(deleted())
} else {
state.upgrade();
self.cache.bump(&self.path, None);
let guard = self.contents.try_write().map_err(would_block)?;
write_type(guard)
}
}
pub async fn write_owned<F>(&self) -> Result<FileWriteGuardOwned<FE, F>>
where
F: FileLoad,
FE: AsType<F>,
{
let mut state = self.state.write().await;
if state.is_deleted() {
return Err(deleted());
}
let guard = if state.is_pending() {
let mut contents = self
.contents
.clone()
.try_write_owned()
.expect("file contents");
let (size, entry) = load(&**self.path).await?;
self.cache.bump(&self.path, Some(size));
*state = FileLockState::Modified(size);
*contents = Some(entry);
contents
} else {
state.upgrade();
self.cache.bump(&self.path, None);
self.contents.clone().write_owned().await
};
write_type_owned(guard)
}
pub fn try_write_owned<F>(&self) -> Result<FileWriteGuardOwned<FE, F>>
where
FE: AsType<F>,
{
let mut state = self.state.try_write().map_err(would_block)?;
if state.is_pending() {
Err(would_block("this file is not in the cache"))
} else if state.is_deleted() {
Err(deleted())
} else {
state.upgrade();
self.cache.bump(&self.path, None);
let guard = self
.contents
.clone()
.try_write_owned()
.map_err(would_block)?;
write_type_owned(guard)
}
}
pub async fn into_write<F>(self) -> Result<FileWriteGuardOwned<FE, F>>
where
F: FileLoad,
FE: AsType<F>,
{
self.write_owned().await
}
pub fn try_into_write<F>(self) -> Result<FileWriteGuardOwned<FE, F>>
where
F: FileLoad,
FE: AsType<F>,
{
self.try_write_owned()
}
pub async fn sync(&self) -> Result<()>
where
FE: for<'a> FileSave<'a>,
{
let mut state = self.state.write().await;
let new_state = match &*state {
FileLockState::Pending => FileLockState::Pending,
FileLockState::Read(size) => FileLockState::Read(*size),
FileLockState::Modified(old_size) => {
#[cfg(feature = "logging")]
log::trace!("sync modified file {}...", self.path.display());
let contents = self.contents.read().await;
let contents = contents.as_ref().expect("file");
let new_size = persist(self.path.as_path(), contents).await?;
self.cache.resize(*old_size, new_size as usize);
FileLockState::Read(new_size as usize)
}
FileLockState::Deleted(needs_sync) => {
if *needs_sync {
if self.path.exists() {
delete_file(&self.path).await?;
}
}
FileLockState::Deleted(false)
}
};
*state = new_state;
Ok(())
}
pub(crate) async fn delete(&self, file_only: bool) {
let mut file_state = self.state.write().await;
let size = match &*file_state {
FileLockState::Pending => 0,
FileLockState::Read(size) => *size,
FileLockState::Modified(size) => *size,
FileLockState::Deleted(_) => return,
};
self.cache.remove(&self.path, size);
*file_state = FileLockState::Deleted(file_only);
}
pub(crate) fn evict(self) -> Option<(usize, impl Future<Output = Result<()>>)>
where
FE: for<'a> FileSave<'a> + 'static,
{
let mut state = self.state.try_write_owned().ok()?;
let (old_size, contents, modified) = match &*state {
FileLockState::Pending => {
return None;
}
FileLockState::Read(size) => {
let contents = self.contents.try_write_owned().ok()?;
(*size, contents, false)
}
FileLockState::Modified(size) => {
let contents = self.contents.try_write_owned().ok()?;
(*size, contents, true)
}
FileLockState::Deleted(_) => unreachable!("evict a deleted file"),
};
let eviction = async move {
if modified {
let contents = contents.as_ref().expect("file");
persist(self.path.as_path(), contents).await?;
}
self.cache.resize(old_size, 0);
*state = FileLockState::Pending;
Ok(())
};
Some((old_size, eviction))
}
}
impl<FE> fmt::Debug for FileLock<FE> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
#[cfg(debug_assertions)]
write!(f, "file at {}", self.path.display())?;
#[cfg(not(debug_assertions))]
f.write_str("a file lock")?;
Ok(())
}
}
async fn load<F: FileLoad, FE: From<F>>(path: &Path) -> Result<(usize, FE)> {
let file = match fs::File::open(path).await {
Ok(file) => file,
Err(cause) if cause.kind() == io::ErrorKind::NotFound => {
#[cfg(debug_assertions)]
let message = format!("there is no file at {}", path.display());
#[cfg(not(debug_assertions))]
let message = "the requested file is not in cache and does not exist on the filesystem";
return Err(io::Error::new(io::ErrorKind::NotFound, message));
}
Err(cause) => return Err(cause),
};
let metadata = file.metadata().await?;
let size = match metadata.len().try_into() {
Ok(size) => size,
_ => {
return Err(io::Error::new(
io::ErrorKind::OutOfMemory,
"this file is too large to load into the cache",
))
}
};
let file = F::load(path, file, metadata).await?;
let entry = FE::from(file);
Ok((size, entry))
}
async fn persist<'a, FE: FileSave<'a>>(path: &Path, file: &'a FE) -> Result<u64> {
let tmp = if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
path.with_extension(format!("{}_{}", ext, TMP))
} else {
path.with_extension(TMP)
};
let size = {
let mut tmp_file = if tmp.exists() {
fs::OpenOptions::new()
.truncate(true)
.write(true)
.open(tmp.as_path())
.await?
} else {
let parent = tmp.parent().expect("dir");
let mut i = 0;
while !parent.exists() {
create_dir(parent).await?;
tokio::time::sleep(tokio::time::Duration::from_millis(i)).await;
i += 1;
}
assert!(parent.exists());
let tmp_file = fs::File::create(tmp.as_path())
.map_err(|cause| {
io::Error::new(
cause.kind(),
format!("failed to create tmp file: {}", cause),
)
})
.await?;
tmp_file
};
assert!(tmp.exists());
assert!(!tmp.is_dir());
let size = file
.save(&mut tmp_file)
.map_err(|cause| {
io::Error::new(cause.kind(), format!("failed to save tmp file: {}", cause))
})
.await?;
size
};
tokio::fs::rename(tmp.as_path(), path)
.map_err(|cause| {
io::Error::new(
cause.kind(),
format!("failed to rename tmp file: {}", cause),
)
})
.await?;
Ok(size)
}
async fn create_dir(path: &Path) -> Result<()> {
if path.exists() {
Ok(())
} else {
match tokio::fs::create_dir_all(path).await {
Ok(()) => Ok(()),
Err(cause) => {
if path.exists() && path.is_dir() {
Ok(())
} else {
return Err(io::Error::new(
cause.kind(),
format!("failed to create directory: {}", cause),
));
}
}
}
}
}
#[inline]
fn read_type<F, T>(maybe_file: RwLockReadGuard<Option<F>>) -> Result<RwLockReadGuard<T>>
where
F: AsType<T>,
{
match RwLockReadGuard::try_map(maybe_file, |file| file.as_ref().expect("file").as_type()) {
Ok(file) => Ok(file),
Err(_) => Err(invalid_data(format!(
"invalid file type, expected {}",
std::any::type_name::<F>()
))),
}
}
#[inline]
fn read_type_owned<F, T>(
maybe_file: OwnedRwLockReadGuard<Option<F>>,
) -> Result<OwnedRwLockReadGuard<Option<F>, T>>
where
F: AsType<T>,
{
match OwnedRwLockReadGuard::try_map(maybe_file, |file| file.as_ref().expect("file").as_type()) {
Ok(file) => Ok(file),
Err(_) => Err(invalid_data(format!(
"invalid file type, expected {}",
std::any::type_name::<F>()
))),
}
}
#[inline]
fn write_type<F, T>(maybe_file: RwLockWriteGuard<Option<F>>) -> Result<RwLockMappedWriteGuard<T>>
where
F: AsType<T>,
{
match RwLockWriteGuard::try_map(maybe_file, |file| {
file.as_mut().expect("file").as_type_mut()
}) {
Ok(file) => Ok(file),
Err(_) => Err(invalid_data(format!(
"invalid file type, expected {}",
std::any::type_name::<F>()
))),
}
}
#[inline]
fn write_type_owned<F, T>(
maybe_file: OwnedRwLockWriteGuard<Option<F>>,
) -> Result<OwnedRwLockMappedWriteGuard<Option<F>, T>>
where
F: AsType<T>,
{
match OwnedRwLockWriteGuard::try_map(maybe_file, |file| {
file.as_mut().expect("file").as_type_mut()
}) {
Ok(file) => Ok(file),
Err(_) => Err(invalid_data(format!(
"invalid file type, expected {}",
std::any::type_name::<F>()
))),
}
}
async fn delete_file(path: &Path) -> Result<()> {
match fs::remove_file(path).await {
Ok(()) => Ok(()),
Err(cause) if cause.kind() == io::ErrorKind::NotFound => {
Ok(())
}
Err(cause) => Err(cause),
}
}
#[inline]
fn deleted() -> io::Error {
io::Error::new(io::ErrorKind::NotFound, "this file has been deleted")
}
#[inline]
fn invalid_data<E>(cause: E) -> io::Error
where
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
io::Error::new(io::ErrorKind::InvalidData, cause)
}
#[inline]
fn would_block<E>(cause: E) -> io::Error
where
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
io::Error::new(io::ErrorKind::WouldBlock, cause)
}