tc_chain/data/
store.rs

1use std::fmt;
2use std::marker::PhantomData;
3
4use async_trait::async_trait;
5use destream::en;
6use freqfs::FileSave;
7use futures::future::TryFutureExt;
8use log::debug;
9use safecast::*;
10
11use tc_collection::{Collection, CollectionBase, CollectionBlock, CollectionView, Schema};
12use tc_error::*;
13use tc_scalar::{OpRef, Scalar, TCRef};
14use tc_transact::fs;
15use tc_transact::hash::{AsyncHash, Hash, Output, Sha256};
16use tc_transact::public::StateInstance;
17use tc_transact::{IntoView, Transact, Transaction, TxnId};
18use tc_value::Value;
19use tcgeneric::{Id, Instance, NativeClass, ThreadSafe};
20
21pub enum StoreEntry<Txn, FE> {
22    Collection(Collection<Txn, FE>),
23    Scalar(Scalar),
24}
25
26impl<Txn, FE> Clone for StoreEntry<Txn, FE>
27where
28    Collection<Txn, FE>: Clone,
29{
30    fn clone(&self) -> Self {
31        match self {
32            Self::Collection(collection) => Self::Collection(collection.clone()),
33            Self::Scalar(scalar) => Self::Scalar(scalar.clone()),
34        }
35    }
36}
37
38impl<Txn, FE> StoreEntry<Txn, FE> {
39    pub fn try_from_state<State>(state: State) -> TCResult<Self>
40    where
41        State: StateInstance<Txn = Txn, FE = FE>,
42        Txn: Transaction<FE>,
43        FE: CollectionBlock + Clone,
44        Collection<Txn, FE>: TryCastFrom<State>,
45        Scalar: TryCastFrom<State>,
46    {
47        if Collection::<_, _>::can_cast_from(&state) {
48            state
49                .try_cast_into(|s| bad_request!("not a collection: {s:?}"))
50                .map(Self::Collection)
51        } else if Scalar::can_cast_from(&state) {
52            state
53                .try_cast_into(|s| bad_request!("not a scalar: {s:?}"))
54                .map(Self::Scalar)
55        } else {
56            Err(bad_request!("invalid Chain value entry: {state:?}"))
57        }
58    }
59
60    pub fn into_state<State>(self) -> State
61    where
62        State: StateInstance<Txn = Txn, FE = FE> + From<Collection<Txn, FE>> + From<Scalar>,
63    {
64        match self {
65            Self::Collection(collection) => collection.into(),
66            Self::Scalar(scalar) => scalar.into(),
67        }
68    }
69}
70
71#[async_trait]
72impl<'a, Txn, FE> AsyncHash for &'a StoreEntry<Txn, FE>
73where
74    FE: CollectionBlock + Clone,
75    Txn: Transaction<FE>,
76    Collection<Txn, FE>: AsyncHash,
77    Scalar: Hash<Sha256>,
78{
79    async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
80        match self {
81            StoreEntry::Collection(collection) => collection.clone().hash(txn_id).await,
82            StoreEntry::Scalar(scalar) => Ok(Hash::<Sha256>::hash(scalar)),
83        }
84    }
85}
86
87#[async_trait]
88impl<'en, Txn, FE> IntoView<'en, FE> for StoreEntry<Txn, FE>
89where
90    Txn: Transaction<FE>,
91    FE: CollectionBlock + Clone,
92{
93    type Txn = Txn;
94    type View = StoreEntryView<'en>;
95
96    async fn into_view(self, txn: Self::Txn) -> TCResult<Self::View> {
97        match self {
98            Self::Collection(collection) => {
99                collection
100                    .into_view(txn)
101                    .map_ok(StoreEntryView::Collection)
102                    .await
103            }
104            Self::Scalar(scalar) => Ok(StoreEntryView::Scalar(scalar)),
105        }
106    }
107}
108
109impl<Txn, FE> fmt::Debug for StoreEntry<Txn, FE> {
110    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
111        match self {
112            Self::Collection(collection) => collection.fmt(f),
113            Self::Scalar(scalar) => scalar.fmt(f),
114        }
115    }
116}
117
118pub enum StoreEntryView<'en> {
119    Collection(CollectionView<'en>),
120    Scalar(Scalar),
121}
122
123impl<'en> en::IntoStream<'en> for StoreEntryView<'en> {
124    fn into_stream<E: en::Encoder<'en>>(self, encoder: E) -> Result<E::Ok, E::Error> {
125        match self {
126            Self::Collection(collection) => collection.into_stream(encoder),
127            Self::Scalar(scalar) => scalar.into_stream(encoder),
128        }
129    }
130}
131
132pub struct Store<Txn, FE> {
133    dir: fs::Dir<FE>,
134    txn: PhantomData<Txn>,
135}
136
137impl<Txn, FE> Clone for Store<Txn, FE> {
138    fn clone(&self) -> Self {
139        Self {
140            dir: self.dir.clone(),
141            txn: self.txn.clone(),
142        }
143    }
144}
145
146impl<Txn, FE> Store<Txn, FE> {
147    pub fn new(dir: fs::Dir<FE>) -> Self {
148        Self {
149            dir,
150            txn: PhantomData,
151        }
152    }
153}
154
155impl<Txn, FE> Store<Txn, FE>
156where
157    Txn: Transaction<FE>,
158    FE: for<'a> FileSave<'a> + CollectionBlock + Clone,
159{
160    pub async fn resolve(&self, txn_id: TxnId, scalar: Scalar) -> TCResult<StoreEntry<Txn, FE>> {
161        debug!("History::resolve {:?}", scalar);
162
163        type OpSubject = tc_scalar::Subject;
164
165        if let Scalar::Ref(tc_ref) = scalar {
166            if let TCRef::Op(OpRef::Get((OpSubject::Ref(hash, classpath), schema))) = *tc_ref {
167                let hash = hash.into_id();
168                let store = self.dir.get_dir(txn_id, &hash).await?;
169                let schema = Value::try_cast_from(schema, |s| {
170                    internal!("invalid schema for Collection: {s:?}")
171                })
172                .and_then(|schema| Schema::try_from((classpath, schema)))?;
173
174                <CollectionBase<Txn, FE> as fs::Persist<FE>>::load(txn_id, schema, store)
175                    .map_ok(Collection::from)
176                    .map_ok(StoreEntry::Collection)
177                    .await
178            } else {
179                Err(internal!(
180                    "invalid subject for historical Chain state {:?}",
181                    tc_ref
182                ))
183            }
184        } else {
185            Ok(StoreEntry::Scalar(scalar))
186        }
187    }
188
189    pub async fn save_state(&self, txn: &Txn, state: StoreEntry<Txn, FE>) -> TCResult<Scalar> {
190        debug!("chain data store saving state {:?}...", state);
191
192        match state {
193            StoreEntry::Collection(collection) => {
194                let classpath = collection.class().path();
195                let schema = collection.schema();
196
197                let txn_id = *txn.id();
198                let hash = collection.clone().hash(txn_id).map_ok(Id::from).await?;
199
200                if !self.dir.contains(txn_id, &hash).await? {
201                    let store = self.dir.create_dir(txn_id, hash.clone()).await?;
202                    let _copy: CollectionBase<_, _> =
203                        fs::CopyFrom::copy_from(txn, store, collection).await?;
204                }
205
206                Ok(OpRef::Get((
207                    (hash.into(), classpath).into(),
208                    Value::cast_from(schema).into(),
209                ))
210                .into())
211            }
212            StoreEntry::Scalar(scalar) => Ok(scalar),
213        }
214    }
215}
216
217#[async_trait]
218impl<Txn, FE> Transact for Store<Txn, FE>
219where
220    FE: ThreadSafe + Clone + for<'a> fs::FileSave<'a>,
221    Txn: Transaction<FE>,
222{
223    type Commit = ();
224
225    async fn commit(&self, txn_id: TxnId) -> Self::Commit {
226        debug!("commit chain data store at {}", txn_id);
227        self.dir.commit(txn_id, true).await
228    }
229
230    async fn rollback(&self, txn_id: &TxnId) {
231        self.dir.rollback(*txn_id, true).await
232    }
233
234    async fn finalize(&self, txn_id: &TxnId) {
235        self.dir.finalize(*txn_id).await
236    }
237}