use std::fmt;
use std::marker::PhantomData;
use async_trait::async_trait;
use futures::future::TryFutureExt;
use futures::stream::{self, Stream, StreamExt, TryStreamExt};
use get_size::GetSize;
use safecast::AsType;
use tc_error::*;
use tcgeneric::{Id, ThreadSafe};
use super::{TCResult, Transact, Transaction, TxnId};
pub use freqfs::{FileLoad, FileSave};
pub use txfs::{Key, VERSIONS};
/// Alias for `freqfs::DirLock<FE>`, the type returned by `Dir::into_inner` and `File::into_inner`.
pub type Inner<FE> = freqfs::DirLock<FE>;
/// Alias for `txfs::FileVersionRead` keyed by [`TxnId`]; returned by `File::read_block` and
/// yielded by `File::iter`.
pub type BlockRead<FE, B> = txfs::FileVersionRead<TxnId, FE, B>;
/// Alias for `txfs::FileVersionWrite` keyed by [`TxnId`]; returned by `File::create_block` and
/// `File::write_block`.
pub type BlockWrite<FE, B> = txfs::FileVersionWrite<TxnId, FE, B>;
/// An entry in a transactional [`Dir`]: either a sub-directory or a [`File`] of blocks of type `B`.
pub enum DirEntry<FE, B> {
/// A nested transactional directory.
Dir(Dir<FE>),
/// A transactional file whose blocks have type `B`.
File(File<FE, B>),
}
impl<FE, B> DirEntry<FE, B> {
    /// Return `true` if this entry is the [`DirEntry::Dir`] variant.
    pub fn is_dir(&self) -> bool {
        matches!(self, Self::Dir(_))
    }

    /// Return `true` if this entry is the [`DirEntry::File`] variant.
    pub fn is_file(&self) -> bool {
        matches!(self, Self::File(_))
    }
}
impl<FE, B> fmt::Debug for DirEntry<FE, B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Dir(dir) => dir.fmt(f),
Self::File(file) => file.fmt(f),
}
}
}
/// A transactional directory: a thin wrapper around a `txfs::Dir` keyed by [`TxnId`].
pub struct Dir<FE> {
// The wrapped `txfs::Dir` to which the methods of this type delegate.
inner: txfs::Dir<TxnId, FE>,
}
impl<FE> Clone for Dir<FE> {
    /// Clone this [`Dir`] by cloning the wrapped `txfs::Dir`.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        Self { inner }
    }
}
impl<FE: ThreadSafe + Clone> Dir<FE> {
    /// Load a transactional [`Dir`] from the filesystem directory `canon` at `txn_id`.
    ///
    /// Delegates to `txfs::Dir::load`; an error from that call is converted into a [`TCError`].
    pub async fn load(txn_id: TxnId, canon: freqfs::DirLock<FE>) -> TCResult<Self> {
        let inner = txfs::Dir::load(txn_id, canon).await?;
        Ok(Self { inner })
    }

    /// Consume this [`Dir`] and return the result of `into_inner` on the wrapped `txfs::Dir`.
    pub fn into_inner(self) -> Inner<FE> {
        self.inner.into_inner()
    }

    /// Check whether this [`Dir`] has an entry called `name` at `txn_id`.
    pub async fn contains(&self, txn_id: TxnId, name: &Id) -> TCResult<bool> {
        let lookup = self.inner.contains(txn_id, name);
        lookup.map_err(TCError::from).await
    }

    /// Create a new sub-directory called `name` at `txn_id` and return it.
    pub async fn create_dir(&self, txn_id: TxnId, name: Id) -> TCResult<Self> {
        let inner = self.inner.create_dir(txn_id, name.into()).await?;
        Ok(Self { inner })
    }

    /// Create a new [`File`] called `name` at `txn_id` and return it.
    ///
    /// The file is backed by a new sub-directory of the wrapped `txfs::Dir`.
    pub async fn create_file<B>(&self, txn_id: TxnId, name: Id) -> TCResult<File<FE, B>>
    where
        B: GetSize + Clone,
        FE: AsType<B>,
    {
        let blocks = self.inner.create_dir(txn_id, name.into()).await?;
        Ok(File::new(blocks))
    }

