1use std::fmt;
2use std::marker::PhantomData;
3
4use async_trait::async_trait;
5use destream::en;
6use freqfs::FileSave;
7use futures::future::TryFutureExt;
8use log::debug;
9use safecast::*;
10
11use tc_collection::{Collection, CollectionBase, CollectionBlock, CollectionView, Schema};
12use tc_error::*;
13use tc_scalar::{OpRef, Scalar, TCRef};
14use tc_transact::fs;
15use tc_transact::hash::{AsyncHash, Hash, Output, Sha256};
16use tc_transact::public::StateInstance;
17use tc_transact::{IntoView, Transact, Transaction, TxnId};
18use tc_value::Value;
19use tcgeneric::{Id, Instance, NativeClass, ThreadSafe};
20
/// A single value handled by a chain data [`Store`]:
/// either a [`Collection`] or a [`Scalar`].
pub enum StoreEntry<Txn, FE> {
    /// An entry which is a collection.
    Collection(Collection<Txn, FE>),
    /// An entry which is a scalar value.
    Scalar(Scalar),
}
25
26impl<Txn, FE> Clone for StoreEntry<Txn, FE>
27where
28 Collection<Txn, FE>: Clone,
29{
30 fn clone(&self) -> Self {
31 match self {
32 Self::Collection(collection) => Self::Collection(collection.clone()),
33 Self::Scalar(scalar) => Self::Scalar(scalar.clone()),
34 }
35 }
36}
37
38impl<Txn, FE> StoreEntry<Txn, FE> {
39 pub fn try_from_state<State>(state: State) -> TCResult<Self>
40 where
41 State: StateInstance<Txn = Txn, FE = FE>,
42 Txn: Transaction<FE>,
43 FE: CollectionBlock + Clone,
44 Collection<Txn, FE>: TryCastFrom<State>,
45 Scalar: TryCastFrom<State>,
46 {
47 if Collection::<_, _>::can_cast_from(&state) {
48 state
49 .try_cast_into(|s| bad_request!("not a collection: {s:?}"))
50 .map(Self::Collection)
51 } else if Scalar::can_cast_from(&state) {
52 state
53 .try_cast_into(|s| bad_request!("not a scalar: {s:?}"))
54 .map(Self::Scalar)
55 } else {
56 Err(bad_request!("invalid Chain value entry: {state:?}"))
57 }
58 }
59
60 pub fn into_state<State>(self) -> State
61 where
62 State: StateInstance<Txn = Txn, FE = FE> + From<Collection<Txn, FE>> + From<Scalar>,
63 {
64 match self {
65 Self::Collection(collection) => collection.into(),
66 Self::Scalar(scalar) => scalar.into(),
67 }
68 }
69}
70
71#[async_trait]
72impl<'a, Txn, FE> AsyncHash for &'a StoreEntry<Txn, FE>
73where
74 FE: CollectionBlock + Clone,
75 Txn: Transaction<FE>,
76 Collection<Txn, FE>: AsyncHash,
77 Scalar: Hash<Sha256>,
78{
79 async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
80 match self {
81 StoreEntry::Collection(collection) => collection.clone().hash(txn_id).await,
82 StoreEntry::Scalar(scalar) => Ok(Hash::<Sha256>::hash(scalar)),
83 }
84 }
85}
86
87#[async_trait]
88impl<'en, Txn, FE> IntoView<'en, FE> for StoreEntry<Txn, FE>
89where
90 Txn: Transaction<FE>,
91 FE: CollectionBlock + Clone,
92{
93 type Txn = Txn;
94 type View = StoreEntryView<'en>;
95
96 async fn into_view(self, txn: Self::Txn) -> TCResult<Self::View> {
97 match self {
98 Self::Collection(collection) => {
99 collection
100 .into_view(txn)
101 .map_ok(StoreEntryView::Collection)
102 .await
103 }
104 Self::Scalar(scalar) => Ok(StoreEntryView::Scalar(scalar)),
105 }
106 }
107}
108
109impl<Txn, FE> fmt::Debug for StoreEntry<Txn, FE> {
110 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
111 match self {
112 Self::Collection(collection) => collection.fmt(f),
113 Self::Scalar(scalar) => scalar.fmt(f),
114 }
115 }
116}
117
/// An encodable view of a [`StoreEntry`], as produced by its `IntoView` implementation.
pub enum StoreEntryView<'en> {
    /// A view of a collection entry.
    Collection(CollectionView<'en>),
    /// A scalar entry (scalars are carried through unchanged).
    Scalar(Scalar),
}
122
123impl<'en> en::IntoStream<'en> for StoreEntryView<'en> {
124 fn into_stream<E: en::Encoder<'en>>(self, encoder: E) -> Result<E::Ok, E::Error> {
125 match self {
126 Self::Collection(collection) => collection.into_stream(encoder),
127 Self::Scalar(scalar) => scalar.into_stream(encoder),
128 }
129 }
130}
131
/// A chain data store which keeps its contents in a filesystem directory.
pub struct Store<Txn, FE> {
    /// The directory in which saved collections are kept (one subdirectory per collection hash).
    dir: fs::Dir<FE>,
    /// Marker tying this store to a transaction type; no `Txn` value is stored.
    txn: PhantomData<Txn>,
}
136
137impl<Txn, FE> Clone for Store<Txn, FE> {
138 fn clone(&self) -> Self {
139 Self {
140 dir: self.dir.clone(),
141 txn: self.txn.clone(),
142 }
143 }
144}
145
146impl<Txn, FE> Store<Txn, FE> {
147 pub fn new(dir: fs::Dir<FE>) -> Self {
148 Self {
149 dir,
150 txn: PhantomData,
151 }
152 }
153}
154
155impl<Txn, FE> Store<Txn, FE>
156where
157 Txn: Transaction<FE>,
158 FE: for<'a> FileSave<'a> + CollectionBlock + Clone,
159{
160 pub async fn resolve(&self, txn_id: TxnId, scalar: Scalar) -> TCResult<StoreEntry<Txn, FE>> {
161 debug!("History::resolve {:?}", scalar);
162
163 type OpSubject = tc_scalar::Subject;
164
165 if let Scalar::Ref(tc_ref) = scalar {
166 if let TCRef::Op(OpRef::Get((OpSubject::Ref(hash, classpath), schema))) = *tc_ref {
167 let hash = hash.into_id();
168 let store = self.dir.get_dir(txn_id, &hash).await?;
169 let schema = Value::try_cast_from(schema, |s| {
170 internal!("invalid schema for Collection: {s:?}")
171 })
172 .and_then(|schema| Schema::try_from((classpath, schema)))?;
173
174 <CollectionBase<Txn, FE> as fs::Persist<FE>>::load(txn_id, schema, store)
175 .map_ok(Collection::from)
176 .map_ok(StoreEntry::Collection)
177 .await
178 } else {
179 Err(internal!(
180 "invalid subject for historical Chain state {:?}",
181 tc_ref
182 ))
183 }
184 } else {
185 Ok(StoreEntry::Scalar(scalar))
186 }
187 }
188
189 pub async fn save_state(&self, txn: &Txn, state: StoreEntry<Txn, FE>) -> TCResult<Scalar> {
190 debug!("chain data store saving state {:?}...", state);
191
192 match state {
193 StoreEntry::Collection(collection) => {
194 let classpath = collection.class().path();
195 let schema = collection.schema();
196
197 let txn_id = *txn.id();
198 let hash = collection.clone().hash(txn_id).map_ok(Id::from).await?;
199
200 if !self.dir.contains(txn_id, &hash).await? {
201 let store = self.dir.create_dir(txn_id, hash.clone()).await?;
202 let _copy: CollectionBase<_, _> =
203 fs::CopyFrom::copy_from(txn, store, collection).await?;
204 }
205
206 Ok(OpRef::Get((
207 (hash.into(), classpath).into(),
208 Value::cast_from(schema).into(),
209 ))
210 .into())
211 }
212 StoreEntry::Scalar(scalar) => Ok(scalar),
213 }
214 }
215}
216
217#[async_trait]
218impl<Txn, FE> Transact for Store<Txn, FE>
219where
220 FE: ThreadSafe + Clone + for<'a> fs::FileSave<'a>,
221 Txn: Transaction<FE>,
222{
223 type Commit = ();
224
225 async fn commit(&self, txn_id: TxnId) -> Self::Commit {
226 debug!("commit chain data store at {}", txn_id);
227 self.dir.commit(txn_id, true).await
228 }
229
230 async fn rollback(&self, txn_id: &TxnId) {
231 self.dir.rollback(*txn_id, true).await
232 }
233
234 async fn finalize(&self, txn_id: &TxnId) {
235 self.dir.finalize(*txn_id).await
236 }
237}