1use std::fmt::Debug;
50use std::sync::Arc;
51
52use arrow_array::RecordBatch;
53use arrow_schema::SchemaRef;
54use async_trait::async_trait;
55use lance::Dataset;
56use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream};
57use lance::dataset::{WhenMatched, WhenNotMatched};
58
59use crate::db::{Snapshot, SubTableEntry};
60use crate::error::Result;
61use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore};
62
/// Sealing module: keeps [`TableStorage`] closed to this crate, so
/// downstream crates cannot provide their own implementations.
pub(crate) mod sealed {
    /// Marker supertrait of `TableStorage`. Only this crate can
    /// implement it, which in turn closes the `TableStorage` impl set.
    pub trait Sealed {}

    // `TableStore` is the only sealed implementor.
    impl Sealed for crate::table_store::TableStore {}
}
74
/// Cheaply clonable handle to an opened Lance [`Dataset`] snapshot.
/// Cloning bumps the `Arc` refcount; it does not copy the dataset.
#[derive(Debug, Clone)]
pub struct SnapshotHandle {
    // Shared ownership of the underlying dataset at a fixed version.
    pub(crate) inner: Arc<Dataset>,
}
88
89impl SnapshotHandle {
90 pub(crate) fn new(ds: Dataset) -> Self {
93 Self { inner: Arc::new(ds) }
94 }
95
96 pub(crate) fn dataset(&self) -> &Dataset {
99 &self.inner
100 }
101
102 pub(crate) fn into_arc(self) -> Arc<Dataset> {
105 self.inner
106 }
107
108 pub fn version(&self) -> u64 {
112 self.inner.version().version
113 }
114
115 pub fn uses_stable_row_ids(&self) -> bool {
117 self.inner.manifest.uses_stable_row_ids()
118 }
119}
120
/// Handle to a staged (not yet committed) write against a snapshot.
/// Apply it via `TableStorage::commit_staged`.
#[derive(Debug, Clone)]
pub struct StagedHandle {
    // The staged write this handle wraps.
    pub(crate) inner: StagedWrite,
}
131
132impl StagedHandle {
133 pub(crate) fn new(staged: StagedWrite) -> Self {
134 Self { inner: staged }
135 }
136
137 pub(crate) fn into_staged(self) -> StagedWrite {
140 self.inner
141 }
142}
143
144pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
151 handles.iter().map(|h| h.inner.clone()).collect()
152}
153
/// Storage operations over Lance-backed tables, implemented by
/// [`TableStore`] (and, via the [`sealed::Sealed`] supertrait, only by
/// types in this crate).
///
/// Conventions visible in the signatures:
/// * read-only operations borrow a [`SnapshotHandle`];
/// * mutating operations consume a `SnapshotHandle` and return a handle
///   to the resulting state;
/// * staged (uncommitted) writes travel as [`StagedHandle`] values and
///   are applied with [`TableStorage::commit_staged`].
#[async_trait]
pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
    /// Opens the snapshot described by a sub-table catalog entry.
    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle>;

    /// Opens table `table_key` within the given database snapshot.
    async fn open_snapshot_at_table(
        &self,
        snapshot: &Snapshot,
        table_key: &str,
    ) -> Result<SnapshotHandle>;

    /// Opens the head (latest) version of the dataset at `dataset_uri`,
    /// on `branch` when one is given.
    async fn open_dataset_head(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<SnapshotHandle>;

    /// Head-open variant used when a write to `table_key` will follow.
    /// NOTE(review): what differs from `open_dataset_head` is defined in
    /// `TableStore` — confirm there.
    async fn open_dataset_head_for_write(
        &self,
        table_key: &str,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<SnapshotHandle>;

    /// Opens a specific `version` of the dataset at `table_path`.
    async fn open_dataset_at_state(
        &self,
        table_path: &str,
        branch: Option<&str>,
        version: u64,
    ) -> Result<SnapshotHandle>;

    /// Creates `target_branch` from `source_version` of `source_branch`
    /// and returns a handle to the new branch head.
    async fn fork_branch_from_state(
        &self,
        dataset_uri: &str,
        source_branch: Option<&str>,
        table_key: &str,
        source_version: u64,
        target_branch: &str,
    ) -> Result<SnapshotHandle>;

    /// Deletes `branch` of the dataset at `dataset_uri`.
    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;

    /// Re-opens the dataset in preparation for a mutation, expecting it
    /// to still be at `expected_version`.
    async fn reopen_for_mutation(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
        table_key: &str,
        expected_version: u64,
    ) -> Result<SnapshotHandle>;

    /// Fails if `snapshot` is not at `expected_version` (optimistic
    /// concurrency check); `table_key` is available for the error.
    fn ensure_expected_version(
        &self,
        snapshot: &SnapshotHandle,
        table_key: &str,
        expected_version: u64,
    ) -> Result<()>;

    /// Scans the snapshot with optional projection, filter and ordering.
    async fn scan(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
    ) -> Result<Vec<RecordBatch>>;

    /// Like [`TableStorage::scan`], additionally materializing the row
    /// id column when `with_row_id` is set.
    async fn scan_with_row_id(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<Vec<RecordBatch>>;

    /// Reads every batch of the snapshot, unfiltered and unprojected.
    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>>;

    /// Batch-scan variant used when the data will be rewritten.
    /// NOTE(review): how this differs from `scan_batches` is defined in
    /// `TableStore` — confirm there.
    async fn scan_batches_for_rewrite(
        &self,
        snapshot: &SnapshotHandle,
    ) -> Result<Vec<RecordBatch>>;

    /// Counts rows, optionally restricted by `filter`.
    async fn count_rows(
        &self,
        snapshot: &SnapshotHandle,
        filter: Option<String>,
    ) -> Result<usize>;

    /// Counts rows including those in `staged` (uncommitted) writes.
    async fn count_rows_with_staged(
        &self,
        snapshot: &SnapshotHandle,
        staged: &[StagedHandle],
        filter: Option<String>,
    ) -> Result<usize>;

    /// Scans the snapshot merged with `staged` (uncommitted) writes.
    async fn scan_with_staged(
        &self,
        snapshot: &SnapshotHandle,
        staged: &[StagedHandle],
        projection: Option<&[&str]>,
        filter: Option<&str>,
    ) -> Result<Vec<RecordBatch>>;

    /// Scans the snapshot merged with in-memory `pending` batches.
    /// NOTE(review): `key_column` presumably reconciles rows that exist
    /// both on disk and in `pending` — confirm in `TableStore`.
    async fn scan_with_pending(
        &self,
        snapshot: &SnapshotHandle,
        pending: &[RecordBatch],
        pending_schema: Option<SchemaRef>,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        key_column: Option<&str>,
    ) -> Result<Vec<RecordBatch>>;

    /// Returns the row id of the first row matching `filter`, if any.
    async fn first_row_id_for_filter(
        &self,
        snapshot: &SnapshotHandle,
        filter: &str,
    ) -> Result<Option<u64>>;

    /// Builds the [`TableState`] for the snapshot at `dataset_uri`.
    async fn table_state(
        &self,
        dataset_uri: &str,
        snapshot: &SnapshotHandle,
    ) -> Result<TableState>;

    /// Stages an append of `batch` on top of `prior_stages` without
    /// committing it.
    async fn stage_append(
        &self,
        snapshot: &SnapshotHandle,
        batch: RecordBatch,
        prior_stages: &[StagedHandle],
    ) -> Result<StagedHandle>;

    /// Stages a merge-insert (upsert) of `batch` keyed on `key_columns`.
    async fn stage_merge_insert(
        &self,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<StagedHandle>;

    /// Commits a previously staged write, returning the new snapshot.
    async fn commit_staged(
        &self,
        snapshot: SnapshotHandle,
        staged: StagedHandle,
    ) -> Result<SnapshotHandle>;

    /// Stages a full overwrite of the table contents with `batch`.
    async fn stage_overwrite(
        &self,
        snapshot: &SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<StagedHandle>;

    /// Stages creation of a BTree index over `columns`.
    async fn stage_create_btree_index(
        &self,
        snapshot: &SnapshotHandle,
        columns: &[&str],
    ) -> Result<StagedHandle>;

    /// Stages creation of an inverted (full-text) index on `column`.
    async fn stage_create_inverted_index(
        &self,
        snapshot: &SnapshotHandle,
        column: &str,
    ) -> Result<StagedHandle>;

    /// Appends `batch` and commits immediately, returning the new
    /// snapshot and the resulting table state.
    async fn append_batch(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<(SnapshotHandle, TableState)>;

    /// Merge-inserts (upserts) `batches` and commits immediately.
    async fn merge_insert_batches(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batches: Vec<RecordBatch>,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<TableState>;

    /// Overwrites the table with `batch` and commits immediately.
    async fn overwrite_batch(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<(SnapshotHandle, TableState)>;

    /// Deletes all rows matching `filter` and commits immediately.
    async fn delete_where(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        filter: &str,
    ) -> Result<(SnapshotHandle, DeleteState)>;

    /// Index-existence probes for BTree, full-text and vector indices.
    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;

    /// Creates a BTree index over `columns`, returning the new snapshot.
    async fn create_btree_index(
        &self,
        snapshot: SnapshotHandle,
        columns: &[&str],
    ) -> Result<SnapshotHandle>;

    /// Creates an inverted (full-text) index on `column`.
    async fn create_inverted_index(
        &self,
        snapshot: SnapshotHandle,
        column: &str,
    ) -> Result<SnapshotHandle>;

    /// Creates a vector index on `column`.
    async fn create_vector_index(
        &self,
        snapshot: SnapshotHandle,
        column: &str,
    ) -> Result<SnapshotHandle>;

    /// Root URI of the store.
    fn root_uri(&self) -> &str;
    /// Full dataset URI for a table path under the store root.
    fn dataset_uri(&self, table_path: &str) -> String;

    /// Streaming variant of [`TableStorage::scan_with_row_id`]: yields
    /// batches as a stream instead of collecting them into a `Vec`.
    async fn scan_stream(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<DatasetRecordBatchStream>;
}
427
428#[async_trait]
431impl TableStorage for TableStore {
432 async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle> {
433 self.open_at_entry(entry).await.map(SnapshotHandle::new)
434 }
435
436 async fn open_snapshot_at_table(
437 &self,
438 snapshot: &Snapshot,
439 table_key: &str,
440 ) -> Result<SnapshotHandle> {
441 self.open_snapshot_table(snapshot, table_key)
442 .await
443 .map(SnapshotHandle::new)
444 }
445
446 async fn open_dataset_head(
447 &self,
448 dataset_uri: &str,
449 branch: Option<&str>,
450 ) -> Result<SnapshotHandle> {
451 TableStore::open_dataset_head(self, dataset_uri, branch)
452 .await
453 .map(SnapshotHandle::new)
454 }
455
456 async fn open_dataset_head_for_write(
457 &self,
458 table_key: &str,
459 dataset_uri: &str,
460 branch: Option<&str>,
461 ) -> Result<SnapshotHandle> {
462 TableStore::open_dataset_head_for_write(self, table_key, dataset_uri, branch)
463 .await
464 .map(SnapshotHandle::new)
465 }
466
467 async fn open_dataset_at_state(
468 &self,
469 table_path: &str,
470 branch: Option<&str>,
471 version: u64,
472 ) -> Result<SnapshotHandle> {
473 TableStore::open_dataset_at_state(self, table_path, branch, version)
474 .await
475 .map(SnapshotHandle::new)
476 }
477
478 async fn fork_branch_from_state(
479 &self,
480 dataset_uri: &str,
481 source_branch: Option<&str>,
482 table_key: &str,
483 source_version: u64,
484 target_branch: &str,
485 ) -> Result<SnapshotHandle> {
486 TableStore::fork_branch_from_state(
487 self,
488 dataset_uri,
489 source_branch,
490 table_key,
491 source_version,
492 target_branch,
493 )
494 .await
495 .map(SnapshotHandle::new)
496 }
497
498 async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
499 TableStore::delete_branch(self, dataset_uri, branch).await
500 }
501
502 async fn reopen_for_mutation(
503 &self,
504 dataset_uri: &str,
505 branch: Option<&str>,
506 table_key: &str,
507 expected_version: u64,
508 ) -> Result<SnapshotHandle> {
509 TableStore::reopen_for_mutation(self, dataset_uri, branch, table_key, expected_version)
510 .await
511 .map(SnapshotHandle::new)
512 }
513
514 fn ensure_expected_version(
515 &self,
516 snapshot: &SnapshotHandle,
517 table_key: &str,
518 expected_version: u64,
519 ) -> Result<()> {
520 TableStore::ensure_expected_version(self, snapshot.dataset(), table_key, expected_version)
521 }
522
523 async fn scan(
524 &self,
525 snapshot: &SnapshotHandle,
526 projection: Option<&[&str]>,
527 filter: Option<&str>,
528 order_by: Option<Vec<ColumnOrdering>>,
529 ) -> Result<Vec<RecordBatch>> {
530 TableStore::scan(self, snapshot.dataset(), projection, filter, order_by).await
531 }
532
533 async fn scan_with_row_id(
534 &self,
535 snapshot: &SnapshotHandle,
536 projection: Option<&[&str]>,
537 filter: Option<&str>,
538 order_by: Option<Vec<ColumnOrdering>>,
539 with_row_id: bool,
540 ) -> Result<Vec<RecordBatch>> {
541 TableStore::scan_with(
542 self,
543 snapshot.dataset(),
544 projection,
545 filter,
546 order_by,
547 with_row_id,
548 |_| Ok(()),
549 )
550 .await
551 }
552
553 async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>> {
554 TableStore::scan_batches(self, snapshot.dataset()).await
555 }
556
557 async fn scan_batches_for_rewrite(
558 &self,
559 snapshot: &SnapshotHandle,
560 ) -> Result<Vec<RecordBatch>> {
561 TableStore::scan_batches_for_rewrite(self, snapshot.dataset()).await
562 }
563
564 async fn count_rows(
565 &self,
566 snapshot: &SnapshotHandle,
567 filter: Option<String>,
568 ) -> Result<usize> {
569 TableStore::count_rows(self, snapshot.dataset(), filter).await
570 }
571
572 async fn count_rows_with_staged(
573 &self,
574 snapshot: &SnapshotHandle,
575 staged: &[StagedHandle],
576 filter: Option<String>,
577 ) -> Result<usize> {
578 let staged_writes = staged_handles_as_writes(staged);
579 TableStore::count_rows_with_staged(self, snapshot.dataset(), &staged_writes, filter).await
580 }
581
582 async fn scan_with_staged(
583 &self,
584 snapshot: &SnapshotHandle,
585 staged: &[StagedHandle],
586 projection: Option<&[&str]>,
587 filter: Option<&str>,
588 ) -> Result<Vec<RecordBatch>> {
589 let staged_writes = staged_handles_as_writes(staged);
590 TableStore::scan_with_staged(
591 self,
592 snapshot.dataset(),
593 &staged_writes,
594 projection,
595 filter,
596 )
597 .await
598 }
599
600 async fn scan_with_pending(
601 &self,
602 snapshot: &SnapshotHandle,
603 pending: &[RecordBatch],
604 pending_schema: Option<SchemaRef>,
605 projection: Option<&[&str]>,
606 filter: Option<&str>,
607 key_column: Option<&str>,
608 ) -> Result<Vec<RecordBatch>> {
609 TableStore::scan_with_pending(
610 self,
611 snapshot.dataset(),
612 pending,
613 pending_schema,
614 projection,
615 filter,
616 key_column,
617 )
618 .await
619 }
620
621 async fn first_row_id_for_filter(
622 &self,
623 snapshot: &SnapshotHandle,
624 filter: &str,
625 ) -> Result<Option<u64>> {
626 TableStore::first_row_id_for_filter(self, snapshot.dataset(), filter).await
627 }
628
629 async fn table_state(
630 &self,
631 dataset_uri: &str,
632 snapshot: &SnapshotHandle,
633 ) -> Result<TableState> {
634 TableStore::table_state(self, dataset_uri, snapshot.dataset()).await
635 }
636
637 async fn stage_append(
638 &self,
639 snapshot: &SnapshotHandle,
640 batch: RecordBatch,
641 prior_stages: &[StagedHandle],
642 ) -> Result<StagedHandle> {
643 let staged_writes = staged_handles_as_writes(prior_stages);
644 TableStore::stage_append(self, snapshot.dataset(), batch, &staged_writes)
645 .await
646 .map(StagedHandle::new)
647 }
648
649 async fn stage_merge_insert(
650 &self,
651 snapshot: SnapshotHandle,
652 batch: RecordBatch,
653 key_columns: Vec<String>,
654 when_matched: WhenMatched,
655 when_not_matched: WhenNotMatched,
656 ) -> Result<StagedHandle> {
657 let ds = Arc::try_unwrap(snapshot.into_arc())
658 .unwrap_or_else(|arc| (*arc).clone());
659 TableStore::stage_merge_insert(
660 self,
661 ds,
662 batch,
663 key_columns,
664 when_matched,
665 when_not_matched,
666 )
667 .await
668 .map(StagedHandle::new)
669 }
670
671 async fn commit_staged(
672 &self,
673 snapshot: SnapshotHandle,
674 staged: StagedHandle,
675 ) -> Result<SnapshotHandle> {
676 let ds_arc = snapshot.into_arc();
677 let transaction = staged.into_staged().transaction;
678 TableStore::commit_staged(self, ds_arc, transaction)
679 .await
680 .map(SnapshotHandle::new)
681 }
682
683 async fn stage_overwrite(
684 &self,
685 snapshot: &SnapshotHandle,
686 batch: RecordBatch,
687 ) -> Result<StagedHandle> {
688 TableStore::stage_overwrite(self, snapshot.dataset(), batch)
689 .await
690 .map(StagedHandle::new)
691 }
692
693 async fn stage_create_btree_index(
694 &self,
695 snapshot: &SnapshotHandle,
696 columns: &[&str],
697 ) -> Result<StagedHandle> {
698 TableStore::stage_create_btree_index(self, snapshot.dataset(), columns)
699 .await
700 .map(StagedHandle::new)
701 }
702
703 async fn stage_create_inverted_index(
704 &self,
705 snapshot: &SnapshotHandle,
706 column: &str,
707 ) -> Result<StagedHandle> {
708 TableStore::stage_create_inverted_index(self, snapshot.dataset(), column)
709 .await
710 .map(StagedHandle::new)
711 }
712
713 async fn append_batch(
714 &self,
715 dataset_uri: &str,
716 snapshot: SnapshotHandle,
717 batch: RecordBatch,
718 ) -> Result<(SnapshotHandle, TableState)> {
719 let mut ds = Arc::try_unwrap(snapshot.into_arc())
720 .unwrap_or_else(|arc| (*arc).clone());
721 let state = TableStore::append_batch(self, dataset_uri, &mut ds, batch).await?;
722 Ok((SnapshotHandle::new(ds), state))
723 }
724
725 async fn merge_insert_batches(
726 &self,
727 dataset_uri: &str,
728 snapshot: SnapshotHandle,
729 batches: Vec<RecordBatch>,
730 key_columns: Vec<String>,
731 when_matched: WhenMatched,
732 when_not_matched: WhenNotMatched,
733 ) -> Result<TableState> {
734 let ds = Arc::try_unwrap(snapshot.into_arc())
735 .unwrap_or_else(|arc| (*arc).clone());
736 TableStore::merge_insert_batches(
737 self,
738 dataset_uri,
739 ds,
740 batches,
741 key_columns,
742 when_matched,
743 when_not_matched,
744 )
745 .await
746 }
747
748 async fn overwrite_batch(
749 &self,
750 dataset_uri: &str,
751 snapshot: SnapshotHandle,
752 batch: RecordBatch,
753 ) -> Result<(SnapshotHandle, TableState)> {
754 let mut ds = Arc::try_unwrap(snapshot.into_arc())
755 .unwrap_or_else(|arc| (*arc).clone());
756 let state = TableStore::overwrite_batch(self, dataset_uri, &mut ds, batch).await?;
757 Ok((SnapshotHandle::new(ds), state))
758 }
759
760 async fn delete_where(
761 &self,
762 dataset_uri: &str,
763 snapshot: SnapshotHandle,
764 filter: &str,
765 ) -> Result<(SnapshotHandle, DeleteState)> {
766 let mut ds = Arc::try_unwrap(snapshot.into_arc())
767 .unwrap_or_else(|arc| (*arc).clone());
768 let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
769 Ok((SnapshotHandle::new(ds), state))
770 }
771
772 async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
773 TableStore::has_btree_index(self, snapshot.dataset(), column).await
774 }
775
776 async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
777 TableStore::has_fts_index(self, snapshot.dataset(), column).await
778 }
779
780 async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
781 TableStore::has_vector_index(self, snapshot.dataset(), column).await
782 }
783
784 async fn create_btree_index(
785 &self,
786 snapshot: SnapshotHandle,
787 columns: &[&str],
788 ) -> Result<SnapshotHandle> {
789 let mut ds = Arc::try_unwrap(snapshot.into_arc())
790 .unwrap_or_else(|arc| (*arc).clone());
791 TableStore::create_btree_index(self, &mut ds, columns).await?;
792 Ok(SnapshotHandle::new(ds))
793 }
794
795 async fn create_inverted_index(
796 &self,
797 snapshot: SnapshotHandle,
798 column: &str,
799 ) -> Result<SnapshotHandle> {
800 let mut ds = Arc::try_unwrap(snapshot.into_arc())
801 .unwrap_or_else(|arc| (*arc).clone());
802 TableStore::create_inverted_index(self, &mut ds, column).await?;
803 Ok(SnapshotHandle::new(ds))
804 }
805
806 async fn create_vector_index(
807 &self,
808 snapshot: SnapshotHandle,
809 column: &str,
810 ) -> Result<SnapshotHandle> {
811 let mut ds = Arc::try_unwrap(snapshot.into_arc())
812 .unwrap_or_else(|arc| (*arc).clone());
813 TableStore::create_vector_index(self, &mut ds, column).await?;
814 Ok(SnapshotHandle::new(ds))
815 }
816
817 fn root_uri(&self) -> &str {
818 TableStore::root_uri(self)
819 }
820
821 fn dataset_uri(&self, table_path: &str) -> String {
822 TableStore::dataset_uri(self, table_path)
823 }
824
825 async fn scan_stream(
826 &self,
827 snapshot: &SnapshotHandle,
828 projection: Option<&[&str]>,
829 filter: Option<&str>,
830 order_by: Option<Vec<ColumnOrdering>>,
831 with_row_id: bool,
832 ) -> Result<DatasetRecordBatchStream> {
833 TableStore::scan_stream(snapshot.dataset(), projection, filter, order_by, with_row_id).await
837 }
838}