tc_chain/
sync.rs

1//! A [`super::Chain`] which keeps only the data needed to recover the state of its subject in the
2//! event of a transaction failure.
3
4use std::fmt;
5use std::marker::PhantomData;
6
7use async_trait::async_trait;
8use destream::{de, FromStream};
9use freqfs::{FileLock, FileSave, FileWriteGuard};
10use futures::TryFutureExt;
11use get_size::GetSize;
12use log::{debug, trace};
13use safecast::{AsType, TryCastFrom, TryCastInto};
14
15use tc_collection::Collection;
16use tc_error::*;
17use tc_scalar::Scalar;
18use tc_transact::hash::{AsyncHash, Output, Sha256};
19use tc_transact::lock::TxnTaskQueue;
20use tc_transact::public::{Route, StateInstance};
21use tc_transact::{fs, Replicate};
22use tc_transact::{Gateway, IntoView, Transact, Transaction, TxnId};
23use tc_value::{Link, Value};
24use tcgeneric::{label, Id, Label};
25
26use crate::data::{MutationPending, MutationRecord, StoreEntry};
27
28use super::{new_queue, null_hash, CacheBlock, ChainBlock, ChainInstance, Recover};
29
/// Name of the directory in which the [`COMMITTED`] block file is created.
const BLOCKS: Label = label(".blocks");
/// File name of the single [`ChainBlock`] file kept by a [`SyncChain`].
const COMMITTED: &str = "committed.chain_block";
/// Name of the directory from which the chain's `super::data::Store` is loaded.
const STORE: Label = label(".store");
33
/// A [`super::Chain`] which keeps only the data needed to recover the state of its subject in the
/// event of a transaction failure.
///
/// Mutations are queued per transaction, written to the `committed` block before the subject is
/// committed, and removed from that block again once the subject's commit has completed.
pub struct SyncChain<State, Txn, FE, T> {
    /// The file holding the [`ChainBlock`] whose `mutations` map is keyed by transaction ID;
    /// entries are inserted by `write_ahead`, removed by `commit`, and replayed by `recover`.
    committed: FileLock<FE>,
    /// The per-transaction queue of pending mutations pushed by `append_put` / `append_delete`;
    /// committing the queue yields one `TCResult<MutationRecord>` per queued mutation.
    queue: TxnTaskQueue<MutationPending<Txn, FE>, TCResult<MutationRecord>>,
    /// The store handed to the queue and to `replay_all`
    /// (presumably holds the values referenced by `Put` mutations; confirm in `super::data`).
    store: super::data::Store<Txn, FE>,
    /// The subject whose state this chain protects.
    subject: T,
    /// Marker tying this chain to its `State` type; no `State` value is stored.
    state: PhantomData<State>,
}
43
44impl<State, Txn, FE, T> Clone for SyncChain<State, Txn, FE, T>
45where
46    T: Clone,
47{
48    fn clone(&self) -> Self {
49        Self {
50            committed: self.committed.clone(),
51            queue: self.queue.clone(),
52            store: self.store.clone(),
53            subject: self.subject.clone(),
54            state: self.state,
55        }
56    }
57}
58
59impl<State, T> SyncChain<State, State::Txn, State::FE, T>
60where
61    State: StateInstance,
62    State::FE: AsType<ChainBlock> + for<'a> fs::FileSave<'a>,
63{
64    async fn write_ahead(&self, txn_id: TxnId) {
65        trace!("SyncChain::write_ahead {}", txn_id);
66
67        let handles = self.queue.commit(txn_id).await;
68
69        let mutations = handles
70            .into_iter()
71            .collect::<TCResult<Vec<_>>>()
72            .expect("mutations");
73
74        if mutations.is_empty() {
75            return;
76        }
77
78        self.store.commit(txn_id).await;
79
80        {
81            let mut committed: FileWriteGuard<ChainBlock> =
82                self.committed.write().await.expect("SyncChain block");
83
84            committed.mutations.insert(txn_id, mutations);
85        }
86
87        self.committed.sync().await.expect("sync SyncChain block")
88    }
89}
90
91impl<State, T> SyncChain<State, State::Txn, State::FE, T>
92where
93    State: StateInstance,
94    State::FE: AsType<ChainBlock>,
95    T: fs::Persist<State::FE, Txn = State::Txn> + fs::Restore<State::FE> + TryCastFrom<State>,
96    Self: TryCastFrom<State>,
97{
98    pub async fn restore_from(&self, txn: &State::Txn, source: Link, attr: Id) -> TCResult<()> {
99        debug!("restore {self:?} from {source}");
100
101        let backup = txn.get(source, attr).await?;
102        let backup: Self =
103            backup.try_cast_into(|backup| bad_request!("{:?} is not a valid backup", backup))?;
104
105        self.subject.restore(*txn.id(), &backup.subject).await?;
106
107        let mut committed = self.committed.write().await?;
108
109        *committed = ChainBlock::new(null_hash().to_vec());
110
111        Ok(())
112    }
113}
114
115impl<State, T> ChainInstance<State, T> for SyncChain<State, State::Txn, State::FE, T>
116where
117    State: StateInstance,
118    State::FE: CacheBlock,
119    T: fs::Persist<State::FE, Txn = State::Txn> + Route<State> + fmt::Debug,
120    Collection<State::Txn, State::FE>: TryCastFrom<State>,
121    Scalar: TryCastFrom<State>,
122{
123    fn append_delete(&self, txn_id: TxnId, key: Value) -> TCResult<()> {
124        self.queue
125            .push(txn_id, MutationPending::Delete(key))
126            .map_err(TCError::from)
127    }
128
129    fn append_put(&self, txn: State::Txn, key: Value, value: State) -> TCResult<()> {
130        let txn_id = *txn.id();
131        let value = StoreEntry::try_from_state(value)?;
132        let mutation = MutationPending::Put(txn, key, value);
133        self.queue.push(txn_id, mutation).map_err(TCError::from)
134    }
135
136    fn subject(&self) -> &T {
137        &self.subject
138    }
139}
140
141#[async_trait]
142impl<State, T> Replicate<State::Txn> for SyncChain<State, State::Txn, State::FE, T>
143where
144    State: StateInstance,
145    State::FE: AsType<ChainBlock>,
146    T: fs::Persist<State::FE, Txn = State::Txn>
147        + fs::Restore<State::FE>
148        + TryCastFrom<State>
149        + AsyncHash
150        + Send
151        + Sync,
152    Self: TryCastFrom<State>,
153{
154    async fn replicate(&self, txn: &State::Txn, mut source: Link) -> TCResult<Output<Sha256>> {
155        let attr = source
156            .path_mut()
157            .pop()
158            .ok_or_else(|| bad_request!("invalid replica link: {source}"))?;
159
160        self.restore_from(txn, source, attr).await?;
161
162        AsyncHash::hash(self, *txn.id()).await
163    }
164}
165
166#[async_trait]
167impl<State, T> AsyncHash for SyncChain<State, State::Txn, State::FE, T>
168where
169    State: StateInstance,
170    T: AsyncHash + Send + Sync,
171{
172    async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
173        self.subject.hash(txn_id).await
174    }
175}
176
177#[async_trait]
178impl<State, T> Transact for SyncChain<State, State::Txn, State::FE, T>
179where
180    State: StateInstance,
181    State::FE: AsType<ChainBlock> + for<'a> fs::FileSave<'a>,
182    T: Transact + Send + Sync,
183{
184    type Commit = T::Commit;
185
186    async fn commit(&self, txn_id: TxnId) -> Self::Commit {
187        debug!("SyncChain::commit");
188
189        self.write_ahead(txn_id).await;
190        trace!("SyncChain::commit logged the mutations to be applied");
191
192        let guard = self.subject.commit(txn_id).await;
193        trace!("SyncChain committed subject, moving its mutations out of the write-head log...");
194
195        // assume the mutations for the transaction have already been moved and sync'd
196        // from `self.pending` to `self.committed` by calling the `write_ahead` method
197        {
198            let mut committed: FileWriteGuard<ChainBlock> =
199                self.committed.write().await.expect("committed");
200
201            committed.mutations.remove(&txn_id);
202            trace!("mutations are out of the write-ahead log");
203        }
204
205        self.committed.sync().await.expect("sync commit block");
206
207        guard
208    }
209
210    async fn rollback(&self, txn_id: &TxnId) {
211        debug!("SyncChain::rollback");
212
213        self.queue.rollback(txn_id);
214        self.subject.rollback(txn_id).await;
215    }
216
217    async fn finalize(&self, txn_id: &TxnId) {
218        self.queue.finalize(*txn_id);
219        self.subject.finalize(txn_id).await
220    }
221}
222
223#[async_trait]
224impl<State, T> fs::Persist<State::FE> for SyncChain<State, State::Txn, State::FE, T>
225where
226    State: StateInstance,
227    State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
228    T: fs::Persist<State::FE, Txn = State::Txn> + Send + Sync,
229{
230    type Txn = State::Txn;
231    type Schema = T::Schema;
232
233    async fn create(
234        txn_id: TxnId,
235        schema: Self::Schema,
236        store: fs::Dir<State::FE>,
237    ) -> TCResult<Self> {
238        debug!("SyncChain::create");
239
240        let subject = T::create(txn_id, schema, store).await?;
241
242        let mut dir = subject.dir().try_write_owned()?;
243
244        let store = {
245            let dir = dir.create_dir(STORE.to_string())?;
246
247            fs::Dir::load(txn_id, dir)
248                .map_ok(super::data::Store::new)
249                .await?
250        };
251
252        let queue = new_queue::<State>(store.clone());
253
254        // TODO: is this necessary?
255        let mut blocks_dir = dir
256            .create_dir(BLOCKS.to_string())
257            .and_then(|dir| dir.try_write_owned())?;
258
259        let block = ChainBlock::new(null_hash().to_vec());
260        let size_hint = block.get_size();
261        let committed = blocks_dir.create_file(COMMITTED.to_string(), block, size_hint)?;
262
263        Ok(Self {
264            subject,
265            queue,
266            committed,
267            store,
268            state: PhantomData,
269        })
270    }
271
272    async fn load(
273        txn_id: TxnId,
274        schema: Self::Schema,
275        store: fs::Dir<State::FE>,
276    ) -> TCResult<Self> {
277        debug!("SyncChain::load");
278
279        let subject = T::load_or_create(txn_id, schema, store).await?;
280
281        let mut dir = subject.dir().write_owned().await;
282
283        let store = {
284            let dir = dir.get_or_create_dir(STORE.to_string())?;
285            fs::Dir::load(txn_id, dir)
286                .map_ok(super::data::Store::new)
287                .await?
288        };
289
290        let queue = new_queue::<State>(store.clone());
291
292        let mut blocks_dir = dir
293            .get_or_create_dir(BLOCKS.to_string())
294            .and_then(|dir| dir.try_write_owned())?;
295
296        let committed = if let Some(file) = blocks_dir.get_file(&*COMMITTED) {
297            file.clone()
298        } else {
299            let block = ChainBlock::new(null_hash().to_vec());
300            let size_hint = block.get_size();
301            blocks_dir.create_file(COMMITTED.to_string(), block, size_hint)?
302        };
303
304        Ok(Self {
305            subject,
306            queue,
307            committed,
308            store,
309            state: PhantomData,
310        })
311    }
312
313    fn dir(&self) -> fs::Inner<State::FE> {
314        self.subject.dir()
315    }
316}
317
318#[async_trait]
319impl<State, T> Recover<State::FE> for SyncChain<State, State::Txn, State::FE, T>
320where
321    State: StateInstance + From<Collection<State::Txn, State::FE>> + From<Scalar>,
322    State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
323    T: Route<State> + fmt::Debug + Send + Sync,
324    Collection<State::Txn, State::FE>: TryCastFrom<State>,
325    Scalar: TryCastFrom<State>,
326{
327    type Txn = State::Txn;
328
329    async fn recover(&self, txn: &State::Txn) -> TCResult<()> {
330        {
331            let mut committed: FileWriteGuard<ChainBlock> = self.committed.write().await?;
332
333            for (txn_id, mutations) in &committed.mutations {
334                super::data::replay_all(&self.subject, txn_id, mutations, txn, &self.store).await?;
335            }
336
337            committed.mutations.clear()
338        }
339
340        self.committed.sync().map_err(TCError::from).await
341    }
342}
343
344#[async_trait]
345impl<State, T> fs::CopyFrom<State::FE, Self> for SyncChain<State, State::Txn, State::FE, T>
346where
347    State: StateInstance,
348    State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
349    T: fs::Persist<State::FE, Txn = State::Txn> + Route<State> + fmt::Debug,
350{
351    async fn copy_from(
352        _txn: &State::Txn,
353        _store: fs::Dir<State::FE>,
354        _instance: Self,
355    ) -> TCResult<Self> {
356        Err(not_implemented!("SyncChain::copy_from"))
357    }
358}
359
360#[async_trait]
361impl<State, T> de::FromStream for SyncChain<State, State::Txn, State::FE, T>
362where
363    State: StateInstance,
364    State::FE: CacheBlock + for<'a> FileSave<'a>,
365    T: FromStream<Context = State::Txn>,
366{
367    type Context = State::Txn;
368
369    async fn from_stream<D: de::Decoder>(
370        txn: Self::Context,
371        decoder: &mut D,
372    ) -> Result<Self, D::Error> {
373        let subject = T::from_stream(txn.clone(), decoder).await?;
374
375        let cxt = txn.context().map_err(de::Error::custom).await?;
376
377        let store = {
378            let dir = {
379                let mut cxt = cxt.write().await;
380
381                cxt.create_dir(STORE.to_string())
382                    .map_err(de::Error::custom)?
383            };
384
385            fs::Dir::load(*txn.id(), dir)
386                .map_ok(super::data::Store::new)
387                .map_err(de::Error::custom)
388                .await?
389        };
390
391        let queue = new_queue::<State>(store.clone());
392
393        let mut blocks_dir = {
394            let file = {
395                let mut cxt = cxt.write().await;
396
397                cxt.create_dir(BLOCKS.to_string())
398                    .map_err(de::Error::custom)?
399            };
400
401            file.write_owned().await
402        };
403
404        let null_hash = null_hash();
405        let block = ChainBlock::new(null_hash.to_vec());
406        let size_hint = block.get_size();
407        let committed = blocks_dir
408            .create_file(COMMITTED.to_string(), block, size_hint)
409            .map_err(de::Error::custom)?;
410
411        Ok(Self {
412            subject,
413            queue,
414            committed,
415            store,
416            state: PhantomData,
417        })
418    }
419}
420
421#[async_trait]
422impl<'en, State, T> IntoView<'en, State::FE> for SyncChain<State, State::Txn, State::FE, T>
423where
424    State: StateInstance,
425    T: IntoView<'en, State::FE, Txn = State::Txn> + Send + Sync,
426{
427    type Txn = State::Txn;
428    type View = T::View;
429
430    async fn into_view(self, txn: Self::Txn) -> TCResult<Self::View> {
431        self.subject.into_view(txn).await
432    }
433}
434
435impl<State, Txn, FE, T> fmt::Debug for SyncChain<State, Txn, FE, T> {
436    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
437        write!(f, "SyncChain<{}>", std::any::type_name::<T>())
438    }
439}