1use std::fmt;
5use std::marker::PhantomData;
6
7use async_trait::async_trait;
8use destream::{de, FromStream};
9use freqfs::{FileLock, FileSave, FileWriteGuard};
10use futures::TryFutureExt;
11use get_size::GetSize;
12use log::{debug, trace};
13use safecast::{AsType, TryCastFrom, TryCastInto};
14
15use tc_collection::Collection;
16use tc_error::*;
17use tc_scalar::Scalar;
18use tc_transact::hash::{AsyncHash, Output, Sha256};
19use tc_transact::lock::TxnTaskQueue;
20use tc_transact::public::{Route, StateInstance};
21use tc_transact::{fs, Replicate};
22use tc_transact::{Gateway, IntoView, Transact, Transaction, TxnId};
23use tc_value::{Link, Value};
24use tcgeneric::{label, Id, Label};
25
26use crate::data::{MutationPending, MutationRecord, StoreEntry};
27
28use super::{new_queue, null_hash, CacheBlock, ChainBlock, ChainInstance, Recover};
29
/// Name of the directory in which the file named by [`COMMITTED`] is created.
const BLOCKS: Label = label(".blocks");
/// File name of the single [`ChainBlock`] in which a [`SyncChain`] records mutations.
const COMMITTED: &str = "committed.chain_block";
/// Name of the directory from which a chain's `super::data::Store` is loaded.
const STORE: Label = label(".store");
33
/// A chain which wraps a `subject` and keeps a log of mutations in a single [`ChainBlock`] file.
///
/// The mutations queued for a transaction are written to the block before the subject is
/// committed, and removed from the block again once the subject has committed.
pub struct SyncChain<State, Txn, FE, T> {
    // the file which holds the `ChainBlock` of logged mutations
    committed: FileLock<FE>,
    // mutations queued per transaction; committing the queue yields their `MutationRecord`s
    queue: TxnTaskQueue<MutationPending<Txn, FE>, TCResult<MutationRecord>>,
    // the store loaded from the `STORE` directory, committed together with non-empty mutations
    store: super::data::Store<Txn, FE>,
    // the value wrapped by this chain
    subject: T,
    // ties the chain to its `State` type without storing a value of that type
    state: PhantomData<State>,
}
43
44impl<State, Txn, FE, T> Clone for SyncChain<State, Txn, FE, T>
45where
46 T: Clone,
47{
48 fn clone(&self) -> Self {
49 Self {
50 committed: self.committed.clone(),
51 queue: self.queue.clone(),
52 store: self.store.clone(),
53 subject: self.subject.clone(),
54 state: self.state,
55 }
56 }
57}
58
59impl<State, T> SyncChain<State, State::Txn, State::FE, T>
60where
61 State: StateInstance,
62 State::FE: AsType<ChainBlock> + for<'a> fs::FileSave<'a>,
63{
64 async fn write_ahead(&self, txn_id: TxnId) {
65 trace!("SyncChain::write_ahead {}", txn_id);
66
67 let handles = self.queue.commit(txn_id).await;
68
69 let mutations = handles
70 .into_iter()
71 .collect::<TCResult<Vec<_>>>()
72 .expect("mutations");
73
74 if mutations.is_empty() {
75 return;
76 }
77
78 self.store.commit(txn_id).await;
79
80 {
81 let mut committed: FileWriteGuard<ChainBlock> =
82 self.committed.write().await.expect("SyncChain block");
83
84 committed.mutations.insert(txn_id, mutations);
85 }
86
87 self.committed.sync().await.expect("sync SyncChain block")
88 }
89}
90
91impl<State, T> SyncChain<State, State::Txn, State::FE, T>
92where
93 State: StateInstance,
94 State::FE: AsType<ChainBlock>,
95 T: fs::Persist<State::FE, Txn = State::Txn> + fs::Restore<State::FE> + TryCastFrom<State>,
96 Self: TryCastFrom<State>,
97{
98 pub async fn restore_from(&self, txn: &State::Txn, source: Link, attr: Id) -> TCResult<()> {
99 debug!("restore {self:?} from {source}");
100
101 let backup = txn.get(source, attr).await?;
102 let backup: Self =
103 backup.try_cast_into(|backup| bad_request!("{:?} is not a valid backup", backup))?;
104
105 self.subject.restore(*txn.id(), &backup.subject).await?;
106
107 let mut committed = self.committed.write().await?;
108
109 *committed = ChainBlock::new(null_hash().to_vec());
110
111 Ok(())
112 }
113}
114
115impl<State, T> ChainInstance<State, T> for SyncChain<State, State::Txn, State::FE, T>
116where
117 State: StateInstance,
118 State::FE: CacheBlock,
119 T: fs::Persist<State::FE, Txn = State::Txn> + Route<State> + fmt::Debug,
120 Collection<State::Txn, State::FE>: TryCastFrom<State>,
121 Scalar: TryCastFrom<State>,
122{
123 fn append_delete(&self, txn_id: TxnId, key: Value) -> TCResult<()> {
124 self.queue
125 .push(txn_id, MutationPending::Delete(key))
126 .map_err(TCError::from)
127 }
128
129 fn append_put(&self, txn: State::Txn, key: Value, value: State) -> TCResult<()> {
130 let txn_id = *txn.id();
131 let value = StoreEntry::try_from_state(value)?;
132 let mutation = MutationPending::Put(txn, key, value);
133 self.queue.push(txn_id, mutation).map_err(TCError::from)
134 }
135
136 fn subject(&self) -> &T {
137 &self.subject
138 }
139}
140
141#[async_trait]
142impl<State, T> Replicate<State::Txn> for SyncChain<State, State::Txn, State::FE, T>
143where
144 State: StateInstance,
145 State::FE: AsType<ChainBlock>,
146 T: fs::Persist<State::FE, Txn = State::Txn>
147 + fs::Restore<State::FE>
148 + TryCastFrom<State>
149 + AsyncHash
150 + Send
151 + Sync,
152 Self: TryCastFrom<State>,
153{
154 async fn replicate(&self, txn: &State::Txn, mut source: Link) -> TCResult<Output<Sha256>> {
155 let attr = source
156 .path_mut()
157 .pop()
158 .ok_or_else(|| bad_request!("invalid replica link: {source}"))?;
159
160 self.restore_from(txn, source, attr).await?;
161
162 AsyncHash::hash(self, *txn.id()).await
163 }
164}
165
166#[async_trait]
167impl<State, T> AsyncHash for SyncChain<State, State::Txn, State::FE, T>
168where
169 State: StateInstance,
170 T: AsyncHash + Send + Sync,
171{
172 async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
173 self.subject.hash(txn_id).await
174 }
175}
176
177#[async_trait]
178impl<State, T> Transact for SyncChain<State, State::Txn, State::FE, T>
179where
180 State: StateInstance,
181 State::FE: AsType<ChainBlock> + for<'a> fs::FileSave<'a>,
182 T: Transact + Send + Sync,
183{
184 type Commit = T::Commit;
185
186 async fn commit(&self, txn_id: TxnId) -> Self::Commit {
187 debug!("SyncChain::commit");
188
189 self.write_ahead(txn_id).await;
190 trace!("SyncChain::commit logged the mutations to be applied");
191
192 let guard = self.subject.commit(txn_id).await;
193 trace!("SyncChain committed subject, moving its mutations out of the write-head log...");
194
195 {
198 let mut committed: FileWriteGuard<ChainBlock> =
199 self.committed.write().await.expect("committed");
200
201 committed.mutations.remove(&txn_id);
202 trace!("mutations are out of the write-ahead log");
203 }
204
205 self.committed.sync().await.expect("sync commit block");
206
207 guard
208 }
209
210 async fn rollback(&self, txn_id: &TxnId) {
211 debug!("SyncChain::rollback");
212
213 self.queue.rollback(txn_id);
214 self.subject.rollback(txn_id).await;
215 }
216
217 async fn finalize(&self, txn_id: &TxnId) {
218 self.queue.finalize(*txn_id);
219 self.subject.finalize(txn_id).await
220 }
221}
222
223#[async_trait]
224impl<State, T> fs::Persist<State::FE> for SyncChain<State, State::Txn, State::FE, T>
225where
226 State: StateInstance,
227 State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
228 T: fs::Persist<State::FE, Txn = State::Txn> + Send + Sync,
229{
230 type Txn = State::Txn;
231 type Schema = T::Schema;
232
233 async fn create(
234 txn_id: TxnId,
235 schema: Self::Schema,
236 store: fs::Dir<State::FE>,
237 ) -> TCResult<Self> {
238 debug!("SyncChain::create");
239
240 let subject = T::create(txn_id, schema, store).await?;
241
242 let mut dir = subject.dir().try_write_owned()?;
243
244 let store = {
245 let dir = dir.create_dir(STORE.to_string())?;
246
247 fs::Dir::load(txn_id, dir)
248 .map_ok(super::data::Store::new)
249 .await?
250 };
251
252 let queue = new_queue::<State>(store.clone());
253
254 let mut blocks_dir = dir
256 .create_dir(BLOCKS.to_string())
257 .and_then(|dir| dir.try_write_owned())?;
258
259 let block = ChainBlock::new(null_hash().to_vec());
260 let size_hint = block.get_size();
261 let committed = blocks_dir.create_file(COMMITTED.to_string(), block, size_hint)?;
262
263 Ok(Self {
264 subject,
265 queue,
266 committed,
267 store,
268 state: PhantomData,
269 })
270 }
271
272 async fn load(
273 txn_id: TxnId,
274 schema: Self::Schema,
275 store: fs::Dir<State::FE>,
276 ) -> TCResult<Self> {
277 debug!("SyncChain::load");
278
279 let subject = T::load_or_create(txn_id, schema, store).await?;
280
281 let mut dir = subject.dir().write_owned().await;
282
283 let store = {
284 let dir = dir.get_or_create_dir(STORE.to_string())?;
285 fs::Dir::load(txn_id, dir)
286 .map_ok(super::data::Store::new)
287 .await?
288 };
289
290 let queue = new_queue::<State>(store.clone());
291
292 let mut blocks_dir = dir
293 .get_or_create_dir(BLOCKS.to_string())
294 .and_then(|dir| dir.try_write_owned())?;
295
296 let committed = if let Some(file) = blocks_dir.get_file(&*COMMITTED) {
297 file.clone()
298 } else {
299 let block = ChainBlock::new(null_hash().to_vec());
300 let size_hint = block.get_size();
301 blocks_dir.create_file(COMMITTED.to_string(), block, size_hint)?
302 };
303
304 Ok(Self {
305 subject,
306 queue,
307 committed,
308 store,
309 state: PhantomData,
310 })
311 }
312
313 fn dir(&self) -> fs::Inner<State::FE> {
314 self.subject.dir()
315 }
316}
317
318#[async_trait]
319impl<State, T> Recover<State::FE> for SyncChain<State, State::Txn, State::FE, T>
320where
321 State: StateInstance + From<Collection<State::Txn, State::FE>> + From<Scalar>,
322 State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
323 T: Route<State> + fmt::Debug + Send + Sync,
324 Collection<State::Txn, State::FE>: TryCastFrom<State>,
325 Scalar: TryCastFrom<State>,
326{
327 type Txn = State::Txn;
328
329 async fn recover(&self, txn: &State::Txn) -> TCResult<()> {
330 {
331 let mut committed: FileWriteGuard<ChainBlock> = self.committed.write().await?;
332
333 for (txn_id, mutations) in &committed.mutations {
334 super::data::replay_all(&self.subject, txn_id, mutations, txn, &self.store).await?;
335 }
336
337 committed.mutations.clear()
338 }
339
340 self.committed.sync().map_err(TCError::from).await
341 }
342}
343
344#[async_trait]
345impl<State, T> fs::CopyFrom<State::FE, Self> for SyncChain<State, State::Txn, State::FE, T>
346where
347 State: StateInstance,
348 State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
349 T: fs::Persist<State::FE, Txn = State::Txn> + Route<State> + fmt::Debug,
350{
351 async fn copy_from(
352 _txn: &State::Txn,
353 _store: fs::Dir<State::FE>,
354 _instance: Self,
355 ) -> TCResult<Self> {
356 Err(not_implemented!("SyncChain::copy_from"))
357 }
358}
359
360#[async_trait]
361impl<State, T> de::FromStream for SyncChain<State, State::Txn, State::FE, T>
362where
363 State: StateInstance,
364 State::FE: CacheBlock + for<'a> FileSave<'a>,
365 T: FromStream<Context = State::Txn>,
366{
367 type Context = State::Txn;
368
369 async fn from_stream<D: de::Decoder>(
370 txn: Self::Context,
371 decoder: &mut D,
372 ) -> Result<Self, D::Error> {
373 let subject = T::from_stream(txn.clone(), decoder).await?;
374
375 let cxt = txn.context().map_err(de::Error::custom).await?;
376
377 let store = {
378 let dir = {
379 let mut cxt = cxt.write().await;
380
381 cxt.create_dir(STORE.to_string())
382 .map_err(de::Error::custom)?
383 };
384
385 fs::Dir::load(*txn.id(), dir)
386 .map_ok(super::data::Store::new)
387 .map_err(de::Error::custom)
388 .await?
389 };
390
391 let queue = new_queue::<State>(store.clone());
392
393 let mut blocks_dir = {
394 let file = {
395 let mut cxt = cxt.write().await;
396
397 cxt.create_dir(BLOCKS.to_string())
398 .map_err(de::Error::custom)?
399 };
400
401 file.write_owned().await
402 };
403
404 let null_hash = null_hash();
405 let block = ChainBlock::new(null_hash.to_vec());
406 let size_hint = block.get_size();
407 let committed = blocks_dir
408 .create_file(COMMITTED.to_string(), block, size_hint)
409 .map_err(de::Error::custom)?;
410
411 Ok(Self {
412 subject,
413 queue,
414 committed,
415 store,
416 state: PhantomData,
417 })
418 }
419}
420
421#[async_trait]
422impl<'en, State, T> IntoView<'en, State::FE> for SyncChain<State, State::Txn, State::FE, T>
423where
424 State: StateInstance,
425 T: IntoView<'en, State::FE, Txn = State::Txn> + Send + Sync,
426{
427 type Txn = State::Txn;
428 type View = T::View;
429
430 async fn into_view(self, txn: Self::Txn) -> TCResult<Self::View> {
431 self.subject.into_view(txn).await
432 }
433}
434
435impl<State, Txn, FE, T> fmt::Debug for SyncChain<State, Txn, FE, T> {
436 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
437 write!(f, "SyncChain<{}>", std::any::type_name::<T>())
438 }
439}