    /// Return an iterator over the names reported by `dir_names` on the wrapped `txfs::Dir`
    /// at `txn_id`.
    pub async fn entry_names(&self, txn_id: TxnId) -> TCResult<impl Iterator<Item = Key>> {
        let names = self.inner.dir_names(txn_id).await?;
        Ok(names)
    }

    /// Iterate over the entries of this [`Dir`] at `txn_id`, treating each one as a [`File`].
    ///
    /// # Panics
    ///
    /// The returned iterator panics when it reaches an entry which is not a directory.
    pub async fn files<B>(
        &self,
        txn_id: TxnId,
    ) -> TCResult<impl Iterator<Item = (Key, File<FE, B>)>> {
        let entries = self.inner.iter(txn_id).await?;

        let files = entries.map(|(name, entry)| {
            let blocks = match &*entry {
                txfs::DirEntry::Dir(blocks_dir) => blocks_dir.clone(),
                other => panic!("not a block directory: {:?}", other),
            };

            (name, File::new(blocks))
        });

        Ok(files)
    }

    /// Return the sub-directory called `name` at `txn_id`, or a "not found" error if there is none.
    pub async fn get_dir(&self, txn_id: TxnId, name: &Id) -> TCResult<Self> {
        match self.inner.get_dir(txn_id, name).await? {
            Some(dir) => Ok(Self { inner: dir.clone() }),
            None => Err(TCError::not_found(name)),
        }
    }

    /// Return the sub-directory called `name` at `txn_id`, creating it if there is none.
    pub async fn get_or_create_dir(&self, txn_id: TxnId, name: Id) -> TCResult<Self> {
        if let Some(existing) = self.inner.get_dir(txn_id, &name).await? {
            return Ok(Self {
                inner: existing.clone(),
            });
        }

        self.create_dir(txn_id, name).await
    }

    /// Return the [`File`] called `name` at `txn_id`, or a "not found" error if there is none.
    pub async fn get_file<B>(&self, txn_id: TxnId, name: &Id) -> TCResult<File<FE, B>>
    where
        B: GetSize + Clone,
        FE: AsType<B>,
    {
        match self.inner.get_dir(txn_id, name).await? {
            Some(blocks) => Ok(File::new(blocks.clone())),
            None => Err(TCError::not_found(name)),
        }
    }

    /// Check whether this [`Dir`] is empty at `txn_id`.
    pub async fn is_empty(&self, txn_id: TxnId) -> TCResult<bool> {
        let check = self.inner.is_empty(txn_id);
        check.map_err(TCError::from).await
    }

    /// Construct a [`Stream`] of the named [`DirEntry`]s in this [`Dir`] at `txn_id`.
    ///
    /// A sub-directory for which `contains_files` is true is reported as a [`DirEntry::File`];
    /// any other non-empty sub-directory is reported as a [`DirEntry::Dir`].
    ///
    /// # Panics
    ///
    /// The stream panics when it reaches an empty sub-directory (which cannot be classified),
    /// or an entry of the wrapped `txfs::Dir` which is itself a block rather than a directory.
    pub async fn entries<B>(
        &self,
        txn_id: TxnId,
    ) -> TCResult<impl Stream<Item = TCResult<(Key, DirEntry<FE, B>)>> + Unpin + Send + '_> {
        let listing = self.inner.iter(txn_id).await?;

        let classified = stream::iter(listing).then(move |(name, entry)| async move {
            let resolved = match &*entry {
                txfs::DirEntry::File(block) => panic!(
                    "a transactional Dir should never contain blocks: {:?}",
                    block
                ),
                txfs::DirEntry::Dir(dir) => {
                    if dir.is_empty(txn_id).await? {
                        panic!("an empty filesystem directory is ambiguous");
                    }

                    if dir.contains_files(txn_id).await? {
                        DirEntry::File(File::new(dir.clone()))
                    } else {
                        DirEntry::Dir(Self { inner: dir.clone() })
                    }
                }
            };

            Ok((name, resolved))
        });

