1use std::fmt;
4use std::marker::PhantomData;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use bytes::Bytes;
9use destream::{de, en};
10use freqfs::FileSave;
11use futures::future::TryFutureExt;
12use safecast::{AsType, TryCastFrom};
13
14use tc_collection::{Collection, CollectionBase, CollectionBlock};
15use tc_error::*;
16use tc_scalar::Scalar;
17use tc_transact::hash::{AsyncHash, GenericArray, Output, Sha256};
18use tc_transact::lock::TxnTaskQueue;
19use tc_transact::public::{Route, StateInstance};
20use tc_transact::{fs, Replicate};
21use tc_transact::{IntoView, Transact, Transaction, TxnId};
22use tc_value::{Link, Value};
23use tcgeneric::*;
24
25use data::{MutationPending, MutationRecord};
26
27pub use block::BlockChain;
28pub use data::ChainBlock;
29pub use sync::SyncChain;
30
31mod block;
32mod data;
33mod public;
34mod sync;
35
36pub const CHAIN: Label = label("chain");
37pub const HISTORY: Label = label(".history");
38
39const BLOCK_SIZE: usize = 1_000_000; const PREFIX: PathLabel = path_label(&["state", "chain"]);
41
42pub trait CacheBlock: AsType<ChainBlock> + CollectionBlock {}
44
45impl<FE> CacheBlock for FE where FE: AsType<ChainBlock> + CollectionBlock {}
46
47#[async_trait]
49pub trait Recover<FE> {
50 type Txn: Transaction<FE>;
51
52 async fn recover(&self, txn: &Self::Txn) -> TCResult<()>;
54}
55
56pub trait ChainInstance<State: StateInstance, T> {
58 fn append_delete(&self, txn_id: TxnId, key: Value) -> TCResult<()>;
60
61 fn append_put(&self, txn: State::Txn, key: Value, value: State) -> TCResult<()>;
63
64 fn subject(&self) -> &T;
66}
67
68#[derive(Clone, Copy, Eq, PartialEq)]
70pub enum ChainType {
71 Block,
72 Sync,
73}
74
75impl Default for ChainType {
76 fn default() -> Self {
77 Self::Sync
78 }
79}
80
81impl Class for ChainType {}
82
83impl NativeClass for ChainType {
84 fn from_path(path: &[PathSegment]) -> Option<Self> {
85 if path.len() == 3 && &path[0..2] == &PREFIX[..] {
86 match path[2].as_str() {
87 "block" => Some(Self::Block),
88 "sync" => Some(Self::Sync),
89 _ => None,
90 }
91 } else {
92 None
93 }
94 }
95
96 fn path(&self) -> TCPathBuf {
97 let suffix = match self {
98 Self::Block => "block",
99 Self::Sync => "sync",
100 };
101
102 TCPathBuf::from(PREFIX).append(label(suffix))
103 }
104}
105
106impl fmt::Debug for ChainType {
107 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
108 f.write_str(match self {
109 Self::Block => "type BlockChain",
110 Self::Sync => "type SyncChain",
111 })
112 }
113}
114
115pub enum Chain<State, Txn, FE, T> {
117 Block(block::BlockChain<State, Txn, FE, T>),
118 Sync(sync::SyncChain<State, Txn, FE, T>),
119}
120
121impl<State, Txn, FE, T> Clone for Chain<State, Txn, FE, T>
122where
123 T: Clone,
124{
125 fn clone(&self) -> Self {
126 match self {
127 Self::Block(chain) => Self::Block(chain.clone()),
128 Self::Sync(chain) => Self::Sync(chain.clone()),
129 }
130 }
131}
132
133impl<State, Txn, FE, T> Instance for Chain<State, Txn, FE, T>
134where
135 State: Send + Sync,
136 Txn: Send + Sync,
137 FE: Send + Sync,
138 T: Send + Sync,
139{
140 type Class = ChainType;
141
142 fn class(&self) -> Self::Class {
143 match self {
144 Self::Block(_) => ChainType::Block,
145 Self::Sync(_) => ChainType::Sync,
146 }
147 }
148}
149
150impl<State, T> ChainInstance<State, T> for Chain<State, State::Txn, State::FE, T>
151where
152 State: StateInstance,
153 State::FE: for<'a> FileSave<'a> + CacheBlock,
154 T: fs::Persist<State::FE, Txn = State::Txn> + Route<State> + fmt::Debug,
155 Collection<State::Txn, State::FE>: TryCastFrom<State>,
156 Scalar: TryCastFrom<State>,
157{
158 fn append_delete(&self, txn_id: TxnId, key: Value) -> TCResult<()> {
159 match self {
160 Self::Block(chain) => chain.append_delete(txn_id, key),
161 Self::Sync(chain) => chain.append_delete(txn_id, key),
162 }
163 }
164
165 fn append_put(&self, txn: State::Txn, key: Value, value: State) -> TCResult<()> {
166 match self {
167 Self::Block(chain) => chain.append_put(txn, key, value),
168 Self::Sync(chain) => chain.append_put(txn, key, value),
169 }
170 }
171
172 fn subject(&self) -> &T {
173 match self {
174 Self::Block(chain) => chain.subject(),
175 Self::Sync(chain) => chain.subject(),
176 }
177 }
178}
179
180#[async_trait]
181impl<State> Replicate<State::Txn>
182 for Chain<State, State::Txn, State::FE, CollectionBase<State::Txn, State::FE>>
183where
184 State: StateInstance,
185 State::FE: CacheBlock,
186 State: From<Collection<State::Txn, State::FE>> + From<Scalar>,
187 Collection<State::Txn, State::FE>: TryCastFrom<State>,
188 CollectionBase<State::Txn, State::FE>: Route<State> + TryCastFrom<State>,
189 Scalar: TryCastFrom<State>,
190 BlockChain<State, State::Txn, State::FE, CollectionBase<State::Txn, State::FE>>:
191 TryCastFrom<State>,
192 SyncChain<State, State::Txn, State::FE, CollectionBase<State::Txn, State::FE>>:
193 TryCastFrom<State>,
194{
195 async fn replicate(&self, txn: &State::Txn, source: Link) -> TCResult<Output<Sha256>> {
196 match self {
197 Self::Block(chain) => chain.replicate(txn, source).await,
198 Self::Sync(chain) => chain.replicate(txn, source).await,
199 }
200 }
201}
202
203#[async_trait]
204impl<State, T> AsyncHash for Chain<State, State::Txn, State::FE, T>
205where
206 State: StateInstance,
207 State::FE: AsType<ChainBlock> + for<'a> fs::FileSave<'a> + ThreadSafe,
208 T: AsyncHash + Send + Sync,
209{
210 async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
211 match self {
212 Self::Block(chain) => chain.hash(txn_id).await,
213 Self::Sync(chain) => chain.hash(txn_id).await,
214 }
215 }
216}
217
218#[async_trait]
219impl<State, T> Transact for Chain<State, State::Txn, State::FE, T>
220where
221 State: StateInstance,
222 State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
223 T: fs::Persist<State::FE, Txn = State::Txn> + Route<State> + Transact + fmt::Debug,
224{
225 type Commit = T::Commit;
226
227 async fn commit(&self, txn_id: TxnId) -> Self::Commit {
228 match self {
229 Self::Block(chain) => chain.commit(txn_id).await,
230 Self::Sync(chain) => chain.commit(txn_id).await,
231 }
232 }
233
234 async fn rollback(&self, txn_id: &TxnId) {
235 match self {
236 Self::Block(chain) => chain.rollback(txn_id).await,
237 Self::Sync(chain) => chain.rollback(txn_id).await,
238 }
239 }
240
241 async fn finalize(&self, txn_id: &TxnId) {
242 match self {
243 Self::Block(chain) => chain.finalize(txn_id).await,
244 Self::Sync(chain) => chain.finalize(txn_id).await,
245 }
246 }
247}
248
249#[async_trait]
250impl<State, T> Recover<State::FE> for Chain<State, State::Txn, State::FE, T>
251where
252 State: StateInstance + From<Collection<State::Txn, State::FE>> + From<Scalar>,
253 State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
254 T: Route<State> + fmt::Debug + Send + Sync,
255 Collection<State::Txn, State::FE>: TryCastFrom<State>,
256 Scalar: TryCastFrom<State>,
257{
258 type Txn = State::Txn;
259
260 async fn recover(&self, txn: &State::Txn) -> TCResult<()> {
261 match self {
262 Self::Block(chain) => chain.recover(txn).await,
263 Self::Sync(chain) => chain.recover(txn).await,
264 }
265 }
266}
267
268#[async_trait]
269impl<State, T> fs::Persist<State::FE> for Chain<State, State::Txn, State::FE, T>
270where
271 State: StateInstance,
272 State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
273 T: fs::Persist<State::FE, Txn = State::Txn> + Route<State> + fmt::Debug,
274{
275 type Txn = State::Txn;
276 type Schema = (ChainType, T::Schema);
277
278 async fn create(
279 txn_id: TxnId,
280 schema: Self::Schema,
281 store: fs::Dir<State::FE>,
282 ) -> TCResult<Self> {
283 let (class, schema) = schema;
284
285 match class {
286 ChainType::Block => {
287 BlockChain::create(txn_id, schema, store)
288 .map_ok(Self::Block)
289 .await
290 }
291 ChainType::Sync => {
292 SyncChain::create(txn_id, schema, store)
293 .map_ok(Self::Sync)
294 .await
295 }
296 }
297 }
298
299 async fn load(
300 txn_id: TxnId,
301 schema: Self::Schema,
302 store: fs::Dir<State::FE>,
303 ) -> TCResult<Self> {
304 let (class, schema) = schema;
305 match class {
306 ChainType::Block => {
307 BlockChain::load(txn_id, schema, store)
308 .map_ok(Self::Block)
309 .await
310 }
311 ChainType::Sync => {
312 SyncChain::load(txn_id, schema, store)
313 .map_ok(Self::Sync)
314 .await
315 }
316 }
317 }
318
319 fn dir(&self) -> tc_transact::fs::Inner<State::FE> {
320 match self {
321 Self::Block(chain) => chain.dir(),
322 Self::Sync(chain) => chain.dir(),
323 }
324 }
325}
326
327#[async_trait]
328impl<State, T> fs::CopyFrom<State::FE, Self> for Chain<State, State::Txn, State::FE, T>
329where
330 State: StateInstance,
331 State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
332 T: fs::Persist<State::FE, Txn = State::Txn> + Route<State> + fmt::Debug,
333{
334 async fn copy_from(
335 txn: &State::Txn,
336 store: fs::Dir<State::FE>,
337 instance: Self,
338 ) -> TCResult<Self> {
339 match instance {
340 Chain::Block(chain) => {
341 BlockChain::copy_from(txn, store, chain)
342 .map_ok(Chain::Block)
343 .await
344 }
345 Chain::Sync(chain) => {
346 SyncChain::copy_from(txn, store, chain)
347 .map_ok(Chain::Sync)
348 .await
349 }
350 }
351 }
352}
353
354impl<State, Txn, FE, T> fmt::Debug for Chain<State, Txn, FE, T> {
355 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
356 match self {
357 Self::Block(chain) => chain.fmt(f),
358 Self::Sync(chain) => chain.fmt(f),
359 }
360 }
361}
362
363impl<State, Txn, FE, T> From<BlockChain<State, Txn, FE, T>> for Chain<State, Txn, FE, T> {
364 fn from(chain: BlockChain<State, Txn, FE, T>) -> Self {
365 Self::Block(chain)
366 }
367}
368
369enum ChainViewData<'en, T> {
370 Block((T, data::HistoryView<'en>)),
371 Sync(T),
372}
373
374pub struct ChainView<'en, T> {
376 class: ChainType,
377 data: ChainViewData<'en, T>,
378}
379
380impl<'en, T> en::IntoStream<'en> for ChainView<'en, T>
381where
382 T: en::IntoStream<'en> + 'en,
383{
384 fn into_stream<E: en::Encoder<'en>>(self, encoder: E) -> Result<E::Ok, E::Error> {
385 use destream::en::EncodeMap;
386
387 let mut map = encoder.encode_map(Some(1))?;
388
389 map.encode_key(self.class.path().to_string())?;
390 match self.data {
391 ChainViewData::Block(view) => map.encode_value(view),
392 ChainViewData::Sync(view) => map.encode_value(view),
393 }?;
394
395 map.end()
396 }
397}
398
399#[async_trait]
400impl<'en, State, T> IntoView<'en, State::FE> for Chain<State, State::Txn, State::FE, T>
401where
402 State: StateInstance,
403 T: IntoView<'en, State::FE, Txn = State::Txn> + Send + Sync + 'en,
404 BlockChain<State, State::Txn, State::FE, T>: IntoView<'en, State::FE, View = (T::View, data::HistoryView<'en>), Txn = State::Txn>
405 + Send
406 + Sync,
407 SyncChain<State, State::Txn, State::FE, T>:
408 IntoView<'en, State::FE, View = T::View, Txn = State::Txn> + Send + Sync,
409{
410 type Txn = State::Txn;
411 type View = ChainView<'en, T::View>;
412
413 async fn into_view(self, txn: Self::Txn) -> TCResult<Self::View> {
414 let class = self.class();
415
416 let data = match self {
417 Self::Block(chain) => chain.into_view(txn).map_ok(ChainViewData::Block).await,
418 Self::Sync(chain) => chain.into_view(txn).map_ok(ChainViewData::Sync).await,
419 }?;
420
421 Ok(ChainView { class, data })
422 }
423}
424
425#[async_trait]
426impl<State, T> de::FromStream for Chain<State, State::Txn, State::FE, T>
427where
428 State: StateInstance
429 + de::FromStream<Context = State::Txn>
430 + From<Collection<State::Txn, State::FE>>
431 + From<Scalar>,
432 State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
433 T: Route<State> + de::FromStream<Context = State::Txn> + fmt::Debug,
434 (Bytes, Map<Tuple<State>>): TryCastFrom<State>,
435 Collection<State::Txn, State::FE>: TryCastFrom<State>,
436 Scalar: TryCastFrom<State>,
437 Value: TryCastFrom<State>,
438 (Value,): TryCastFrom<State>,
439 (Value, State): TryCastFrom<State>,
440{
441 type Context = State::Txn;
442
443 async fn from_stream<D: de::Decoder>(
444 txn: State::Txn,
445 decoder: &mut D,
446 ) -> Result<Self, D::Error> {
447 decoder.decode_map(ChainVisitor::new(txn)).await
448 }
449}
450
451pub struct ChainVisitor<State: StateInstance, T> {
453 txn: State::Txn,
454 phantom: PhantomData<T>,
455}
456
457impl<State, T> ChainVisitor<State, T>
458where
459 State: StateInstance,
460{
461 pub fn new(txn: State::Txn) -> Self {
462 Self {
463 txn,
464 phantom: PhantomData,
465 }
466 }
467}
468
469impl<State, T> ChainVisitor<State, T>
470where
471 State: StateInstance
472 + de::FromStream<Context = State::Txn>
473 + From<Collection<State::Txn, State::FE>>
474 + From<Scalar>,
475 State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
476 T: Route<State> + de::FromStream<Context = State::Txn> + fmt::Debug,
477 (Bytes, Map<Tuple<State>>): TryCastFrom<State>,
478 Collection<State::Txn, State::FE>: TryCastFrom<State>,
479 Scalar: TryCastFrom<State>,
480 Value: TryCastFrom<State>,
481 (Value,): TryCastFrom<State>,
482 (Value, State): TryCastFrom<State>,
483{
484 pub async fn visit_map_value<A: de::MapAccess>(
485 self,
486 class: ChainType,
487 access: &mut A,
488 ) -> Result<Chain<State, State::Txn, State::FE, T>, A::Error> {
489 match class {
490 ChainType::Block => {
491 access
492 .next_value(self.txn)
493 .map_ok(Chain::Block)
494 .map_err(|e| de::Error::custom(format!("invalid BlockChain stream: {}", e)))
495 .await
496 }
497 ChainType::Sync => access.next_value(self.txn).map_ok(Chain::Sync).await,
498 }
499 }
500}
501
502#[async_trait]
503impl<State, T> de::Visitor for ChainVisitor<State, T>
504where
505 State: StateInstance
506 + de::FromStream<Context = State::Txn>
507 + From<Collection<State::Txn, State::FE>>
508 + From<Scalar>,
509 State::FE: CacheBlock + for<'a> fs::FileSave<'a>,
510 T: Route<State> + de::FromStream<Context = State::Txn> + fmt::Debug,
511 (Bytes, Map<Tuple<State>>): TryCastFrom<State>,
512 Collection<State::Txn, State::FE>: TryCastFrom<State>,
513 Scalar: TryCastFrom<State>,
514 Value: TryCastFrom<State>,
515 (Value,): TryCastFrom<State>,
516 (Value, State): TryCastFrom<State>,
517{
518 type Value = Chain<State, State::Txn, State::FE, T>;
519
520 fn expecting() -> &'static str {
521 "a Chain"
522 }
523
524 async fn visit_map<A: de::MapAccess>(self, mut map: A) -> Result<Self::Value, A::Error> {
525 let class = if let Some(path) = map.next_key::<TCPathBuf>(()).await? {
526 ChainType::from_path(&path)
527 .ok_or_else(|| de::Error::invalid_value(path, "a Chain class"))?
528 } else {
529 return Err(de::Error::custom("expected a Chain class"));
530 };
531
532 self.visit_map_value(class, &mut map).await
533 }
534}
535
536fn new_queue<State>(
537 store: data::Store<State::Txn, State::FE>,
538) -> TxnTaskQueue<MutationPending<State::Txn, State::FE>, TCResult<MutationRecord>>
539where
540 State: StateInstance,
541 State::FE: for<'a> FileSave<'a> + CacheBlock + Clone,
542{
543 TxnTaskQueue::new(Arc::pin(move |mutation| {
544 let store = store.clone();
545
546 Box::pin(async move {
547 match mutation {
548 MutationPending::Delete(key) => Ok(MutationRecord::Delete(key)),
549 MutationPending::Put(txn, key, state) => {
550 let value = store.save_state(&txn, state).await?;
551 Ok(MutationRecord::Put(key, value))
552 }
553 }
554 })
555 }))
556}
557
558#[inline]
559pub fn null_hash() -> Output<Sha256> {
560 GenericArray::default()
561}