tc_collection/
lib.rs

1use std::fmt;
2use std::marker::PhantomData;
3
4use async_trait::async_trait;
5use destream::{de, en};
6use freqfs::FileSave;
7use futures::TryFutureExt;
8#[cfg(feature = "btree")]
9use safecast::{as_type, AsType, TryCastFrom};
10
11use tc_error::*;
12use tc_transact::fs;
13#[cfg(feature = "btree")]
14use tc_transact::hash::hash_try_stream;
15use tc_transact::hash::{AsyncHash, Digest, Hash, Output, Sha256};
16use tc_transact::IntoView;
17use tc_transact::{Transaction, TxnId};
18use tcgeneric::{
19    label, path_label, Class, Instance, Label, NativeClass, PathLabel, PathSegment, TCPathBuf,
20};
21
22#[cfg(feature = "btree")]
23use btree::{BTreeInstance, BTreeType};
24#[cfg(feature = "table")]
25use table::{TableInstance, TableStream, TableType};
26#[cfg(feature = "tensor")]
27use tensor::TensorType;
28
29pub use base::{CollectionBase, CollectionVisitor};
30#[cfg(feature = "btree")]
31pub use btree::{BTree, BTreeFile, Node as BTreeNode};
32pub use schema::Schema;
33#[cfg(feature = "table")]
34pub use table::{Table, TableFile};
35#[cfg(feature = "tensor")]
36pub use tensor::{
37    Dense, DenseBase, DenseCacheFile, DenseView, Node as TensorNode, Sparse, SparseBase,
38    SparseView, Tensor, TensorBase, TensorInstance, TensorView,
39};
40
41mod base;
42mod schema;
43
44#[cfg(feature = "btree")]
45pub mod btree;
46#[cfg(feature = "table")]
47pub mod table;
48#[cfg(feature = "tensor")]
49pub mod tensor;
50
51pub mod public;
52
53/// The prefix of the absolute path to [`Collection`] data types
54pub const PREFIX: PathLabel = path_label(&["state", "collection"]);
55
56const NULL: Label = label("null");
57
58/// A block in a [`Collection`]
59
60#[cfg(all(feature = "btree", not(feature = "tensor")))]
61pub trait CollectionBlock:
62    AsType<BTreeNode> + tcgeneric::ThreadSafe + Clone + for<'a> FileSave<'a>
63{
64}
65
66#[cfg(all(feature = "btree", not(feature = "tensor")))]
67impl<T> CollectionBlock for T where
68    T: AsType<BTreeNode> + tcgeneric::ThreadSafe + Clone + for<'a> FileSave<'a>
69{
70}
71
72#[cfg(feature = "tensor")]
73pub trait CollectionBlock:
74    DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone + for<'a> FileSave<'a>
75{
76}
77
78#[cfg(feature = "tensor")]
79impl<T> CollectionBlock for T where
80    T: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone + for<'a> FileSave<'a>
81{
82}
83
84#[cfg(not(feature = "btree"))]
85pub trait CollectionBlock: tcgeneric::ThreadSafe + Clone + for<'a> FileSave<'a> {}
86
87#[cfg(not(feature = "btree"))]
88impl<T> CollectionBlock for T where T: tcgeneric::ThreadSafe + Clone + for<'a> FileSave<'a> {}
89
90/// The [`Class`] of a `Collection`.
91#[derive(Clone, Copy, Eq, PartialEq)]
92pub enum CollectionType {
93    Null,
94    #[cfg(feature = "btree")]
95    BTree(BTreeType),
96    #[cfg(feature = "table")]
97    Table(TableType),
98    #[cfg(feature = "tensor")]
99    Tensor(TensorType),
100}
101
102impl Class for CollectionType {}
103
104impl NativeClass for CollectionType {
105    fn from_path(path: &[PathSegment]) -> Option<Self> {
106        if path.len() > 2 && &path[0..2] == &PREFIX[..] {
107            match path[2].as_str() {
108                #[cfg(feature = "btree")]
109                "btree" => BTreeType::from_path(path).map(Self::BTree),
110                #[cfg(feature = "table")]
111                "table" => TableType::from_path(path).map(Self::Table),
112                #[cfg(feature = "tensor")]
113                "tensor" => TensorType::from_path(path).map(Self::Tensor),
114                _ => None,
115            }
116        } else {
117            None
118        }
119    }
120
121    fn path(&self) -> TCPathBuf {
122        match self {
123            Self::Null => TCPathBuf::from(NULL),
124            #[cfg(feature = "btree")]
125            Self::BTree(btt) => btt.path(),
126            #[cfg(feature = "table")]
127            Self::Table(tt) => tt.path(),
128            #[cfg(feature = "tensor")]
129            Self::Tensor(tt) => tt.path(),
130        }
131    }
132}
133
134#[cfg(feature = "btree")]
135as_type!(CollectionType, BTree, BTreeType);
136#[cfg(feature = "table")]
137as_type!(CollectionType, Table, TableType);
138#[cfg(feature = "tensor")]
139as_type!(CollectionType, Tensor, TensorType);
140
141impl fmt::Debug for CollectionType {
142    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
143        match self {
144            Self::Null => f.write_str("null collection"),
145            #[cfg(feature = "btree")]
146            Self::BTree(btt) => fmt::Debug::fmt(btt, f),
147            #[cfg(feature = "table")]
148            Self::Table(tt) => fmt::Debug::fmt(tt, f),
149            #[cfg(feature = "tensor")]
150            Self::Tensor(tt) => fmt::Debug::fmt(tt, f),
151        }
152    }
153}
154
155/// A mutable transactional collection of data.
156pub enum Collection<Txn, FE> {
157    Null(fs::Dir<FE>, PhantomData<Txn>),
158    #[cfg(feature = "btree")]
159    BTree(BTree<Txn, FE>),
160    #[cfg(feature = "table")]
161    Table(Table<Txn, FE>),
162    #[cfg(feature = "tensor")]
163    Tensor(Tensor<Txn, FE>),
164}
165
166impl<Txn, FE> Clone for Collection<Txn, FE> {
167    fn clone(&self) -> Self {
168        match self {
169            Self::Null(dir, data) => Self::Null(dir.clone(), *data),
170            #[cfg(feature = "btree")]
171            Self::BTree(btree) => Self::BTree(btree.clone()),
172            #[cfg(feature = "table")]
173            Self::Table(table) => Self::Table(table.clone()),
174            #[cfg(feature = "tensor")]
175            Self::Tensor(tensor) => Self::Tensor(tensor.clone()),
176        }
177    }
178}
179
180#[cfg(feature = "btree")]
181as_type!(Collection<Txn, FE>, BTree, BTree<Txn, FE>);
182#[cfg(feature = "table")]
183as_type!(Collection<Txn, FE>, Table, Table<Txn, FE>);
184#[cfg(feature = "tensor")]
185as_type!(Collection<Txn, FE>, Tensor, Tensor<Txn, FE>);
186
187impl<Txn, FE> Collection<Txn, FE>
188where
189    Txn: Transaction<FE>,
190    FE: CollectionBlock,
191{
192    /// Return the [`Schema`] of this [`Collection`].
193    pub fn schema(&self) -> Schema {
194        match self {
195            Self::Null(_, _) => Schema::Null,
196            #[cfg(feature = "btree")]
197            Self::BTree(btree) => btree.schema().clone().into(),
198            #[cfg(feature = "table")]
199            Self::Table(table) => table.schema().clone().into(),
200            #[cfg(feature = "tensor")]
201            Self::Tensor(tensor) => match tensor {
202                Tensor::Dense(dense) => Schema::Dense(dense.schema()),
203                Tensor::Sparse(sparse) => Schema::Sparse(sparse.schema()),
204            },
205        }
206    }
207}
208
209impl<Txn, FE> Instance for Collection<Txn, FE>
210where
211    Txn: Send + Sync,
212    FE: Send + Sync,
213{
214    type Class = CollectionType;
215
216    fn class(&self) -> CollectionType {
217        match self {
218            Self::Null(_, _) => CollectionType::Null,
219            #[cfg(feature = "btree")]
220            Self::BTree(btree) => btree.class().into(),
221            #[cfg(feature = "table")]
222            Self::Table(table) => table.class().into(),
223            #[cfg(feature = "tensor")]
224            Self::Tensor(tensor) => tensor.class().into(),
225        }
226    }
227}
228
229#[async_trait]
230impl<Txn, FE> AsyncHash for Collection<Txn, FE>
231where
232    Txn: Transaction<FE>,
233    FE: CollectionBlock + Clone,
234{
235    #[allow(unused_variables)]
236    async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
237        let schema_hash = Hash::<Sha256>::hash(self.schema());
238
239        let contents_hash = match self {
240            Self::Null(_, _) => tc_transact::hash::default_hash::<Sha256>(),
241            #[cfg(feature = "btree")]
242            Self::BTree(btree) => {
243                let keys = btree.clone().keys(txn_id).await?;
244                hash_try_stream::<Sha256, _, _, _>(keys).await?
245            }
246            #[cfg(feature = "table")]
247            Self::Table(table) => {
248                let rows = table.clone().rows(txn_id).await?;
249                hash_try_stream::<Sha256, _, _, _>(rows).await?
250            }
251            #[cfg(feature = "tensor")]
252            Self::Tensor(tensor) => match tensor {
253                Tensor::Dense(dense) => {
254                    let elements = DenseView::from(dense.clone()).into_elements(txn_id).await?;
255                    hash_try_stream::<Sha256, _, _, _>(elements).await?
256                }
257                Tensor::Sparse(sparse) => {
258                    let elements = SparseView::from(sparse.clone())
259                        .into_elements(txn_id)
260                        .await?;
261
262                    hash_try_stream::<Sha256, _, _, _>(elements).await?
263                }
264            },
265        };
266
267        let mut hasher = Sha256::new();
268        hasher.update(schema_hash);
269        hasher.update(contents_hash);
270        Ok(hasher.finalize())
271    }
272}
273
274impl<Txn, FE> From<CollectionBase<Txn, FE>> for Collection<Txn, FE> {
275    fn from(base: CollectionBase<Txn, FE>) -> Self {
276        match base {
277            CollectionBase::Null(dir, data) => Self::Null(dir, data),
278            #[cfg(feature = "btree")]
279            CollectionBase::BTree(btree) => Self::BTree(btree.into()),
280            #[cfg(feature = "table")]
281            CollectionBase::Table(table) => Self::Table(table.into()),
282            #[cfg(feature = "tensor")]
283            CollectionBase::Tensor(tensor) => Self::Tensor(tensor.into()),
284        }
285    }
286}
287
/// Wrap a [`BTreeFile`] as a [`Collection::BTree`].
#[cfg(feature = "btree")]
impl<Txn, FE> From<BTreeFile<Txn, FE>> for Collection<Txn, FE> {
    fn from(file: BTreeFile<Txn, FE>) -> Self {
        let btree: BTree<Txn, FE> = file.into();
        Self::BTree(btree)
    }
}
294
295#[async_trait]
296impl<'en, Txn, FE> IntoView<'en, FE> for Collection<Txn, FE>
297where
298    Txn: Transaction<FE>,
299    FE: CollectionBlock,
300{
301    type Txn = Txn;
302    type View = CollectionView<'en>;
303
304    #[allow(unused_variables)]
305    async fn into_view(self, txn: Self::Txn) -> TCResult<Self::View> {
306        match self {
307            Self::Null(_dir, data) => Ok(CollectionView::Null(PhantomData)),
308            #[cfg(feature = "btree")]
309            Self::BTree(btree) => btree.into_view(txn).map_ok(CollectionView::BTree).await,
310            #[cfg(feature = "table")]
311            Self::Table(table) => table.into_view(txn).map_ok(CollectionView::Table).await,
312            #[cfg(feature = "tensor")]
313            Self::Tensor(tensor) => tensor.into_view(txn).map_ok(CollectionView::Tensor).await,
314        }
315    }
316}
317
318#[async_trait]
319impl<T, FE> de::FromStream for Collection<T, FE>
320where
321    T: Transaction<FE>,
322    FE: CollectionBlock,
323{
324    type Context = T;
325
326    async fn from_stream<D: de::Decoder>(
327        txn: Self::Context,
328        decoder: &mut D,
329    ) -> Result<Self, D::Error> {
330        decoder
331            .decode_map(CollectionVisitor::new(txn))
332            .map_ok(Self::from)
333            .await
334    }
335}
336
#[cfg(feature = "btree")]
impl<Txn, FE> TryCastFrom<Collection<Txn, FE>> for BTree<Txn, FE> {
    /// Return `true` if the given [`Collection`] is a [`BTree`].
    fn can_cast_from(collection: &Collection<Txn, FE>) -> bool {
        matches!(collection, Collection::BTree(_))
    }