        Ok(Box::pin(classified))
    }

    /// Delete every entry of this [`Dir`] which is an empty sub-directory at `txn_id`.
    pub async fn trim(&self, txn_id: TxnId) -> TCResult<()> {
        // Collect the names first, then delete once the listing has been fully traversed.
        let mut empty_dirs = Vec::new();

        let listing = self.inner.iter(txn_id).await?;
        for (name, entry) in listing {
            let is_empty_dir = match &*entry {
                txfs::DirEntry::Dir(dir) => dir.is_empty(txn_id).await?,
                _ => false,
            };

            if is_empty_dir {
                empty_dirs.push(name);
            }
        }

        for name in empty_dirs {
            self.inner.delete(txn_id, (*name).clone()).await?;
        }

        Ok(())
    }
}
impl<FE: ThreadSafe + Clone + for<'a> FileSave<'a>> Dir<FE> {
    /// Commit this [`Dir`] at `txn_id` by delegating to the wrapped `txfs::Dir`.
    ///
    /// The `recursive` flag is forwarded unchanged.
    pub async fn commit(&self, txn_id: TxnId, recursive: bool) {
        let Self { inner } = self;
        inner.commit(txn_id, recursive).await
    }

    /// Roll back this [`Dir`] at `txn_id` by delegating to the wrapped `txfs::Dir`.
    ///
    /// The `recursive` flag is forwarded unchanged.
    pub async fn rollback(&self, txn_id: TxnId, recursive: bool) {
        let Self { inner } = self;
        inner.rollback(txn_id, recursive).await
    }

    /// Finalize this [`Dir`] at `txn_id` by delegating to the wrapped `txfs::Dir`.
    pub async fn finalize(&self, txn_id: TxnId) {
        let Self { inner } = self;
        inner.finalize(txn_id).await
    }
}
impl<FE> fmt::Debug for Dir<FE> {
    /// Format this [`Dir`] using the `Debug` implementation of the wrapped `txfs::Dir`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(&self.inner, f)
    }
}
/// A transactional file, stored as a `txfs::Dir` (keyed by [`TxnId`]) of blocks of type `B`.
pub struct File<FE, B> {
// The wrapped `txfs::Dir` which holds this file's blocks.
inner: txfs::Dir<TxnId, FE>,
// Records the block type `B` without storing a value of that type.
block: PhantomData<B>,
}
impl<FE, B> Clone for File<FE, B> {
fn clone(&self) -> Self {
Self::new(self.inner.clone())
}
}
impl<FE, B> File<FE, B> {
    /// Wrap the block directory `inner` as a [`File`] with blocks of type `B`.
    fn new(inner: txfs::Dir<TxnId, FE>) -> Self {
        let block = PhantomData;
        Self { inner, block }
    }

