tc_transact/
lib.rs

1//! Provides traits and data structures to define a distributed transaction context.
2//!
3//! This library is part of TinyChain: [http://github.com/haydnv/tinychain](http://github.com/haydnv/tinychain)
4
5use async_trait::async_trait;
6use destream::en;
7use freqfs::DirLock;
8use safecast::CastInto;
9
10use tc_error::*;
11use tc_value::{Link, ToUrl, Value};
12use tcgeneric::{Id, Map};
13
14use public::StateInstance;
15
16pub mod fs;
17mod id;
18pub mod public;
19
20pub use id::{TxnId, MIN_ID};
21
22pub mod hash {
23    use async_trait::async_trait;
24    use futures::future::TryFutureExt;
25    use futures::stream::{FuturesOrdered, TryStreamExt};
26
27    use tc_error::TCResult;
28    use tcgeneric::{Map, Tuple};
29
30    use super::TxnId;
31
32    pub use async_hash::generic_array::GenericArray;
33    pub use async_hash::*;
34
35    /// Defines a method to compute the hash of this state as of a given [`TxnId`]
36    #[async_trait]
37    pub trait AsyncHash: Send + Sync {
38        /// Compute the hash of this state as of a given [`TxnId`]
39        async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>>;
40    }
41
42    #[async_trait]
43    impl<T: AsyncHash> AsyncHash for Map<T> {
44        async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
45            if self.is_empty() {
46                return Ok(default_hash::<Sha256>());
47            }
48
49            let mut entries: FuturesOrdered<_> = self
50                .into_iter()
51                .map(|(key, value)| {
52                    value.hash(txn_id).map_ok(move |value_hash| {
53                        let mut hasher = Sha256::default();
54                        hasher.update(Hash::<Sha256>::hash(key));
55                        hasher.update(value_hash);
56                        hasher.finalize()
57                    })
58                })
59                .collect();
60
61            let mut hasher = Sha256::default();
62            while let Some(hash) = entries.try_next().await? {
63                hasher.update(hash);
64            }
65            Ok(hasher.finalize())
66        }
67    }
68
69    #[async_trait]
70    impl<T: AsyncHash> AsyncHash for Tuple<T> {
71        async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
72            if self.is_empty() {
73                return Ok(default_hash::<Sha256>());
74            }
75
76            let mut items: FuturesOrdered<_> =
77                self.into_iter().map(|item| item.hash(txn_id)).collect();
78
79            let mut hasher = Sha256::default();
80            while let Some(hash) = items.try_next().await? {
81                hasher.update(hash);
82            }
83            Ok(hasher.finalize())
84        }
85    }
86}
87
88pub mod lock {
89    use std::collections::{HashMap, HashSet};
90    use std::sync::Arc;
91
92    use super::TxnId;
93
94    pub use txn_lock::scalar::{TxnLockReadGuard, TxnLockWriteGuard};
95    pub use txn_lock::semaphore::{PermitRead, PermitWrite};
96
97    /// A semaphore used to gate access to a transactional resource
98    pub type Semaphore<C, R> = txn_lock::semaphore::Semaphore<TxnId, C, R>;
99
100    /// A transactional read-write lock on a scalar value
101    pub type TxnLock<T> = txn_lock::scalar::TxnLock<TxnId, T>;
102
103    /// A read guard on a committed transactional version
104    pub type TxnLockVersionGuard<T> = Arc<T>;
105
106    /// A transactional message queue.
107    pub type TxnMessageQueue<M> = txn_lock::queue::message::MessageQueue<TxnId, M>;
108
109    /// A transactional read-write lock on a key-value map
110    pub type TxnMapLock<K, V> = txn_lock::map::TxnMapLock<TxnId, K, V>;
111
112    /// An entry in a transactional key-value map
113    pub type TxnMapLockEntry<K, V> = txn_lock::map::Entry<TxnId, K, V>;
114
115    /// An iterator over the contents of a transactional key-value map
116    pub type TxnMapLockIter<K, V> = txn_lock::map::Iter<TxnId, K, V>;
117
118    /// A read guard on a committed transactional version of a set
119    pub type TxnMapLockVersionGuard<K, V> = HashMap<K, V>;
120
121    /// A transactional read-write lock on a set of values
122    pub type TxnSetLock<T> = txn_lock::set::TxnSetLock<TxnId, T>;
123
124    /// A read guard on a version of a set of values
125    pub type TxnSetLockIter<T> = txn_lock::set::Iter<T>;
126
127    /// A read guard on a committed transactional version of a set
128    pub type TxnSetLockVersion<T> = HashSet<T>;
129
130    /// A transactional task queue.
131    pub type TxnTaskQueue<I, O> = txn_lock::queue::task::TaskQueue<TxnId, I, O>;
132}
133
134/// Access a view which can be encoded with [`en::IntoStream`].
135#[async_trait]
136pub trait IntoView<'en, FE> {
137    /// The type of [`Transaction`] which this state supports
138    type Txn: Transaction<FE>;
139
140    /// The type of encodable view returned by `into_view`
141    type View: en::IntoStream<'en> + Sized + 'en;
142
143    /// Return a `View` which can be encoded with [`en::IntoStream`].
144    async fn into_view(self, txn: Self::Txn) -> TCResult<Self::View>;
145}
146
147/// Transaction lifecycle callbacks
148#[async_trait]
149pub trait Transact {
150    /// A guard which blocks concurrent commits
151    type Commit: Send + Sync;
152
153    /// Commit this transaction.
154    async fn commit(&self, txn_id: TxnId) -> Self::Commit;
155
156    /// Roll back this transaction.
157    async fn rollback(&self, txn_id: &TxnId);
158
159    /// Delete any version data specific to this transaction.
160    // TODO: this should take an owned TxnId
161    async fn finalize(&self, txn_id: &TxnId);
162}
163
164/// Common transaction context properties.
165#[async_trait]
166pub trait Transaction<FE>: Clone + Sized + Send + Sync + 'static {
167    /// The [`TxnId`] of this transaction context.
168    fn id(&self) -> &TxnId;
169
170    /// Allows locking the filesystem directory of this transaction context,
171    /// e.g. to cache un-committed state or to compute an intermediate result.
172    async fn context(&self) -> TCResult<DirLock<FE>>;
173
174    /// Create a new transaction context with the given `id`.
175    fn subcontext<I: Into<Id> + Send>(&self, id: I) -> Self;
176
177    /// Create a new transaction subcontext with its own unique workspace directory.
178    fn subcontext_unique(&self) -> Self;
179}
180
181/// A transactional remote procedure call client
182#[async_trait]
183pub trait Gateway<State: StateInstance<Txn = Self>>: Transaction<State::FE> {
184    /// Resolve a GET op within this transaction context.
185    async fn get<'a, L, V>(&'a self, link: L, key: V) -> TCResult<State>
186    where
187        L: Into<ToUrl<'a>> + Send,
188        V: CastInto<Value> + Send;
189
190    /// Resolve a PUT op within this transaction context.
191    async fn put<'a, L, K, V>(&'a self, link: L, key: K, value: V) -> TCResult<()>
192    where
193        L: Into<ToUrl<'a>> + Send,
194        K: CastInto<Value> + Send,
195        V: CastInto<State> + Send;
196
197    /// Resolve a POST op within this transaction context.
198    async fn post<'a, L, P>(&'a self, link: L, params: P) -> TCResult<State>
199    where
200        L: Into<ToUrl<'a>> + Send,
201        P: CastInto<Map<State>> + Send;
202
203    /// Resolve a DELETE op within this transaction context.
204    async fn delete<'a, L, V>(&'a self, link: L, key: V) -> TCResult<()>
205    where
206        L: Into<ToUrl<'a>> + Send,
207        V: CastInto<Value> + Send;
208}
209
210/// Trait to define synchronization of a mutable state
211#[async_trait]
212pub trait Replicate<Txn>: Send + Sync {
213    /// Update the state of this replica from the given `source`.
214    async fn replicate(&self, txn: &Txn, source: Link) -> TCResult<hash::Output<hash::Sha256>>;
215}