    /// Extract the [`BTree`] from the given [`Collection`], or return `None` for any other variant.
    fn opt_cast_from(collection: Collection<Txn, FE>) -> Option<Self> {
        match collection {
            Collection::BTree(btree) => Some(btree),
            _ => None,
        }
    }
}
353
#[cfg(feature = "table")]
impl<Txn, FE> TryCastFrom<Collection<Txn, FE>> for Table<Txn, FE> {
    /// Return `true` if the given [`Collection`] is a [`Table`].
    fn can_cast_from(collection: &Collection<Txn, FE>) -> bool {
        matches!(collection, Collection::Table(_))
    }

    /// Extract the [`Table`] from the given [`Collection`], or return `None` for any other variant.
    fn opt_cast_from(collection: Collection<Txn, FE>) -> Option<Self> {
        match collection {
            Collection::Table(table) => Some(table),
            _ => None,
        }
    }
}
370
#[cfg(feature = "tensor")]
impl<Txn, FE> TryCastFrom<Collection<Txn, FE>> for Tensor<Txn, FE> {
    /// Return `true` if the given [`Collection`] is a [`Tensor`].
    fn can_cast_from(collection: &Collection<Txn, FE>) -> bool {
        matches!(collection, Collection::Tensor(_))
    }

    /// Extract the [`Tensor`] from the given [`Collection`], or return `None` for any other
    /// variant.
    fn opt_cast_from(collection: Collection<Txn, FE>) -> Option<Self> {
        match collection {
            Collection::Tensor(tensor) => Some(tensor),
            _ => None,
        }
    }
}
387
388impl<Txn, FE> fmt::Debug for Collection<Txn, FE> {
389    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
390        match self {
391            Self::Null(_, _) => f.write_str("null collection"),
392            #[cfg(feature = "btree")]
393            Self::BTree(btree) => btree.fmt(f),
394            #[cfg(feature = "table")]
395            Self::Table(table) => table.fmt(f),
396            #[cfg(feature = "tensor")]
397            Self::Tensor(tensor) => tensor.fmt(f),
398        }
399    }
400}
401
/// A view of a [`Collection`] within a single `Transaction`, used for serialization.
///
/// The `'en` lifetime is the encoding lifetime of the `en::IntoStream` impl which consumes it.
pub enum CollectionView<'en> {
    /// A view of a null collection. `PhantomData` carries the otherwise-unused `'en` lifetime.
    Null(PhantomData<&'en ()>),
    /// A view of a B-tree.
    #[cfg(feature = "btree")]
    BTree(btree::BTreeView<'en>),
    /// A view of a table.
    #[cfg(feature = "table")]
    Table(table::TableView<'en>),
    /// A view of a dense or sparse tensor.
    #[cfg(feature = "tensor")]
    Tensor(tensor::view::TensorView),
}
412
413impl<'en> en::IntoStream<'en> for CollectionView<'en> {
414    fn into_stream<E: en::Encoder<'en>>(self, encoder: E) -> Result<E::Ok, E::Error> {
415        use en::EncodeMap;
416
417        let mut map = encoder.encode_map(Some(1))?;
418
419        match self {
420            Self::Null(_) => map.encode_entry(CollectionType::Null.path(), ())?,
421            #[cfg(feature = "btree")]
422            Self::BTree(btree) => {
423                let classpath = BTreeType::default().path();
424                map.encode_entry(classpath.to_string(), btree)?;
425            }
426            #[cfg(feature = "table")]
427            Self::Table(table) => {
428                let classpath = TableType::default().path();
429                map.encode_entry(classpath.to_string(), table)?;
430            }
431            #[cfg(feature = "tensor")]
432            Self::Tensor(tensor) => {
433                let classpath = match tensor {
434                    tensor::view::TensorView::Dense(_) => TensorType::Dense,
435                    tensor::view::TensorView::Sparse(_) => TensorType::Sparse,
436                }
437                .path();
438
439                map.encode_entry(classpath.to_string(), tensor)?;
440            }
441        }
442
443        map.end()
444    }
445}
446
/// Delete the entry named `txn_id` from the `tc_transact::fs::VERSIONS` subdirectory of `dir`.
///
/// # Panics
/// Panics if `dir` has no `VERSIONS` subdirectory.
#[cfg(feature = "btree")]
async fn finalize_dir<FE: Send + Sync>(dir: &freqfs::DirLock<FE>, txn_id: &TxnId) {
    // The read lock on `dir` stays held until the function returns, i.e. for as long as the
    // versions directory is locked for writing.
    let parent = dir.read().await;

    let versions_dir = parent
        .get_dir(tc_transact::fs::VERSIONS)
        .expect("transactional versions directory");

    let mut versions = versions_dir.write().await;
    versions.delete(txn_id).await;
}