    /// Consume this [`File`] and return the result of `into_inner` on the wrapped `txfs::Dir`.
    pub fn into_inner(self) -> Inner<FE>
    where
        FE: Send + Sync,
    {
        let blocks = self.inner;
        blocks.into_inner()
    }
}
impl<FE, B> File<FE, B>
where
    FE: for<'a> FileSave<'a> + AsType<B> + Clone + Send + Sync,
    B: FileLoad + GetSize + Clone,
{
    /// Return an iterator over the names of the blocks in this [`File`] at `txn_id`.
    pub async fn block_ids(&self, txn_id: TxnId) -> TCResult<impl Iterator<Item = Key>> {
        let names = self.inner.file_names(txn_id).await?;
        Ok(names)
    }

    /// Create a new block called `name` with the given `contents` at `txn_id`,
    /// and return a write handle on it.
    pub async fn create_block(
        &self,
        txn_id: TxnId,
        name: Id,
        contents: B,
    ) -> TCResult<BlockWrite<FE, B>> {
        let created = self
            .inner
            .create_file(txn_id, name.into(), contents)
            .await?;

        let write_handle = created.into_write(txn_id).await?;
        Ok(write_handle)
    }

    /// Delete the block called `name` at `txn_id`.
    ///
    /// Returns the `bool` reported by `delete` on the wrapped `txfs::Dir`
    /// (presumably whether a block was present; confirm against `txfs`).
    pub async fn delete_block(&self, txn_id: TxnId, name: Id) -> TCResult<bool> {
        let deleted = self.inner.delete(txn_id, name).await?;
        Ok(deleted)
    }

    /// Construct a [`Stream`] of the named blocks in this [`File`] at `txn_id`, as read handles.
    pub async fn iter(
        &self,
        txn_id: TxnId,
    ) -> TCResult<impl Stream<Item = TCResult<(Key, BlockRead<FE, B>)>> + Send + Unpin + '_> {
        let blocks = self.inner.files(txn_id).map_err(TCError::from).await?;
        Ok(blocks.map_err(TCError::from))
    }

    /// Return a read handle on the block called `name` at `txn_id`.
    pub async fn read_block(&self, txn_id: TxnId, name: &Id) -> TCResult<BlockRead<FE, B>> {
        let reading = self.inner.read_file(txn_id, name);
        reading.map_err(TCError::from).await
    }

    /// Return a write handle on the block called `name` at `txn_id`.
    pub async fn write_block(&self, txn_id: TxnId, name: &Id) -> TCResult<BlockWrite<FE, B>> {
        let writing = self.inner.write_file(txn_id, name);
        writing.map_err(TCError::from).await
    }
}
#[async_trait]
impl<FE, B> Transact for File<FE, B>
where
    FE: for<'a> FileSave<'a> + AsType<B> + Clone + Send + Sync,
    B: FileLoad + GetSize + Clone,
    Self: Send + Sync,
{
    type Commit = ();

    /// Commit the wrapped block directory at `txn_id`, always recursively.
    async fn commit(&self, txn_id: TxnId) -> Self::Commit {
        let recursive = true;
        self.inner.commit(txn_id, recursive).await
    }

    /// Roll back the wrapped block directory at `txn_id`, always recursively.
    async fn rollback(&self, txn_id: &TxnId) {
        let recursive = true;
        self.inner.rollback(*txn_id, recursive).await
    }

    /// Finalize the wrapped block directory at `txn_id`.
    async fn finalize(&self, txn_id: &TxnId) {
        let txn_id = *txn_id;
        self.inner.finalize(txn_id).await
    }
}
impl<FE, B> fmt::Debug for File<FE, B> {
    /// Describe this [`File`] by the name of its block type `B`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let block_type = std::any::type_name::<B>();
        write!(f, "a transactional File with blocks of type {}", block_type)
    }
}
/// Defines how a type is created in, and loaded from, a transactional [`Dir`].
#[async_trait]
pub trait Persist<FE: ThreadSafe + Clone>: Sized {
    /// The transaction type associated with this persistent type.
    type Txn: Transaction<FE>;

    /// The schema value passed to [`Persist::create`] and [`Persist::load`].
    type Schema: Clone + Send + Sync;

    /// Create a new instance from `schema`, backed by `store`, at `txn_id`.
    ///
    /// The exact semantics are defined by the implementor.
    async fn create(txn_id: TxnId, schema: Self::Schema, store: Dir<FE>) -> TCResult<Self>;

    /// Load an existing instance with the given `schema` from `store` at `txn_id`.
    ///
    /// The exact semantics are defined by the implementor.
    async fn load(txn_id: TxnId, schema: Self::Schema, store: Dir<FE>) -> TCResult<Self>;

    /// Call [`Persist::create`] if `store` is empty at `txn_id`, otherwise call [`Persist::load`].
    async fn load_or_create(txn_id: TxnId, schema: Self::Schema, store: Dir<FE>) -> TCResult<Self> {
        if store.is_empty(txn_id).await? {
            return Self::create(txn_id, schema, store).await;
        }

        Self::load(txn_id, schema, store).await
    }

    /// Return an [`Inner`] directory for this instance
    /// (presumably the directory which backs it; confirm against implementors).
    fn dir(&self) -> Inner<FE>;
}
/// Extends [`Persist`] with construction from an existing `instance` of type `I`.
#[async_trait]
pub trait CopyFrom<FE: ThreadSafe + Clone, I>: Persist<FE> {
/// Build a new `Self` in `store` from `instance`, in the context of the transaction `txn`.
///
/// NOTE(review): presumably copies the contents of `instance` into `store`; the exact
/// semantics are defined by the implementor.
async fn copy_from(
txn: &<Self as Persist<FE>>::Txn,
store: Dir<FE>,
instance: I,
) -> TCResult<Self>;
}
/// Extends [`Persist`] with the ability to restore state from another instance of the same type.
#[async_trait]
pub trait Restore<FE: ThreadSafe + Clone>: Persist<FE> {
/// Restore this instance from `backup` at `txn_id`.
///
/// NOTE(review): presumably overwrites the state of `self` with that of `backup`; the exact
/// semantics are defined by the implementor.
async fn restore(&self, txn_id: TxnId, backup: &Self) -> TCResult<()>;
}