1use std::fmt;
2use std::marker::PhantomData;
3
4use async_trait::async_trait;
5use destream::{de, en};
6use freqfs::FileSave;
7use futures::TryFutureExt;
8#[cfg(feature = "btree")]
9use safecast::{as_type, AsType, TryCastFrom};
10
11use tc_error::*;
12use tc_transact::fs;
13#[cfg(feature = "btree")]
14use tc_transact::hash::hash_try_stream;
15use tc_transact::hash::{AsyncHash, Digest, Hash, Output, Sha256};
16use tc_transact::IntoView;
17use tc_transact::{Transaction, TxnId};
18use tcgeneric::{
19 label, path_label, Class, Instance, Label, NativeClass, PathLabel, PathSegment, TCPathBuf,
20};
21
22#[cfg(feature = "btree")]
23use btree::{BTreeInstance, BTreeType};
24#[cfg(feature = "table")]
25use table::{TableInstance, TableStream, TableType};
26#[cfg(feature = "tensor")]
27use tensor::TensorType;
28
29pub use base::{CollectionBase, CollectionVisitor};
30#[cfg(feature = "btree")]
31pub use btree::{BTree, BTreeFile, Node as BTreeNode};
32pub use schema::Schema;
33#[cfg(feature = "table")]
34pub use table::{Table, TableFile};
35#[cfg(feature = "tensor")]
36pub use tensor::{
37 Dense, DenseBase, DenseCacheFile, DenseView, Node as TensorNode, Sparse, SparseBase,
38 SparseView, Tensor, TensorBase, TensorInstance, TensorView,
39};
40
41mod base;
42mod schema;
43
44#[cfg(feature = "btree")]
45pub mod btree;
46#[cfg(feature = "table")]
47pub mod table;
48#[cfg(feature = "tensor")]
49pub mod tensor;
50
51pub mod public;
52
/// Path prefix (`state`, `collection`) that a class path must start with in order to be
/// recognized by `CollectionType::from_path`.
pub const PREFIX: PathLabel = path_label(&["state", "collection"]);

/// Label from which the class path of `CollectionType::Null` is built.
const NULL: Label = label("null");
57
/// Bundle of bounds required of the `FE` type parameter used throughout this module
/// (`FE: CollectionBlock`).
///
/// This variant is compiled when the `btree` feature is enabled and the `tensor` feature is
/// not. The trait declares no methods of its own.
#[cfg(all(feature = "btree", not(feature = "tensor")))]
pub trait CollectionBlock:
    AsType<BTreeNode> + tcgeneric::ThreadSafe + Clone + for<'a> FileSave<'a>
{
}

// Blanket impl: every type that satisfies the bounds is a `CollectionBlock`.
#[cfg(all(feature = "btree", not(feature = "tensor")))]
impl<T> CollectionBlock for T where
    T: AsType<BTreeNode> + tcgeneric::ThreadSafe + Clone + for<'a> FileSave<'a>
{
}
71
/// Bundle of bounds required of the `FE` type parameter used throughout this module
/// (`FE: CollectionBlock`).
///
/// This variant is compiled when the `tensor` feature is enabled; it additionally requires
/// `DenseCacheFile` and `AsType<TensorNode>`. The trait declares no methods of its own.
// NOTE(review): this variant references `BTreeNode`, which is only re-exported under the
// `btree` feature — presumably `tensor` implies `btree` in Cargo.toml; confirm.
#[cfg(feature = "tensor")]
pub trait CollectionBlock:
    DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone + for<'a> FileSave<'a>
{
}

// Blanket impl: every type that satisfies the bounds is a `CollectionBlock`.
#[cfg(feature = "tensor")]
impl<T> CollectionBlock for T where
    T: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone + for<'a> FileSave<'a>
{
}
83
/// Bundle of bounds required of the `FE` type parameter used throughout this module
/// (`FE: CollectionBlock`).
///
/// This variant is compiled when the `btree` feature is disabled, so no node types are
/// required. The trait declares no methods of its own.
#[cfg(not(feature = "btree"))]
pub trait CollectionBlock: tcgeneric::ThreadSafe + Clone + for<'a> FileSave<'a> {}

