tc_collection/
base.rs

1use std::fmt;
2use std::marker::PhantomData;
3
4use async_trait::async_trait;
5use destream::de;
6use futures::TryFutureExt;
7use log::{debug, info};
8use safecast::TryCastFrom;
9
10use tc_error::*;
11use tc_transact::fs::{CopyFrom, Dir, Persist, Restore};
12use tc_transact::hash::{AsyncHash, Output, Sha256};
13use tc_transact::{fs, IntoView, Transact, Transaction, TxnId};
14use tcgeneric::{Instance, NativeClass, TCPathBuf, ThreadSafe};
15
16#[cfg(feature = "btree")]
17use crate::btree::{BTree, BTreeFile, BTreeInstance};
18#[cfg(feature = "table")]
19use crate::table::{Table, TableFile, TableInstance};
20#[cfg(feature = "tensor")]
21use crate::tensor::{
22    Dense, DenseBase, Sparse, SparseBase, Tensor, TensorBase, TensorInstance, TensorType,
23};
24use crate::{Collection, CollectionBlock, CollectionType, CollectionView, Schema};
25
26/// The base type of a mutable transactional collection of data.
27pub enum CollectionBase<Txn, FE> {
28    Null(Dir<FE>, PhantomData<Txn>),
29    #[cfg(feature = "btree")]
30    BTree(BTreeFile<Txn, FE>),
31    #[cfg(feature = "table")]
32    Table(TableFile<Txn, FE>),
33    #[cfg(feature = "tensor")]
34    Tensor(TensorBase<Txn, FE>),
35}
36
37impl<Txn, FE> Clone for CollectionBase<Txn, FE> {
38    fn clone(&self) -> Self {
39        match self {
40            Self::Null(dir, txn) => Self::Null(dir.clone(), *txn),
41            #[cfg(feature = "btree")]
42            Self::BTree(btree) => Self::BTree(btree.clone()),
43            #[cfg(feature = "table")]
44            Self::Table(table) => Self::Table(table.clone()),
45            #[cfg(feature = "tensor")]
46            Self::Tensor(tensor) => Self::Tensor(tensor.clone()),
47        }
48    }
49}
50
51impl<Txn, FE> CollectionBase<Txn, FE>
52where
53    Txn: Transaction<FE>,
54    FE: CollectionBlock,
55{
56    /// Return the [`Schema`] of this [`Collection`]
57    pub fn schema(&self) -> Schema {
58        match self {
59            Self::Null(_, _) => Schema::Null,
60            #[cfg(feature = "btree")]
61            Self::BTree(btree) => btree.schema().clone().into(),
62            #[cfg(feature = "table")]
63            Self::Table(table) => table.schema().clone().into(),
64            #[cfg(feature = "tensor")]
65            Self::Tensor(tensor) => match tensor {
66                TensorBase::Dense(dense) => Schema::Dense(dense.schema()),
67                TensorBase::Sparse(sparse) => Schema::Sparse(sparse.schema()),
68            },
69        }
70    }
71}
72
73impl<Txn, FE> Instance for CollectionBase<Txn, FE>
74where
75    Txn: Transaction<FE>,
76    FE: ThreadSafe,
77{
78    type Class = CollectionType;
79
80    fn class(&self) -> CollectionType {
81        match self {
82            Self::Null(_, _) => CollectionType::Null,
83            #[cfg(feature = "btree")]
84            Self::BTree(btree) => btree.class().into(),
85            #[cfg(feature = "table")]
86            Self::Table(table) => table.class().into(),
87            #[cfg(feature = "tensor")]
88            Self::Tensor(tensor) => tensor.class().into(),
89        }
90    }
91}
92
93#[async_trait]
94impl<Txn, FE> Transact for CollectionBase<Txn, FE>
95where
96    Txn: Transaction<FE>,
97    FE: CollectionBlock,
98{
99    type Commit = ();
100
101    #[allow(unused_variables)]
102    async fn commit(&self, txn_id: TxnId) -> Self::Commit {
103        match self {
104            Self::Null(_, _) => (),
105            #[cfg(feature = "btree")]
106            Self::BTree(btree) => btree.commit(txn_id).await,
107            #[cfg(feature = "table")]
108            Self::Table(table) => table.commit(txn_id).await,
109            #[cfg(feature = "tensor")]
110            Self::Tensor(tensor) => tensor.commit(txn_id).await,
111        }
112    }
113
114    #[allow(unused_variables)]
115    async fn rollback(&self, txn_id: &TxnId) {
116        match self {
117            Self::Null(_, _) => (),
118            #[cfg(feature = "btree")]
119            Self::BTree(btree) => btree.rollback(txn_id).await,
120            #[cfg(feature = "table")]
121            Self::Table(table) => table.rollback(txn_id).await,
122            #[cfg(feature = "tensor")]
123            Self::Tensor(tensor) => tensor.rollback(txn_id).await,
124        }
125    }
126
127    #[allow(unused_variables)]
128    async fn finalize(&self, txn_id: &TxnId) {
129        match self {
130            Self::Null(_, _) => (),
131            #[cfg(feature = "btree")]
132            Self::BTree(btree) => btree.finalize(txn_id).await,
133            #[cfg(feature = "table")]
134            Self::Table(table) => table.finalize(txn_id).await,
135            #[cfg(feature = "tensor")]
136            Self::Tensor(tensor) => tensor.finalize(txn_id).await,
137        }
138    }
139}
140
141#[async_trait]
142impl<T, FE> AsyncHash for CollectionBase<T, FE>
143where
144    T: Transaction<FE>,
145    FE: CollectionBlock,
146{
147    async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
148        Collection::from(self.clone()).hash(txn_id).await
149    }
150}
151
152#[async_trait]
153impl<Txn, FE> Persist<FE> for CollectionBase<Txn, FE>
154where
155    Txn: Transaction<FE>,
156    FE: CollectionBlock,
157{
158    type Txn = Txn;
159    type Schema = Schema;
160
161    #[allow(unused_variables)]
162    async fn create(txn_id: TxnId, schema: Schema, store: Dir<FE>) -> TCResult<Self> {
163        debug!("create persistent mutable collection at {store:?}");
164
165        match schema {
166            Schema::Null => Ok(Self::Null(store, PhantomData)),
167            #[cfg(feature = "btree")]
168            Schema::BTree(schema) => {
169                BTreeFile::create(txn_id, schema, store)
170                    .map_ok(Self::BTree)
171                    .await
172            }
173            #[cfg(feature = "table")]
174            Schema::Table(schema) => {
175                TableFile::create(txn_id, schema, store)
176                    .map_ok(Self::Table)
177                    .await
178            }
179            #[cfg(feature = "tensor")]
180            Schema::Dense(schema) => {
181                DenseBase::create(txn_id, schema, store)
182                    .map_ok(TensorBase::Dense)
183                    .map_ok(Self::Tensor)
184                    .await
185            }
186            #[cfg(feature = "tensor")]
187            Schema::Sparse(schema) => {
188                SparseBase::create(txn_id, schema.into(), store)
189                    .map_ok(TensorBase::Sparse)
190                    .map_ok(Self::Tensor)
191                    .await
192            }
193        }
194    }
195
196    #[allow(unused_variables)]
197    async fn load(txn_id: TxnId, schema: Schema, store: Dir<FE>) -> TCResult<Self> {
198        info!("load persistent mutable collection at {store:?}");
199
200        match schema {
201            Schema::Null => Ok(Self::Null(store, PhantomData)),
202            #[cfg(feature = "btree")]
203            Schema::BTree(schema) => {
204                BTreeFile::load(txn_id, schema, store)
205                    .map_ok(Self::BTree)
206                    .await
207            }
208            #[cfg(feature = "table")]
209            Schema::Table(schema) => {
210                TableFile::load(txn_id, schema, store)
211                    .map_ok(Self::Table)
212                    .await
213            }
214            #[cfg(feature = "tensor")]
215            Schema::Dense(schema) => {
216                DenseBase::load(txn_id, schema, store)
217                    .map_ok(TensorBase::Dense)
218                    .map_ok(Self::Tensor)
219                    .await
220            }
221            #[cfg(feature = "tensor")]
222            Schema::Sparse(schema) => {
223                SparseBase::load(txn_id, schema.into(), store)
224                    .map_ok(TensorBase::Sparse)
225                    .map_ok(Self::Tensor)
226                    .await
227            }
228        }
229    }
230
231    fn dir(&self) -> tc_transact::fs::Inner<FE> {
232        match self {
233            Self::Null(store, _) => store.clone().into_inner(),
234            #[cfg(feature = "btree")]
235            Self::BTree(btree) => btree.dir(),
236            #[cfg(feature = "table")]
237            Self::Table(table) => table.dir(),
238            #[cfg(feature = "tensor")]
239            Self::Tensor(tensor) => tensor.dir(),
240        }
241    }
242}
243
244#[async_trait]
245impl<Txn, FE> CopyFrom<FE, Collection<Txn, FE>> for CollectionBase<Txn, FE>
246where
247    Txn: Transaction<FE>,
248    FE: CollectionBlock,
249{
250    #[allow(unused_variables)]
251    async fn copy_from(txn: &Txn, store: Dir<FE>, instance: Collection<Txn, FE>) -> TCResult<Self> {
252        match instance {
253            Collection::Null(_, _) => Ok(Self::Null(store, PhantomData)),
254            #[cfg(feature = "btree")]
255            Collection::BTree(instance) => {
256                BTreeFile::copy_from(txn, store, instance)
257                    .map_ok(Self::BTree)
258                    .await
259            }
260            #[cfg(feature = "table")]
261            Collection::Table(instance) => {
262                TableFile::copy_from(txn, store, instance)
263                    .map_ok(Self::Table)
264                    .await
265            }
266            #[cfg(feature = "tensor")]
267            Collection::Tensor(instance) => {
268                TensorBase::copy_from(txn, store, instance.into())
269                    .map_ok(Self::Tensor)
270                    .await
271            }
272        }
273    }
274}
275
276#[async_trait]
277impl<Txn, FE> Restore<FE> for CollectionBase<Txn, FE>
278where
279    Txn: Transaction<FE>,
280    FE: CollectionBlock,
281{
282    #[allow(unused_variables)]
283    async fn restore(&self, txn_id: TxnId, backup: &Self) -> TCResult<()> {
284        match (self, backup) {
285            (Self::Null(_, _), Self::Null(_, _)) => Ok(()),
286            #[cfg(feature = "btree")]
287            (Self::BTree(this), Self::BTree(backup)) => this.restore(txn_id, backup).await,
288            #[cfg(feature = "table")]
289            (Self::Table(this), Self::Table(backup)) => this.restore(txn_id, backup).await,
290            #[cfg(feature = "tensor")]
291            (Self::Tensor(this), Self::Tensor(backup)) => this.restore(txn_id, backup).await,
292            #[cfg(any(feature = "btree", feature = "table", feature = "tensor"))]
293            (this, that) => Err(bad_request!("cannot restore {:?} from {:?}", this, that)),
294        }
295    }
296}
297
298impl<T, FE> TryCastFrom<Collection<T, FE>> for CollectionBase<T, FE> {
299    fn can_cast_from(collection: &Collection<T, FE>) -> bool {
300        match collection {
301            Collection::Null(_, _) => true,
302            #[cfg(feature = "btree")]
303            Collection::BTree(BTree::File(_)) => true,
304            #[cfg(feature = "table")]
305            Collection::Table(Table::Table(_)) => true,
306            #[cfg(feature = "tensor")]
307            Collection::Tensor(Tensor::Dense(Dense::Base(_))) => true,
308            #[cfg(feature = "tensor")]
309            Collection::Tensor(Tensor::Sparse(Sparse::Base(_))) => true,
310            #[cfg(any(feature = "btree", feature = "table", feature = "tensor"))]
311            _ => false,
312        }
313    }
314
315    fn opt_cast_from(collection: Collection<T, FE>) -> Option<Self> {
316        match collection {
317            Collection::Null(dir, data) => Some(Self::Null(dir, data)),
318            #[cfg(feature = "btree")]
319            Collection::BTree(BTree::File(btree)) => Some(Self::BTree(btree)),
320            #[cfg(feature = "table")]
321            Collection::Table(Table::Table(table)) => Some(Self::Table(table)),
322            #[cfg(feature = "tensor")]
323            Collection::Tensor(Tensor::Dense(Dense::Base(dense))) => {
324                Some(Self::Tensor(TensorBase::Dense(dense)))
325            }
326            #[cfg(feature = "tensor")]
327            Collection::Tensor(Tensor::Sparse(Sparse::Base(sparse))) => {
328                Some(Self::Tensor(TensorBase::Sparse(sparse)))
329            }
330            #[cfg(any(feature = "btree", feature = "table", feature = "tensor"))]
331            _ => None,
332        }
333    }
334}
335
336#[async_trait]
337impl<'en, T, FE> IntoView<'en, FE> for CollectionBase<T, FE>
338where
339    T: Transaction<FE>,
340    FE: CollectionBlock,
341    Self: 'en,
342{
343    type Txn = T;
344    type View = CollectionView<'en>;
345
346    async fn into_view(self, txn: Self::Txn) -> TCResult<Self::View> {
347        Collection::from(self).into_view(txn).await
348    }
349}
350
351impl<T, FE> fmt::Debug for CollectionBase<T, FE> {
352    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
353        f.write_str("a Collection")
354    }
355}
356
/// A [`de::Visitor`] used to deserialize a [`Collection`].
///
/// The visitor owns the transaction which is handed to whichever collection
/// type ends up being decoded.
pub struct CollectionVisitor<Txn, FE> {
    /// The transaction used as the decoding context.
    txn: Txn,
    /// Zero-sized marker tying the visitor to the type parameter `FE`.
    phantom: PhantomData<FE>,
}
362
363impl<Txn, FE> CollectionVisitor<Txn, FE>
364where
365    Txn: Transaction<FE>,
366    FE: CollectionBlock,
367{
368    pub fn new(txn: Txn) -> Self {
369        Self {
370            txn,
371            phantom: PhantomData,
372        }
373    }
374
375    pub async fn visit_map_value<A: de::MapAccess>(
376        self,
377        class: CollectionType,
378        access: &mut A,
379    ) -> Result<CollectionBase<Txn, FE>, A::Error> {
380        debug!("CollectionVisitor::visit_map_value with class {:?}", class);
381
382        match class {
383            CollectionType::Null => {
384                let _: () = access.next_value(()).await?;
385                let dir = self
386                    .txn
387                    .context()
388                    .and_then(|dir| fs::Dir::load(*self.txn.id(), dir))
389                    .map_err(de::Error::custom)
390                    .await?;
391
392                Ok(CollectionBase::Null(dir, PhantomData))
393            }
394
395            #[cfg(feature = "btree")]
396            CollectionType::BTree(_) => {
397                access
398                    .next_value(self.txn)
399                    .map_ok(CollectionBase::BTree)
400                    .await
401            }
402
403            #[cfg(feature = "table")]
404            CollectionType::Table(_) => {
405                access
406                    .next_value(self.txn)
407                    .map_ok(CollectionBase::Table)
408                    .await
409            }
410
411            #[cfg(feature = "tensor")]
412            CollectionType::Tensor(tt) => match tt {
413                TensorType::Dense => {
414                    access
415                        .next_value(self.txn)
416                        .map_ok(TensorBase::Dense)
417                        .map_ok(CollectionBase::Tensor)
418                        .await
419                }
420                TensorType::Sparse => {
421                    access
422                        .next_value(self.txn)
423                        .map_ok(TensorBase::Sparse)
424                        .map_ok(CollectionBase::Tensor)
425                        .await
426                }
427            },
428        }
429    }
430}
431
432#[async_trait]
433impl<T, FE> de::Visitor for CollectionVisitor<T, FE>
434where
435    T: Transaction<FE>,
436    FE: CollectionBlock,
437{
438    type Value = CollectionBase<T, FE>;
439
440    fn expecting() -> &'static str {
441        "a Collection"
442    }
443
444    async fn visit_map<A: de::MapAccess>(self, mut map: A) -> Result<Self::Value, A::Error> {
445        let classpath: TCPathBuf = map
446            .next_key(())
447            .await?
448            .ok_or_else(|| de::Error::custom("expected a Collection type"))?;
449
450        let class = CollectionType::from_path(&classpath)
451            .ok_or_else(|| de::Error::invalid_value(classpath, "a Collection type"))?;
452
453        self.visit_map_value(class, &mut map).await
454    }
455}
456
457#[async_trait]
458impl<T, FE> de::FromStream for CollectionBase<T, FE>
459where
460    T: Transaction<FE>,
461    FE: CollectionBlock,
462{
463    type Context = T;
464
465    async fn from_stream<D: de::Decoder>(
466        txn: Self::Context,
467        decoder: &mut D,
468    ) -> Result<Self, D::Error> {
469        decoder.decode_map(CollectionVisitor::new(txn)).await
470    }
471}