1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
//! Provides traits and data structures to define a distributed transaction context.
//!
//! This library is part of TinyChain: [http://github.com/haydnv/tinychain](http://github.com/haydnv/tinychain)
use async_hash::Output;
use async_trait::async_trait;
use destream::en;
use freqfs::DirLock;
use safecast::CastInto;
use tc_error::*;
use tc_value::{ToUrl, Value};
use tcgeneric::Id;
pub use id::{TxnId, MIN_ID};
use public::StateInstance;
pub mod fs;
mod id;
pub mod public;
/// Type aliases which specialize the lock, queue, and semaphore types of the `txn_lock` crate
/// to use [`TxnId`] as the transaction identifier.
pub mod lock {
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use super::TxnId;
// Re-export the semaphore permit types alongside the `Semaphore` alias defined below.
pub use txn_lock::semaphore::{PermitRead, PermitWrite};
/// A semaphore used to gate access to a transactional resource
///
/// Alias of `txn_lock::semaphore::Semaphore` keyed by [`TxnId`].
pub type Semaphore<C, R> = txn_lock::semaphore::Semaphore<TxnId, C, R>;
/// A transactional read-write lock on a scalar value
///
/// Alias of `txn_lock::scalar::TxnLock` keyed by [`TxnId`].
pub type TxnLock<T> = txn_lock::scalar::TxnLock<TxnId, T>;
/// A read guard on a committed transactional version
///
/// This is a plain [`Arc`] around the value type `T`.
pub type TxnLockVersionGuard<T> = Arc<T>;
/// A transactional message queue.
///
/// Alias of `txn_lock::queue::message::MessageQueue` keyed by [`TxnId`].
pub type TxnMessageQueue<M> = txn_lock::queue::message::MessageQueue<TxnId, M>;
/// A transactional read-write lock on a key-value map
///
/// Alias of `txn_lock::map::TxnMapLock` keyed by [`TxnId`].
pub type TxnMapLock<K, V> = txn_lock::map::TxnMapLock<TxnId, K, V>;
/// A read guard on a committed transactional version of a map
///
/// This is a plain [`HashMap`] from `K` to `V`.
pub type TxnMapLockVersionGuard<K, V> = HashMap<K, V>;
/// A transactional read-write lock on a set of values
///
/// Alias of `txn_lock::set::TxnSetLock` keyed by [`TxnId`].
pub type TxnSetLock<T> = txn_lock::set::TxnSetLock<TxnId, T>;
/// A read guard on a committed transactional version of a set
///
/// This is a plain [`HashSet`] of `T`.
pub type TxnSetLockVersionGuard<T> = HashSet<T>;
/// A transactional task queue.
///
/// Alias of `txn_lock::queue::task::TaskQueue` keyed by [`TxnId`], with input type `I`
/// and output type `O`.
pub type TxnTaskQueue<I, O> = txn_lock::queue::task::TaskQueue<TxnId, I, O>;
}
/// Defines a method to compute the hash of this state as of a given [`TxnId`]
///
/// The method is asynchronous (declared via `#[async_trait]`) and fallible.
#[async_trait]
pub trait AsyncHash {
/// Compute the hash of this state as of a given [`TxnId`]
///
/// Takes `self` by value, so the implementing type is consumed by the call.
///
/// Returns the SHA-256 [`Output`] wrapped in a [`TCResult`]; the conditions under which
/// an error is returned are defined by the implementor.
async fn hash(self, txn_id: TxnId) -> TCResult<Output<async_hash::Sha256>>;
}
/// Access a view which can be encoded with [`en::IntoStream`].
///
/// `'en` is the lifetime of the encoding, and `FE` is the type parameter passed through
/// to the supported [`Transaction`].
#[async_trait]
pub trait IntoView<'en, FE> {
/// The type of [`Transaction`] which this state supports
type Txn: Transaction<FE>;
/// The type of encodable view returned by `into_view`
///
/// Must implement [`en::IntoStream`] for `'en` and outlive `'en`.
type View: en::IntoStream<'en> + Sized + 'en;
/// Return a `View` which can be encoded with [`en::IntoStream`].
///
/// Consumes both `self` and the given `txn`. The result is a [`TCResult`]; the error
/// conditions are defined by the implementor.
async fn into_view(self, txn: Self::Txn) -> TCResult<Self::View>;
}
/// Transaction lifecycle callbacks
///
/// All three callbacks are asynchronous (declared via `#[async_trait]`), take `&self`,
/// and are infallible at the type level (none returns a `Result`).
#[async_trait]
pub trait Transact {
/// A guard which blocks concurrent commits
///
/// Must be `Send + Sync`.
type Commit: Send + Sync;
/// Commit this transaction.
///
/// Takes the [`TxnId`] by value and returns a [`Self::Commit`] guard.
async fn commit(&self, txn_id: TxnId) -> Self::Commit;
/// Roll back this transaction.
///
/// Takes the [`TxnId`] by reference, unlike [`Self::commit`].
async fn rollback(&self, txn_id: &TxnId);
/// Delete any version data specific to this transaction.
///
/// Currently takes the [`TxnId`] by reference (see the TODO below).
// TODO: this should take an owned TxnId
async fn finalize(&self, txn_id: &TxnId);
}
/// Common transaction context properties.
///
/// Implementors must be `Clone + Sized + Send + Sync + 'static`. The type parameter `FE`
/// is the parameter of the [`DirLock`] returned by [`Self::context`].
#[async_trait]
pub trait Transaction<FE>: Clone + Sized + Send + Sync + 'static {
/// The [`TxnId`] of this transaction context.
///
/// Returned by reference; this accessor is synchronous.
fn id(&self) -> &TxnId;
/// Allows locking the filesystem directory of this transaction context,
/// e.g. to cache un-committed state or to compute an intermediate result.
///
/// Returns a [`DirLock`] wrapped in a [`TCResult`]; the error conditions are defined
/// by the implementor.
async fn context(&self) -> TCResult<DirLock<FE>>;
/// Create a new transaction context with the given `id`.
///
/// `id` may be any `Send` type which converts into an [`Id`]. The new context is
/// returned by value and `self` is only borrowed.
fn subcontext<I: Into<Id> + Send>(&self, id: I) -> Self;
/// Create a new transaction subcontext with its own unique directory.
///
/// Takes no identifier; the new context is returned by value and `self` is only borrowed.
fn subcontext_unique(&self) -> Self;
}
/// A transactional remote procedure call client
///
/// An `RPCClient` is itself a [`Transaction`] (over `State::FE`), and `State` must be a
/// [`StateInstance`] whose associated `Txn` type is this client type.
///
/// In every method, `link` is any `Send` value which converts into a [`ToUrl`] borrowing
/// for the same lifetime `'a` as `&self`.
#[async_trait]
pub trait RPCClient<State: StateInstance<Txn = Self>>: Transaction<State::FE> {
/// Resolve a GET op within this transaction context.
///
/// `key` is cast into a [`Value`]. Returns the resulting `State`, or an error.
async fn get<'a, L, V>(&'a self, link: L, key: V) -> TCResult<State>
where
L: Into<ToUrl<'a>> + Send,
V: CastInto<Value> + Send;
/// Resolve a PUT op within this transaction context.
///
/// `key` is cast into a [`Value`] and `value` is cast into a `State`.
/// Returns `()` on success, or an error.
async fn put<'a, L, K, V>(&'a self, link: L, key: K, value: V) -> TCResult<()>
where
L: Into<ToUrl<'a>> + Send,
K: CastInto<Value> + Send,
V: CastInto<State> + Send;
/// Resolve a POST op within this transaction context.
///
/// `params` is cast into a `State`. Returns the resulting `State`, or an error.
async fn post<'a, L, P>(&'a self, link: L, params: P) -> TCResult<State>
where
L: Into<ToUrl<'a>> + Send,
P: CastInto<State> + Send;
/// Resolve a DELETE op within this transaction context.
///
/// `key` is cast into a [`Value`]. Returns `()` on success, or an error.
async fn delete<'a, L, V>(&'a self, link: L, key: V) -> TCResult<()>
where
L: Into<ToUrl<'a>> + Send,
V: CastInto<Value> + Send;
}