// Blanket impl: every type that satisfies the bounds is a `CollectionBlock`.
#[cfg(not(feature = "btree"))]
impl<T> CollectionBlock for T where T: tcgeneric::ThreadSafe + Clone + for<'a> FileSave<'a> {}
89
/// The class of a [`Collection`]: the null class, or one of the feature-gated collection
/// classes.
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum CollectionType {
    /// The class of a null collection.
    Null,
    /// A B-tree class (only with the `btree` feature).
    #[cfg(feature = "btree")]
    BTree(BTreeType),
    /// A table class (only with the `table` feature).
    #[cfg(feature = "table")]
    Table(TableType),
    /// A tensor class (only with the `tensor` feature).
    #[cfg(feature = "tensor")]
    Tensor(TensorType),
}

// `Class` is implemented with an empty body; no items are overridden here.
impl Class for CollectionType {}
103
104impl NativeClass for CollectionType {
105 fn from_path(path: &[PathSegment]) -> Option<Self> {
106 if path.len() > 2 && &path[0..2] == &PREFIX[..] {
107 match path[2].as_str() {
108 #[cfg(feature = "btree")]
109 "btree" => BTreeType::from_path(path).map(Self::BTree),
110 #[cfg(feature = "table")]
111 "table" => TableType::from_path(path).map(Self::Table),
112 #[cfg(feature = "tensor")]
113 "tensor" => TensorType::from_path(path).map(Self::Tensor),
114 _ => None,
115 }
116 } else {
117 None
118 }
119 }
120
121 fn path(&self) -> TCPathBuf {
122 match self {
123 Self::Null => TCPathBuf::from(NULL),
124 #[cfg(feature = "btree")]
125 Self::BTree(btt) => btt.path(),
126 #[cfg(feature = "table")]
127 Self::Table(tt) => tt.path(),
128 #[cfg(feature = "tensor")]
129 Self::Tensor(tt) => tt.path(),
130 }
131 }
132}
133
// Generate the `safecast` conversions between `CollectionType` and each feature-gated class
// type. `Instance::class` for `Collection` relies on these class types converting into
// `CollectionType` with `.into()`.
// NOTE(review): the exact set of impls emitted by `as_type!` is defined in `safecast` — confirm
// there before relying on anything beyond the conversions used in this file.
#[cfg(feature = "btree")]
as_type!(CollectionType, BTree, BTreeType);
#[cfg(feature = "table")]
as_type!(CollectionType, Table, TableType);
#[cfg(feature = "tensor")]
as_type!(CollectionType, Tensor, TensorType);
140
141impl fmt::Debug for CollectionType {
142 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
143 match self {
144 Self::Null => f.write_str("null collection"),
145 #[cfg(feature = "btree")]
146 Self::BTree(btt) => fmt::Debug::fmt(btt, f),
147 #[cfg(feature = "table")]
148 Self::Table(tt) => fmt::Debug::fmt(tt, f),
149 #[cfg(feature = "tensor")]
150 Self::Tensor(tt) => fmt::Debug::fmt(tt, f),
151 }
152 }
153}
154
/// A collection: either the null collection or one of the feature-gated collection kinds.
pub enum Collection<Txn, FE> {
    /// The null collection: a directory handle plus a marker for the `Txn` type parameter.
    Null(fs::Dir<FE>, PhantomData<Txn>),
    /// A B-tree (only with the `btree` feature).
    #[cfg(feature = "btree")]
    BTree(BTree<Txn, FE>),
    /// A table (only with the `table` feature).
    #[cfg(feature = "table")]
    Table(Table<Txn, FE>),
    /// A tensor (only with the `tensor` feature).
    #[cfg(feature = "tensor")]
    Tensor(Tensor<Txn, FE>),
}
165
166impl<Txn, FE> Clone for Collection<Txn, FE> {
167 fn clone(&self) -> Self {
168 match self {
169 Self::Null(dir, data) => Self::Null(dir.clone(), *data),
170 #[cfg(feature = "btree")]
171 Self::BTree(btree) => Self::BTree(btree.clone()),
172 #[cfg(feature = "table")]
173 Self::Table(table) => Self::Table(table.clone()),
174 #[cfg(feature = "tensor")]
175 Self::Tensor(tensor) => Self::Tensor(tensor.clone()),
176 }
177 }
178}
179
// Generate the `safecast` conversions between `Collection` and each feature-gated collection
// kind, keyed by the enum variant of the same name.
// NOTE(review): the exact set of impls emitted by `as_type!` is defined in `safecast` — confirm
// there; the `TryCastFrom` impls for these types are written by hand further down in this file.
#[cfg(feature = "btree")]
as_type!(Collection<Txn, FE>, BTree, BTree<Txn, FE>);
#[cfg(feature = "table")]
as_type!(Collection<Txn, FE>, Table, Table<Txn, FE>);
#[cfg(feature = "tensor")]
as_type!(Collection<Txn, FE>, Tensor, Tensor<Txn, FE>);
186
187impl<Txn, FE> Collection<Txn, FE>
188where
189 Txn: Transaction<FE>,
190 FE: CollectionBlock,
191{
192 pub fn schema(&self) -> Schema {
194 match self {
195 Self::Null(_, _) => Schema::Null,
196 #[cfg(feature = "btree")]
197 Self::BTree(btree) => btree.schema().clone().into(),
198 #[cfg(feature = "table")]
199 Self::Table(table) => table.schema().clone().into(),
200 #[cfg(feature = "tensor")]
201 Self::Tensor(tensor) => match tensor {
202 Tensor::Dense(dense) => Schema::Dense(dense.schema()),
203 Tensor::Sparse(sparse) => Schema::Sparse(sparse.schema()),
204 },
205 }
206 }
207}
208
209impl<Txn, FE> Instance for Collection<Txn, FE>
210where
211 Txn: Send + Sync,
212 FE: Send + Sync,
213{
214 type Class = CollectionType;
215
216 fn class(&self) -> CollectionType {
217 match self {
218 Self::Null(_, _) => CollectionType::Null,
219 #[cfg(feature = "btree")]
220 Self::BTree(btree) => btree.class().into(),
221 #[cfg(feature = "table")]
222 Self::Table(table) => table.class().into(),
223 #[cfg(feature = "tensor")]
224 Self::Tensor(tensor) => tensor.class().into(),
225 }
226 }
227}
228
229#[async_trait]
230impl<Txn, FE> AsyncHash for Collection<Txn, FE>
231where
232 Txn: Transaction<FE>,
233 FE: CollectionBlock + Clone,
234{
235 #[allow(unused_variables)]
236 async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
237 let schema_hash = Hash::<Sha256>::hash(self.schema());
238
239 let contents_hash = match self {
240 Self::Null(_, _) => tc_transact::hash::default_hash::<Sha256>(),
241 #[cfg(feature = "btree")]
242 Self::BTree(btree) => {
243 let keys = btree.clone().keys(txn_id).await?;
244 hash_try_stream::<Sha256, _, _, _>(keys).await?
245 }
246 #[cfg(feature = "table")]
247 Self::Table(table) => {
248 let rows = table.clone().rows(txn_id).await?;
249 hash_try_stream::<Sha256, _, _, _>(rows).await?
250 }
251 #[cfg(feature = "tensor")]
252 Self::Tensor(tensor) => match tensor {
253 Tensor::Dense(dense) => {
254 let elements = DenseView::from(dense.clone()).into_elements(txn_id).await?;
255 hash_try_stream::<Sha256, _, _, _>(elements).await?
256 }
257 Tensor::Sparse(sparse) => {
258 let elements = SparseView::from(sparse.clone())
259 .into_elements(txn_id)
260 .await?;
261
262 hash_try_stream::<Sha256, _, _, _>(elements).await?
263 }
264 },
265 };
266
267 let mut hasher = Sha256::new();
268 hasher.update(schema_hash);
269 hasher.update(contents_hash);
270 Ok(hasher.finalize())
271 }
272}
273
274impl<Txn, FE> From<CollectionBase<Txn, FE>> for Collection<Txn, FE> {
275 fn from(base: CollectionBase<Txn, FE>) -> Self {
276 match base {
277 CollectionBase::Null(dir, data) => Self::Null(dir, data),
278 #[cfg(feature = "btree")]
279 CollectionBase::BTree(btree) => Self::BTree(btree.into()),
280 #[cfg(feature = "table")]
281 CollectionBase::Table(table) => Self::Table(table.into()),
282 #[cfg(feature = "tensor")]
283 CollectionBase::Tensor(tensor) => Self::Tensor(tensor.into()),
284 }
285 }
286}
287
#[cfg(feature = "btree")]
impl<Txn, FE> From<BTreeFile<Txn, FE>> for Collection<Txn, FE> {
    /// Wrap a [`BTreeFile`] as a `Collection::BTree`, converting it into a [`BTree`] first.
    fn from(btree: BTreeFile<Txn, FE>) -> Self {
        let tree: BTree<Txn, FE> = btree.into();
        Self::BTree(tree)
    }
}
294
295#[async_trait]
296impl<'en, Txn, FE> IntoView<'en, FE> for Collection<Txn, FE>
297where
298 Txn: Transaction<FE>,
299 FE: CollectionBlock,
300{
301 type Txn = Txn;
302 type View = CollectionView<'en>;
303
304 #[allow(unused_variables)]
305 async fn into_view(self, txn: Self::Txn) -> TCResult<Self::View> {
306 match self {
307 Self::Null(_dir, data) => Ok(CollectionView::Null(PhantomData)),
308 #[cfg(feature = "btree")]
309 Self::BTree(btree) => btree.into_view(txn).map_ok(CollectionView::BTree).await,
310 #[cfg(feature = "table")]
311 Self::Table(table) => table.into_view(txn).map_ok(CollectionView::Table).await,
312 #[cfg(feature = "tensor")]
313 Self::Tensor(tensor) => tensor.into_view(txn).map_ok(CollectionView::Tensor).await,
314 }
315 }
316}
317
318#[async_trait]
319impl<T, FE> de::FromStream for Collection<T, FE>
320where
321 T: Transaction<FE>,
322 FE: CollectionBlock,
323{
324 type Context = T;
325
326 async fn from_stream<D: de::Decoder>(
327 txn: Self::Context,
328 decoder: &mut D,
329 ) -> Result<Self, D::Error> {
330 decoder
331 .decode_map(CollectionVisitor::new(txn))
332 .map_ok(Self::from)
333 .await
334 }
335}
336
#[cfg(feature = "btree")]
impl<Txn, FE> TryCastFrom<Collection<Txn, FE>> for BTree<Txn, FE> {
    /// Return `true` only if `collection` is the `BTree` variant.
    fn can_cast_from(collection: &Collection<Txn, FE>) -> bool {
        matches!(collection, Collection::BTree(_))
    }

