tc_chain/
lib.rs

1//! A [`Chain`] responsible for recovering a [`State`] from a failed transaction.
2
3use std::fmt;
4use std::marker::PhantomData;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use bytes::Bytes;
9use destream::{de, en};
10use freqfs::FileSave;
11use futures::future::TryFutureExt;
12use safecast::{AsType, TryCastFrom};
13
14use tc_collection::{Collection, CollectionBase, CollectionBlock};
15use tc_error::*;
16use tc_scalar::Scalar;
17use tc_transact::hash::{AsyncHash, GenericArray, Output, Sha256};
18use tc_transact::lock::TxnTaskQueue;
19use tc_transact::public::{Route, StateInstance};
20use tc_transact::{fs, Replicate};
21use tc_transact::{IntoView, Transact, Transaction, TxnId};
22use tc_value::{Link, Value};
23use tcgeneric::*;
24
25use data::{MutationPending, MutationRecord};
26
27pub use block::BlockChain;
28pub use data::ChainBlock;
29pub use sync::SyncChain;
30
31mod block;
32mod data;
33mod public;
34mod sync;
35
36pub const CHAIN: Label = label("chain");
37pub const HISTORY: Label = label(".history");
38
39const BLOCK_SIZE: usize = 1_000_000; // TODO: reduce to 4,096
40const PREFIX: PathLabel = path_label(&["state", "chain"]);
41
42/// A block in a file managed by a [`Chain`]
43pub trait CacheBlock: AsType<ChainBlock> + CollectionBlock {}
44
45impl<FE> CacheBlock for FE where FE: AsType<ChainBlock> + CollectionBlock {}
46
47/// Defines a method to recover the state of this [`Chain`] from a transaction failure.
48#[async_trait]
49pub trait Recover<FE> {
50    type Txn: Transaction<FE>;
51
52    /// Recover this state after loading, in case the last transaction failed or was interrupted.
53    async fn recover(&self, txn: &Self::Txn) -> TCResult<()>;
54}
55
56/// Methods common to any type of [`Chain`].
57pub trait ChainInstance<State: StateInstance, T> {
58    /// Append the given DELETE op to the latest block in this `Chain`.
59    fn append_delete(&self, txn_id: TxnId, key: Value) -> TCResult<()>;
60
61    /// Append the given PUT op to the latest block in this `Chain`.
62    fn append_put(&self, txn: State::Txn, key: Value, value: State) -> TCResult<()>;
63
64    /// Borrow the subject of this [`Chain`].
65    fn subject(&self) -> &T;
66}
67
68/// The type of a [`Chain`].
69#[derive(Clone, Copy, Eq, PartialEq)]
70pub enum ChainType {
71    Block,
72    Sync,
73}
74
75impl Default for ChainType {
76    fn default() -> Self {
77        Self::Sync
78    }
79}
80
81impl Class for ChainType {}
82
83impl NativeClass for ChainType {
84    fn from_path(path: &[PathSegment]) -> Option<Self> {
85        if path.len() == 3 && &path[0..2] == &PREFIX[..] {
86            match path[2].as_str() {
87                "block" => Some(Self::Block),
88                "sync" => Some(Self::Sync),
89                _ => None,
90            }
91        } else {
92            None
93        }
94    }
95
96    fn path(&self) -> TCPathBuf {
97        let suffix = match self {
98            Self::Block => "block",
99            Self::Sync => "sync",
100        };
101
102        TCPathBuf::from(PREFIX).append(label(suffix))
103    }
104}
105
106impl fmt::Debug for ChainType {
107    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
108        f.write_str(match self {
109            Self::Block => "type BlockChain",
110            Self::Sync => "type SyncChain",
111        })
112    }
113}
114
115/// A data structure responsible for maintaining the integrity of a mutable subject.
116pub enum Chain<State, Txn, FE, T> {
117    Block(block::BlockChain<State, Txn, FE, T>),
118    Sync(sync::SyncChain<State, Txn, FE, T>),
119}
120
121impl<State, Txn, FE, T> Clone for Chain<State, Txn, FE, T>
122where
123    T: Clone,
124{
125    fn clone(&self) -> Self {
126        match self {
127            Self::Block(chain) => Self::Block(chain.clone()),
128            Self::Sync(chain) => Self::Sync(chain.clone()),
129        }
130    }
131}
132
133impl<State, Txn, FE, T> Instance for Chain<State, Txn, FE, T>
134where
135    State: Send + Sync,
136    Txn: Send + Sync,
137    FE: Send + Sync,
138    T: Send + Sync,
139{
140    type Class = ChainType;
141
142    fn class(&self) -> Self::Class {
143        match self {
144            Self::Block(_) => ChainType::Block,
145            Self::Sync(_) => ChainType::Sync,
146        }
147    }
148}
149
150impl<State, T> ChainInstance<State, T> for Chain<State, State::Txn, State::FE, T>
151where
152    State: StateInstance,
153    State::FE: for<'a> FileSave<'a> + CacheBlock,
154    T: fs::Persist<State::FE, Txn = State::Txn> + Route<State> + fmt::Debug,
155    Collection<State::Txn, State::FE>: TryCastFrom<State>,
156    Scalar: TryCastFrom<State>,
157{
158    fn append_delete(&self, txn_id: TxnId, key: Value) -> TCResult<()> {
159        match self {
160            Self::Block(chain) => chain.append_delete(txn_id, key),
161            Self::Sync(chain) => chain.append_delete(txn_id, key),
162        }
163    }
164
165    fn append_put(&self, txn: State::Txn, key: Value, value: State) -> TCResult<()> {
166        match self {
167            Self::Block(chain) => chain.append_put(txn, key, value),
168            Self::Sync(chain) => chain.append_put(txn, key, value),
169        }
170    }
171
172    fn subject(&self) -> &T {
173        match self {
174            Self::Block(chain) => chain.subject(),
175            Self::Sync(chain) => chain.subject(),
176        }
177    }
178}
179
180#[async_trait]
181impl<State> Replicate<State::Txn>
182    for Chain<State, State::Txn, State::FE, CollectionBase<State::Txn, State::FE>>
183where
184    State: StateInstance,
185    State::FE: CacheBlock,
186    State: From<Collection<State::Txn, State::FE>> + From<Scalar>,
187    Collection<State::Txn, State::FE>: TryCastFrom<State>,
188    CollectionBase<State::Txn, State::FE>: Route<State> + TryCastFrom<State>,
189    Scalar: TryCastFrom<State>,
190    BlockChain<State, State::Txn, State::FE, CollectionBase<State::Txn, State::FE>>:
191        TryCastFrom<State>,
192    SyncChain<State, State::Txn, State::FE, CollectionBase<State::Txn, State::FE>>:
193        TryCastFrom<State>,
194{
195    async fn replicate(&self, txn: &State::Txn, source: Link) -> TCResult<Output<Sha256>> {
196        match self {
197            Self::Block(chain) => chain.replicate(txn, source).await,
198            Self::Sync(chain) => chain.replicate(txn, source).await,
199        }
200    }
201}
202
203#[async_trait]
204impl<State, T> AsyncHash for Chain<State, State::Txn, State::FE, T>
205where
206    State: StateInstance,
207    State::FE: AsType<ChainBlock> + for<'a> fs::FileSave<'a> + ThreadSafe,
208    T: AsyncHash + Send + Sync,
209{
210    async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
211        match self {
212            Self::Block(chain) => chain.hash(txn_id).await,
213            Self::Sync(chain) => chain.hash(txn_id).await,
214        }
215    }
216}
217
218#[async_trait]
219impl<State, T> Transact for Chain<State, State::Txn, State::FE, T>
220where
221    State: StateInstance,
222    State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
223    T: fs::Persist<State::FE, Txn = State::Txn> + Route<State> + Transact + fmt::Debug,
224{
225    type Commit = T::Commit;
226
227    async fn commit(&self, txn_id: TxnId) -> Self::Commit {
228        match self {
229            Self::Block(chain) => chain.commit(txn_id).await,
230            Self::Sync(chain) => chain.commit(txn_id).await,
231        }
232    }
233
234    async fn rollback(&self, txn_id: &TxnId) {
235        match self {
236            Self::Block(chain) => chain.rollback(txn_id).await,
237            Self::Sync(chain) => chain.rollback(txn_id).await,
238        }
239    }
240
241    async fn finalize(&self, txn_id: &TxnId) {
242        match self {
243            Self::Block(chain) => chain.finalize(txn_id).await,
244            Self::Sync(chain) => chain.finalize(txn_id).await,
245        }
246    }
247}
248
249#[async_trait]
250impl<State, T> Recover<State::FE> for Chain<State, State::Txn, State::FE, T>
251where
252    State: StateInstance + From<Collection<State::Txn, State::FE>> + From<Scalar>,
253    State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
254    T: Route<State> + fmt::Debug + Send + Sync,
255    Collection<State::Txn, State::FE>: TryCastFrom<State>,
256    Scalar: TryCastFrom<State>,
257{
258    type Txn = State::Txn;
259
260    async fn recover(&self, txn: &State::Txn) -> TCResult<()> {
261        match self {
262            Self::Block(chain) => chain.recover(txn).await,
263            Self::Sync(chain) => chain.recover(txn).await,
264        }
265    }
266}
267
268#[async_trait]
269impl<State, T> fs::Persist<State::FE> for Chain<State, State::Txn, State::FE, T>
270where
271    State: StateInstance,
272    State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
273    T: fs::Persist<State::FE, Txn = State::Txn> + Route<State> + fmt::Debug,
274{
275    type Txn = State::Txn;
276    type Schema = (ChainType, T::Schema);
277
278    async fn create(
279        txn_id: TxnId,
280        schema: Self::Schema,
281        store: fs::Dir<State::FE>,
282    ) -> TCResult<Self> {
283        let (class, schema) = schema;
284
285        match class {
286            ChainType::Block => {
287                BlockChain::create(txn_id, schema, store)
288                    .map_ok(Self::Block)
289                    .await
290            }
291            ChainType::Sync => {
292                SyncChain::create(txn_id, schema, store)
293                    .map_ok(Self::Sync)
294                    .await
295            }
296        }
297    }
298
299    async fn load(
300        txn_id: TxnId,
301        schema: Self::Schema,
302        store: fs::Dir<State::FE>,
303    ) -> TCResult<Self> {
304        let (class, schema) = schema;
305        match class {
306            ChainType::Block => {
307                BlockChain::load(txn_id, schema, store)
308                    .map_ok(Self::Block)
309                    .await
310            }
311            ChainType::Sync => {
312                SyncChain::load(txn_id, schema, store)
313                    .map_ok(Self::Sync)
314                    .await
315            }
316        }
317    }
318
319    fn dir(&self) -> tc_transact::fs::Inner<State::FE> {
320        match self {
321            Self::Block(chain) => chain.dir(),
322            Self::Sync(chain) => chain.dir(),
323        }
324    }
325}
326
327#[async_trait]
328impl<State, T> fs::CopyFrom<State::FE, Self> for Chain<State, State::Txn, State::FE, T>
329where
330    State: StateInstance,
331    State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
332    T: fs::Persist<State::FE, Txn = State::Txn> + Route<State> + fmt::Debug,
333{
334    async fn copy_from(
335        txn: &State::Txn,
336        store: fs::Dir<State::FE>,
337        instance: Self,
338    ) -> TCResult<Self> {
339        match instance {
340            Chain::Block(chain) => {
341                BlockChain::copy_from(txn, store, chain)
342                    .map_ok(Chain::Block)
343                    .await
344            }
345            Chain::Sync(chain) => {
346                SyncChain::copy_from(txn, store, chain)
347                    .map_ok(Chain::Sync)
348                    .await
349            }
350        }
351    }
352}
353
354impl<State, Txn, FE, T> fmt::Debug for Chain<State, Txn, FE, T> {
355    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
356        match self {
357            Self::Block(chain) => chain.fmt(f),
358            Self::Sync(chain) => chain.fmt(f),
359        }
360    }
361}
362
363impl<State, Txn, FE, T> From<BlockChain<State, Txn, FE, T>> for Chain<State, Txn, FE, T> {
364    fn from(chain: BlockChain<State, Txn, FE, T>) -> Self {
365        Self::Block(chain)
366    }
367}
368
369enum ChainViewData<'en, T> {
370    Block((T, data::HistoryView<'en>)),
371    Sync(T),
372}
373
374/// A view of a [`Chain`] within a single `Transaction`, used for serialization.
375pub struct ChainView<'en, T> {
376    class: ChainType,
377    data: ChainViewData<'en, T>,
378}
379
380impl<'en, T> en::IntoStream<'en> for ChainView<'en, T>
381where
382    T: en::IntoStream<'en> + 'en,
383{
384    fn into_stream<E: en::Encoder<'en>>(self, encoder: E) -> Result<E::Ok, E::Error> {
385        use destream::en::EncodeMap;
386
387        let mut map = encoder.encode_map(Some(1))?;
388
389        map.encode_key(self.class.path().to_string())?;
390        match self.data {
391            ChainViewData::Block(view) => map.encode_value(view),
392            ChainViewData::Sync(view) => map.encode_value(view),
393        }?;
394
395        map.end()
396    }
397}
398
399#[async_trait]
400impl<'en, State, T> IntoView<'en, State::FE> for Chain<State, State::Txn, State::FE, T>
401where
402    State: StateInstance,
403    T: IntoView<'en, State::FE, Txn = State::Txn> + Send + Sync + 'en,
404    BlockChain<State, State::Txn, State::FE, T>: IntoView<'en, State::FE, View = (T::View, data::HistoryView<'en>), Txn = State::Txn>
405        + Send
406        + Sync,
407    SyncChain<State, State::Txn, State::FE, T>:
408        IntoView<'en, State::FE, View = T::View, Txn = State::Txn> + Send + Sync,
409{
410    type Txn = State::Txn;
411    type View = ChainView<'en, T::View>;
412
413    async fn into_view(self, txn: Self::Txn) -> TCResult<Self::View> {
414        let class = self.class();
415
416        let data = match self {
417            Self::Block(chain) => chain.into_view(txn).map_ok(ChainViewData::Block).await,
418            Self::Sync(chain) => chain.into_view(txn).map_ok(ChainViewData::Sync).await,
419        }?;
420
421        Ok(ChainView { class, data })
422    }
423}
424
425#[async_trait]
426impl<State, T> de::FromStream for Chain<State, State::Txn, State::FE, T>
427where
428    State: StateInstance
429        + de::FromStream<Context = State::Txn>
430        + From<Collection<State::Txn, State::FE>>
431        + From<Scalar>,
432    State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
433    T: Route<State> + de::FromStream<Context = State::Txn> + fmt::Debug,
434    (Bytes, Map<Tuple<State>>): TryCastFrom<State>,
435    Collection<State::Txn, State::FE>: TryCastFrom<State>,
436    Scalar: TryCastFrom<State>,
437    Value: TryCastFrom<State>,
438    (Value,): TryCastFrom<State>,
439    (Value, State): TryCastFrom<State>,
440{
441    type Context = State::Txn;
442
443    async fn from_stream<D: de::Decoder>(
444        txn: State::Txn,
445        decoder: &mut D,
446    ) -> Result<Self, D::Error> {
447        decoder.decode_map(ChainVisitor::new(txn)).await
448    }
449}
450
451/// A [`de::Visitor`] for deserializing a [`Chain`].
452pub struct ChainVisitor<State: StateInstance, T> {
453    txn: State::Txn,
454    phantom: PhantomData<T>,
455}
456
457impl<State, T> ChainVisitor<State, T>
458where
459    State: StateInstance,
460{
461    pub fn new(txn: State::Txn) -> Self {
462        Self {
463            txn,
464            phantom: PhantomData,
465        }
466    }
467}
468
469impl<State, T> ChainVisitor<State, T>
470where
471    State: StateInstance
472        + de::FromStream<Context = State::Txn>
473        + From<Collection<State::Txn, State::FE>>
474        + From<Scalar>,
475    State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
476    T: Route<State> + de::FromStream<Context = State::Txn> + fmt::Debug,
477    (Bytes, Map<Tuple<State>>): TryCastFrom<State>,
478    Collection<State::Txn, State::FE>: TryCastFrom<State>,
479    Scalar: TryCastFrom<State>,
480    Value: TryCastFrom<State>,
481    (Value,): TryCastFrom<State>,
482    (Value, State): TryCastFrom<State>,
483{
484    pub async fn visit_map_value<A: de::MapAccess>(
485        self,
486        class: ChainType,
487        access: &mut A,
488    ) -> Result<Chain<State, State::Txn, State::FE, T>, A::Error> {
489        match class {
490            ChainType::Block => {
491                access
492                    .next_value(self.txn)
493                    .map_ok(Chain::Block)
494                    .map_err(|e| de::Error::custom(format!("invalid BlockChain stream: {}", e)))
495                    .await
496            }
497            ChainType::Sync => access.next_value(self.txn).map_ok(Chain::Sync).await,
498        }
499    }
500}
501
502#[async_trait]
503impl<State, T> de::Visitor for ChainVisitor<State, T>
504where
505    State: StateInstance
506        + de::FromStream<Context = State::Txn>
507        + From<Collection<State::Txn, State::FE>>
508        + From<Scalar>,
509    State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
510    T: Route<State> + de::FromStream<Context = State::Txn> + fmt::Debug,
511    (Bytes, Map<Tuple<State>>): TryCastFrom<State>,
512    Collection<State::Txn, State::FE>: TryCastFrom<State>,
513    Scalar: TryCastFrom<State>,
514    Value: TryCastFrom<State>,
515    (Value,): TryCastFrom<State>,
516    (Value, State): TryCastFrom<State>,
517{
518    type Value = Chain<State, State::Txn, State::FE, T>;
519
520    fn expecting() -> &'static str {
521        "a Chain"
522    }
523
524    async fn visit_map<A: de::MapAccess>(self, mut map: A) -> Result<Self::Value, A::Error> {
525        let class = if let Some(path) = map.next_key::<TCPathBuf>(()).await? {
526            ChainType::from_path(&path)
527                .ok_or_else(|| de::Error::invalid_value(path, "a Chain class"))?
528        } else {
529            return Err(de::Error::custom("expected a Chain class"));
530        };
531
532        self.visit_map_value(class, &mut map).await
533    }
534}
535
536fn new_queue<State>(
537    store: data::Store<State::Txn, State::FE>,
538) -> TxnTaskQueue<MutationPending<State::Txn, State::FE>, TCResult<MutationRecord>>
539where
540    State: StateInstance,
541    State::FE: for<'a> FileSave<'a> + CacheBlock + Clone,
542{
543    TxnTaskQueue::new(Arc::pin(move |mutation| {
544        let store = store.clone();
545
546        Box::pin(async move {
547            match mutation {
548                MutationPending::Delete(key) => Ok(MutationRecord::Delete(key)),
549                MutationPending::Put(txn, key, state) => {
550                    let value = store.save_state(&txn, state).await?;
551                    Ok(MutationRecord::Put(key, value))
552                }
553            }
554        })
555    }))
556}
557
558#[inline]
559pub fn null_hash() -> Output<Sha256> {
560    GenericArray::default()
561}