use async_trait::async_trait;
use destream::en;
use freqfs::DirLock;
use safecast::CastInto;
use tc_error::*;
use tc_value::{Link, ToUrl, Value};
use tcgeneric::{Id, Map};
use public::StateInstance;
pub mod fs;
mod id;
pub mod public;
pub use id::{TxnId, MIN_ID};
pub mod hash {
use async_trait::async_trait;
use futures::future::TryFutureExt;
use futures::stream::{FuturesOrdered, TryStreamExt};
use tc_error::TCResult;
use tcgeneric::{Map, Tuple};
use super::TxnId;
pub use async_hash::generic_array::GenericArray;
pub use async_hash::*;
#[async_trait]
pub trait AsyncHash: Send + Sync {
async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>>;
}
#[async_trait]
impl<T: AsyncHash> AsyncHash for Map<T> {
async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
if self.is_empty() {
return Ok(default_hash::<Sha256>());
}
let mut entries: FuturesOrdered<_> = self
.into_iter()
.map(|(key, value)| {
value.hash(txn_id).map_ok(move |value_hash| {
let mut hasher = Sha256::default();
hasher.update(Hash::<Sha256>::hash(key));
hasher.update(value_hash);
hasher.finalize()
})
})
.collect();
let mut hasher = Sha256::default();
while let Some(hash) = entries.try_next().await? {
hasher.update(hash);
}
Ok(hasher.finalize())
}
}
#[async_trait]
impl<T: AsyncHash> AsyncHash for Tuple<T> {
async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
if self.is_empty() {
return Ok(default_hash::<Sha256>());
}
let mut items: FuturesOrdered<_> =
self.into_iter().map(|item| item.hash(txn_id)).collect();
let mut hasher = Sha256::default();
while let Some(hash) = items.try_next().await? {
hasher.update(hash);
}
Ok(hasher.finalize())
}
}
}
pub mod lock {
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use super::TxnId;
pub use txn_lock::scalar::{TxnLockReadGuard, TxnLockWriteGuard};
pub use txn_lock::semaphore::{PermitRead, PermitWrite};
pub type Semaphore<C, R> = txn_lock::semaphore::Semaphore<TxnId, C, R>;
pub type TxnLock<T> = txn_lock::scalar::TxnLock<TxnId, T>;
pub type TxnLockVersionGuard<T> = Arc<T>;
pub type TxnMessageQueue<M> = txn_lock::queue::message::MessageQueue<TxnId, M>;
pub type TxnMapLock<K, V> = txn_lock::map::TxnMapLock<TxnId, K, V>;
pub type TxnMapLockEntry<K, V> = txn_lock::map::Entry<TxnId, K, V>;
pub type TxnMapLockIter<K, V> = txn_lock::map::Iter<TxnId, K, V>;
pub type TxnMapLockVersionGuard<K, V> = HashMap<K, V>;
pub type TxnSetLock<T> = txn_lock::set::TxnSetLock<TxnId, T>;
pub type TxnSetLockIter<T> = txn_lock::set::Iter<T>;
pub type TxnSetLockVersion<T> = HashSet<T>;
pub type TxnTaskQueue<I, O> = txn_lock::queue::task::TaskQueue<TxnId, I, O>;
}
#[async_trait]
pub trait IntoView<'en, FE> {
type Txn: Transaction<FE>;
type View: en::IntoStream<'en> + Sized + 'en;
async fn into_view(self, txn: Self::Txn) -> TCResult<Self::View>;
}
#[async_trait]
pub trait Transact {
type Commit: Send + Sync;
async fn commit(&self, txn_id: TxnId) -> Self::Commit;
async fn rollback(&self, txn_id: &TxnId);
async fn finalize(&self, txn_id: &TxnId);
}
#[async_trait]
pub trait Transaction<FE>: Clone + Sized + Send + Sync + 'static {
fn id(&self) -> &TxnId;
async fn context(&self) -> TCResult<DirLock<FE>>;
fn subcontext<I: Into<Id> + Send>(&self, id: I) -> Self;
fn subcontext_unique(&self) -> Self;
}
#[async_trait]
pub trait Gateway<State: StateInstance<Txn = Self>>: Transaction<State::FE> {
async fn get<'a, L, V>(&'a self, link: L, key: V) -> TCResult<State>
where
L: Into<ToUrl<'a>> + Send,
V: CastInto<Value> + Send;
async fn put<'a, L, K, V>(&'a self, link: L, key: K, value: V) -> TCResult<()>
where
L: Into<ToUrl<'a>> + Send,
K: CastInto<Value> + Send,
V: CastInto<State> + Send;
async fn post<'a, L, P>(&'a self, link: L, params: P) -> TCResult<State>
where
L: Into<ToUrl<'a>> + Send,
P: CastInto<Map<State>> + Send;
async fn delete<'a, L, V>(&'a self, link: L, key: V) -> TCResult<()>
where
L: Into<ToUrl<'a>> + Send,
V: CastInto<Value> + Send;
}
#[async_trait]
pub trait Replicate<Txn>: Send + Sync {
async fn replicate(&self, txn: &Txn, source: Link) -> TCResult<hash::Output<hash::Sha256>>;
}