    /// Return the wrapped [`BTree`] if `collection` is the `BTree` variant, else `None`.
    fn opt_cast_from(collection: Collection<Txn, FE>) -> Option<Self> {
        if let Collection::BTree(btree) = collection {
            Some(btree)
        } else {
            None
        }
    }
}
353
#[cfg(feature = "table")]
impl<Txn, FE> TryCastFrom<Collection<Txn, FE>> for Table<Txn, FE> {
    /// Return `true` only if `collection` is the `Table` variant.
    fn can_cast_from(collection: &Collection<Txn, FE>) -> bool {
        matches!(collection, Collection::Table(_))
    }

    /// Return the wrapped [`Table`] if `collection` is the `Table` variant, else `None`.
    fn opt_cast_from(collection: Collection<Txn, FE>) -> Option<Self> {
        if let Collection::Table(table) = collection {
            Some(table)
        } else {
            None
        }
    }
}
370
#[cfg(feature = "tensor")]
impl<Txn, FE> TryCastFrom<Collection<Txn, FE>> for Tensor<Txn, FE> {
    /// Return `true` only if `collection` is the `Tensor` variant.
    fn can_cast_from(collection: &Collection<Txn, FE>) -> bool {
        matches!(collection, Collection::Tensor(_))
    }

    /// Return the wrapped [`Tensor`] if `collection` is the `Tensor` variant, else `None`.
    fn opt_cast_from(collection: Collection<Txn, FE>) -> Option<Self> {
        if let Collection::Tensor(tensor) = collection {
            Some(tensor)
        } else {
            None
        }
    }
}
387
388impl<Txn, FE> fmt::Debug for Collection<Txn, FE> {
389 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
390 match self {
391 Self::Null(_, _) => f.write_str("null collection"),
392 #[cfg(feature = "btree")]
393 Self::BTree(btree) => btree.fmt(f),
394 #[cfg(feature = "table")]
395 Self::Table(table) => table.fmt(f),
396 #[cfg(feature = "tensor")]
397 Self::Tensor(tensor) => tensor.fmt(f),
398 }
399 }
400}
401
/// A view of a [`Collection`], produced by its `IntoView::into_view` implementation and
/// encodable through `en::IntoStream`.
pub enum CollectionView<'en> {
    /// View of the null collection; the marker keeps the `'en` lifetime in use.
    Null(PhantomData<&'en ()>),
    /// View of a B-tree (only with the `btree` feature).
    #[cfg(feature = "btree")]
    BTree(btree::BTreeView<'en>),
    /// View of a table (only with the `table` feature).
    #[cfg(feature = "table")]
    Table(table::TableView<'en>),
    /// View of a tensor (only with the `tensor` feature).
    #[cfg(feature = "tensor")]
    Tensor(tensor::view::TensorView),
}
412
413impl<'en> en::IntoStream<'en> for CollectionView<'en> {
414 fn into_stream<E: en::Encoder<'en>>(self, encoder: E) -> Result<E::Ok, E::Error> {
415 use en::EncodeMap;
416
417 let mut map = encoder.encode_map(Some(1))?;
418
419 match self {
420 Self::Null(_) => map.encode_entry(CollectionType::Null.path(), ())?,
421 #[cfg(feature = "btree")]
422 Self::BTree(btree) => {
423 let classpath = BTreeType::default().path();
424 map.encode_entry(classpath.to_string(), btree)?;
425 }
426 #[cfg(feature = "table")]
427 Self::Table(table) => {
428 let classpath = TableType::default().path();
429 map.encode_entry(classpath.to_string(), table)?;
430 }
431 #[cfg(feature = "tensor")]
432 Self::Tensor(tensor) => {
433 let classpath = match tensor {
434 tensor::view::TensorView::Dense(_) => TensorType::Dense,
435 tensor::view::TensorView::Sparse(_) => TensorType::Sparse,
436 }
437 .path();
438
439 map.encode_entry(classpath.to_string(), tensor)?;
440 }
441 }
442
443 map.end()
444 }
445}
446
/// Delete the entry named by `txn_id` from the `VERSIONS` subdirectory of `dir`.
///
/// A read lock is taken on `dir` and then a write lock on the versions subdirectory; both
/// are held until the function returns. The value returned by `delete` is discarded.
///
/// # Panics
/// Panics if `dir` has no `VERSIONS` subdirectory.
#[cfg(feature = "btree")]
async fn finalize_dir<FE: Send + Sync>(dir: &freqfs::DirLock<FE>, txn_id: &TxnId) {
    let dir_guard = dir.read().await;

    let versions_lock = dir_guard
        .get_dir(tc_transact::fs::VERSIONS)
        .expect("transactional versions directory");

    let mut versions_guard = versions_lock.write().await;
    versions_guard.delete(txn_id).await;
}