1use std::fmt;
2use std::marker::PhantomData;
3
4use async_trait::async_trait;
5use destream::de;
6use futures::TryFutureExt;
7use log::{debug, info};
8use safecast::TryCastFrom;
9
10use tc_error::*;
11use tc_transact::fs::{CopyFrom, Dir, Persist, Restore};
12use tc_transact::hash::{AsyncHash, Output, Sha256};
13use tc_transact::{fs, IntoView, Transact, Transaction, TxnId};
14use tcgeneric::{Instance, NativeClass, TCPathBuf, ThreadSafe};
15
16#[cfg(feature = "btree")]
17use crate::btree::{BTree, BTreeFile, BTreeInstance};
18#[cfg(feature = "table")]
19use crate::table::{Table, TableFile, TableInstance};
20#[cfg(feature = "tensor")]
21use crate::tensor::{
22 Dense, DenseBase, Sparse, SparseBase, Tensor, TensorBase, TensorInstance, TensorType,
23};
24use crate::{Collection, CollectionBlock, CollectionType, CollectionView, Schema};
25
26pub enum CollectionBase<Txn, FE> {
28 Null(Dir<FE>, PhantomData<Txn>),
29 #[cfg(feature = "btree")]
30 BTree(BTreeFile<Txn, FE>),
31 #[cfg(feature = "table")]
32 Table(TableFile<Txn, FE>),
33 #[cfg(feature = "tensor")]
34 Tensor(TensorBase<Txn, FE>),
35}
36
37impl<Txn, FE> Clone for CollectionBase<Txn, FE> {
38 fn clone(&self) -> Self {
39 match self {
40 Self::Null(dir, txn) => Self::Null(dir.clone(), *txn),
41 #[cfg(feature = "btree")]
42 Self::BTree(btree) => Self::BTree(btree.clone()),
43 #[cfg(feature = "table")]
44 Self::Table(table) => Self::Table(table.clone()),
45 #[cfg(feature = "tensor")]
46 Self::Tensor(tensor) => Self::Tensor(tensor.clone()),
47 }
48 }
49}
50
51impl<Txn, FE> CollectionBase<Txn, FE>
52where
53 Txn: Transaction<FE>,
54 FE: CollectionBlock,
55{
56 pub fn schema(&self) -> Schema {
58 match self {
59 Self::Null(_, _) => Schema::Null,
60 #[cfg(feature = "btree")]
61 Self::BTree(btree) => btree.schema().clone().into(),
62 #[cfg(feature = "table")]
63 Self::Table(table) => table.schema().clone().into(),
64 #[cfg(feature = "tensor")]
65 Self::Tensor(tensor) => match tensor {
66 TensorBase::Dense(dense) => Schema::Dense(dense.schema()),
67 TensorBase::Sparse(sparse) => Schema::Sparse(sparse.schema()),
68 },
69 }
70 }
71}
72
73impl<Txn, FE> Instance for CollectionBase<Txn, FE>
74where
75 Txn: Transaction<FE>,
76 FE: ThreadSafe,
77{
78 type Class = CollectionType;
79
80 fn class(&self) -> CollectionType {
81 match self {
82 Self::Null(_, _) => CollectionType::Null,
83 #[cfg(feature = "btree")]
84 Self::BTree(btree) => btree.class().into(),
85 #[cfg(feature = "table")]
86 Self::Table(table) => table.class().into(),
87 #[cfg(feature = "tensor")]
88 Self::Tensor(tensor) => tensor.class().into(),
89 }
90 }
91}
92
93#[async_trait]
94impl<Txn, FE> Transact for CollectionBase<Txn, FE>
95where
96 Txn: Transaction<FE>,
97 FE: CollectionBlock,
98{
99 type Commit = ();
100
101 #[allow(unused_variables)]
102 async fn commit(&self, txn_id: TxnId) -> Self::Commit {
103 match self {
104 Self::Null(_, _) => (),
105 #[cfg(feature = "btree")]
106 Self::BTree(btree) => btree.commit(txn_id).await,
107 #[cfg(feature = "table")]
108 Self::Table(table) => table.commit(txn_id).await,
109 #[cfg(feature = "tensor")]
110 Self::Tensor(tensor) => tensor.commit(txn_id).await,
111 }
112 }
113
114 #[allow(unused_variables)]
115 async fn rollback(&self, txn_id: &TxnId) {
116 match self {
117 Self::Null(_, _) => (),
118 #[cfg(feature = "btree")]
119 Self::BTree(btree) => btree.rollback(txn_id).await,
120 #[cfg(feature = "table")]
121 Self::Table(table) => table.rollback(txn_id).await,
122 #[cfg(feature = "tensor")]
123 Self::Tensor(tensor) => tensor.rollback(txn_id).await,
124 }
125 }
126
127 #[allow(unused_variables)]
128 async fn finalize(&self, txn_id: &TxnId) {
129 match self {
130 Self::Null(_, _) => (),
131 #[cfg(feature = "btree")]
132 Self::BTree(btree) => btree.finalize(txn_id).await,
133 #[cfg(feature = "table")]
134 Self::Table(table) => table.finalize(txn_id).await,
135 #[cfg(feature = "tensor")]
136 Self::Tensor(tensor) => tensor.finalize(txn_id).await,
137 }
138 }
139}
140
141#[async_trait]
142impl<T, FE> AsyncHash for CollectionBase<T, FE>
143where
144 T: Transaction<FE>,
145 FE: CollectionBlock,
146{
147 async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
148 Collection::from(self.clone()).hash(txn_id).await
149 }
150}
151
152#[async_trait]
153impl<Txn, FE> Persist<FE> for CollectionBase<Txn, FE>
154where
155 Txn: Transaction<FE>,
156 FE: CollectionBlock,
157{
158 type Txn = Txn;
159 type Schema = Schema;
160
161 #[allow(unused_variables)]
162 async fn create(txn_id: TxnId, schema: Schema, store: Dir<FE>) -> TCResult<Self> {
163 debug!("create persistent mutable collection at {store:?}");
164
165 match schema {
166 Schema::Null => Ok(Self::Null(store, PhantomData)),
167 #[cfg(feature = "btree")]
168 Schema::BTree(schema) => {
169 BTreeFile::create(txn_id, schema, store)
170 .map_ok(Self::BTree)
171 .await
172 }
173 #[cfg(feature = "table")]
174 Schema::Table(schema) => {
175 TableFile::create(txn_id, schema, store)
176 .map_ok(Self::Table)
177 .await
178 }
179 #[cfg(feature = "tensor")]
180 Schema::Dense(schema) => {
181 DenseBase::create(txn_id, schema, store)
182 .map_ok(TensorBase::Dense)
183 .map_ok(Self::Tensor)
184 .await
185 }
186 #[cfg(feature = "tensor")]
187 Schema::Sparse(schema) => {
188 SparseBase::create(txn_id, schema.into(), store)
189 .map_ok(TensorBase::Sparse)
190 .map_ok(Self::Tensor)
191 .await
192 }
193 }
194 }
195
196 #[allow(unused_variables)]
197 async fn load(txn_id: TxnId, schema: Schema, store: Dir<FE>) -> TCResult<Self> {
198 info!("load persistent mutable collection at {store:?}");
199
200 match schema {
201 Schema::Null => Ok(Self::Null(store, PhantomData)),
202 #[cfg(feature = "btree")]
203 Schema::BTree(schema) => {
204 BTreeFile::load(txn_id, schema, store)
205 .map_ok(Self::BTree)
206 .await
207 }
208 #[cfg(feature = "table")]
209 Schema::Table(schema) => {
210 TableFile::load(txn_id, schema, store)
211 .map_ok(Self::Table)
212 .await
213 }
214 #[cfg(feature = "tensor")]
215 Schema::Dense(schema) => {
216 DenseBase::load(txn_id, schema, store)
217 .map_ok(TensorBase::Dense)
218 .map_ok(Self::Tensor)
219 .await
220 }
221 #[cfg(feature = "tensor")]
222 Schema::Sparse(schema) => {
223 SparseBase::load(txn_id, schema.into(), store)
224 .map_ok(TensorBase::Sparse)
225 .map_ok(Self::Tensor)
226 .await
227 }
228 }
229 }
230
231 fn dir(&self) -> tc_transact::fs::Inner<FE> {
232 match self {
233 Self::Null(store, _) => store.clone().into_inner(),
234 #[cfg(feature = "btree")]
235 Self::BTree(btree) => btree.dir(),
236 #[cfg(feature = "table")]
237 Self::Table(table) => table.dir(),
238 #[cfg(feature = "tensor")]
239 Self::Tensor(tensor) => tensor.dir(),
240 }
241 }
242}
243
244#[async_trait]
245impl<Txn, FE> CopyFrom<FE, Collection<Txn, FE>> for CollectionBase<Txn, FE>
246where
247 Txn: Transaction<FE>,
248 FE: CollectionBlock,
249{
250 #[allow(unused_variables)]
251 async fn copy_from(txn: &Txn, store: Dir<FE>, instance: Collection<Txn, FE>) -> TCResult<Self> {
252 match instance {
253 Collection::Null(_, _) => Ok(Self::Null(store, PhantomData)),
254 #[cfg(feature = "btree")]
255 Collection::BTree(instance) => {
256 BTreeFile::copy_from(txn, store, instance)
257 .map_ok(Self::BTree)
258 .await
259 }
260 #[cfg(feature = "table")]
261 Collection::Table(instance) => {
262 TableFile::copy_from(txn, store, instance)
263 .map_ok(Self::Table)
264 .await
265 }
266 #[cfg(feature = "tensor")]
267 Collection::Tensor(instance) => {
268 TensorBase::copy_from(txn, store, instance.into())
269 .map_ok(Self::Tensor)
270 .await
271 }
272 }
273 }
274}
275
276#[async_trait]
277impl<Txn, FE> Restore<FE> for CollectionBase<Txn, FE>
278where
279 Txn: Transaction<FE>,
280 FE: CollectionBlock,
281{
282 #[allow(unused_variables)]
283 async fn restore(&self, txn_id: TxnId, backup: &Self) -> TCResult<()> {
284 match (self, backup) {
285 (Self::Null(_, _), Self::Null(_, _)) => Ok(()),
286 #[cfg(feature = "btree")]
287 (Self::BTree(this), Self::BTree(backup)) => this.restore(txn_id, backup).await,
288 #[cfg(feature = "table")]
289 (Self::Table(this), Self::Table(backup)) => this.restore(txn_id, backup).await,
290 #[cfg(feature = "tensor")]
291 (Self::Tensor(this), Self::Tensor(backup)) => this.restore(txn_id, backup).await,
292 #[cfg(any(feature = "btree", feature = "table", feature = "tensor"))]
293 (this, that) => Err(bad_request!("cannot restore {:?} from {:?}", this, that)),
294 }
295 }
296}
297
298impl<T, FE> TryCastFrom<Collection<T, FE>> for CollectionBase<T, FE> {
299 fn can_cast_from(collection: &Collection<T, FE>) -> bool {
300 match collection {
301 Collection::Null(_, _) => true,
302 #[cfg(feature = "btree")]
303 Collection::BTree(BTree::File(_)) => true,
304 #[cfg(feature = "table")]
305 Collection::Table(Table::Table(_)) => true,
306 #[cfg(feature = "tensor")]
307 Collection::Tensor(Tensor::Dense(Dense::Base(_))) => true,
308 #[cfg(feature = "tensor")]
309 Collection::Tensor(Tensor::Sparse(Sparse::Base(_))) => true,
310 #[cfg(any(feature = "btree", feature = "table", feature = "tensor"))]
311 _ => false,
312 }
313 }
314
315 fn opt_cast_from(collection: Collection<T, FE>) -> Option<Self> {
316 match collection {
317 Collection::Null(dir, data) => Some(Self::Null(dir, data)),
318 #[cfg(feature = "btree")]
319 Collection::BTree(BTree::File(btree)) => Some(Self::BTree(btree)),
320 #[cfg(feature = "table")]
321 Collection::Table(Table::Table(table)) => Some(Self::Table(table)),
322 #[cfg(feature = "tensor")]
323 Collection::Tensor(Tensor::Dense(Dense::Base(dense))) => {
324 Some(Self::Tensor(TensorBase::Dense(dense)))
325 }
326 #[cfg(feature = "tensor")]
327 Collection::Tensor(Tensor::Sparse(Sparse::Base(sparse))) => {
328 Some(Self::Tensor(TensorBase::Sparse(sparse)))
329 }
330 #[cfg(any(feature = "btree", feature = "table", feature = "tensor"))]
331 _ => None,
332 }
333 }
334}
335
336#[async_trait]
337impl<'en, T, FE> IntoView<'en, FE> for CollectionBase<T, FE>
338where
339 T: Transaction<FE>,
340 FE: CollectionBlock,
341 Self: 'en,
342{
343 type Txn = T;
344 type View = CollectionView<'en>;
345
346 async fn into_view(self, txn: Self::Txn) -> TCResult<Self::View> {
347 Collection::from(self).into_view(txn).await
348 }
349}
350
351impl<T, FE> fmt::Debug for CollectionBase<T, FE> {
352 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
353 f.write_str("a Collection")
354 }
355}
356
357pub struct CollectionVisitor<Txn, FE> {
359 txn: Txn,
360 phantom: PhantomData<FE>,
361}
362
363impl<Txn, FE> CollectionVisitor<Txn, FE>
364where
365 Txn: Transaction<FE>,
366 FE: CollectionBlock,
367{
368 pub fn new(txn: Txn) -> Self {
369 Self {
370 txn,
371 phantom: PhantomData,
372 }
373 }
374
375 pub async fn visit_map_value<A: de::MapAccess>(
376 self,
377 class: CollectionType,
378 access: &mut A,
379 ) -> Result<CollectionBase<Txn, FE>, A::Error> {
380 debug!("CollectionVisitor::visit_map_value with class {:?}", class);
381
382 match class {
383 CollectionType::Null => {
384 let _: () = access.next_value(()).await?;
385 let dir = self
386 .txn
387 .context()
388 .and_then(|dir| fs::Dir::load(*self.txn.id(), dir))
389 .map_err(de::Error::custom)
390 .await?;
391
392 Ok(CollectionBase::Null(dir, PhantomData))
393 }
394
395 #[cfg(feature = "btree")]
396 CollectionType::BTree(_) => {
397 access
398 .next_value(self.txn)
399 .map_ok(CollectionBase::BTree)
400 .await
401 }
402
403 #[cfg(feature = "table")]
404 CollectionType::Table(_) => {
405 access
406 .next_value(self.txn)
407 .map_ok(CollectionBase::Table)
408 .await
409 }
410
411 #[cfg(feature = "tensor")]
412 CollectionType::Tensor(tt) => match tt {
413 TensorType::Dense => {
414 access
415 .next_value(self.txn)
416 .map_ok(TensorBase::Dense)
417 .map_ok(CollectionBase::Tensor)
418 .await
419 }
420 TensorType::Sparse => {
421 access
422 .next_value(self.txn)
423 .map_ok(TensorBase::Sparse)
424 .map_ok(CollectionBase::Tensor)
425 .await
426 }
427 },
428 }
429 }
430}
431
432#[async_trait]
433impl<T, FE> de::Visitor for CollectionVisitor<T, FE>
434where
435 T: Transaction<FE>,
436 FE: CollectionBlock,
437{
438 type Value = CollectionBase<T, FE>;
439
440 fn expecting() -> &'static str {
441 "a Collection"
442 }
443
444 async fn visit_map<A: de::MapAccess>(self, mut map: A) -> Result<Self::Value, A::Error> {
445 let classpath: TCPathBuf = map
446 .next_key(())
447 .await?
448 .ok_or_else(|| de::Error::custom("expected a Collection type"))?;
449
450 let class = CollectionType::from_path(&classpath)
451 .ok_or_else(|| de::Error::invalid_value(classpath, "a Collection type"))?;
452
453 self.visit_map_value(class, &mut map).await
454 }
455}
456
457#[async_trait]
458impl<T, FE> de::FromStream for CollectionBase<T, FE>
459where
460 T: Transaction<FE>,
461 FE: CollectionBlock,
462{
463 type Context = T;
464
465 async fn from_stream<D: de::Decoder>(
466 txn: Self::Context,
467 decoder: &mut D,
468 ) -> Result<Self, D::Error> {
469 decoder.decode_map(CollectionVisitor::new(txn)).await
470 }
471}