1use std::fmt::Debug;
54use std::sync::Arc;
55
56use arrow_array::RecordBatch;
57use arrow_schema::SchemaRef;
58use async_trait::async_trait;
59use lance::Dataset;
60use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream};
61use lance::dataset::{WhenMatched, WhenNotMatched};
62
63use crate::db::{Snapshot, SubTableEntry};
64use crate::error::Result;
65use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore};
66
/// Crate-private home of the [`Sealed`](sealed::Sealed) marker trait.
///
/// Because this module is `pub(crate)`, code outside the crate cannot name
/// `Sealed`, and therefore cannot implement any trait that lists it as a
/// supertrait.
pub(crate) mod sealed {
    /// Marker trait with no methods; it exists only to be used as a supertrait
    /// bound.
    pub trait Sealed {}

    // The only implementor visible in this file.
    impl Sealed for crate::table_store::TableStore {}
}
78
79#[derive(Debug, Clone)]
89pub struct SnapshotHandle {
90 pub(crate) inner: Arc<Dataset>,
91}
92
93impl SnapshotHandle {
94 pub(crate) fn new(ds: Dataset) -> Self {
97 Self { inner: Arc::new(ds) }
98 }
99
100 pub(crate) fn dataset(&self) -> &Dataset {
103 &self.inner
104 }
105
106 pub(crate) fn into_arc(self) -> Arc<Dataset> {
109 self.inner
110 }
111
112 pub fn version(&self) -> u64 {
116 self.inner.version().version
117 }
118
119 pub fn uses_stable_row_ids(&self) -> bool {
121 self.inner.manifest.uses_stable_row_ids()
122 }
123}
124
125#[derive(Debug, Clone)]
132pub struct StagedHandle {
133 pub(crate) inner: StagedWrite,
134}
135
136impl StagedHandle {
137 pub(crate) fn new(staged: StagedWrite) -> Self {
138 Self { inner: staged }
139 }
140
141 pub(crate) fn into_staged(self) -> StagedWrite {
144 self.inner
145 }
146}
147
148pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
155 handles.iter().map(|h| h.inner.clone()).collect()
156}
157
/// Storage operations on table datasets, expressed in terms of the opaque
/// [`SnapshotHandle`] and [`StagedHandle`] wrappers.
///
/// The trait is sealed: it requires `sealed::Sealed`, which lives in a
/// crate-private module, so it cannot be implemented outside this crate.
///
/// Methods that take a [`SnapshotHandle`] by value consume the handle; several
/// of them hand back a fresh handle as part of their result.
#[async_trait]
pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
    /// Opens the dataset described by `entry` and returns a handle to it.
    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle>;

    /// Opens a handle for the table identified by `table_key`, using the given
    /// database `snapshot`.
    async fn open_snapshot_at_table(
        &self,
        snapshot: &Snapshot,
        table_key: &str,
    ) -> Result<SnapshotHandle>;

    /// Opens the dataset at `dataset_uri`, optionally on `branch`.
    ///
    /// NOTE(review): presumably resolves to the latest version ("head") of the
    /// dataset or branch — confirm against the implementation.
    async fn open_dataset_head(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<SnapshotHandle>;

    /// Variant of [`open_dataset_head`](Self::open_dataset_head) that also
    /// takes the `table_key`.
    ///
    /// NOTE(review): the name suggests this is for callers about to write; what
    /// it does differently is not visible from this declaration.
    async fn open_dataset_head_for_write(
        &self,
        table_key: &str,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<SnapshotHandle>;

    /// Opens the dataset for `table_path` at `version`, optionally on `branch`.
    ///
    /// Note that this takes a table path rather than a dataset URI.
    async fn open_dataset_at_state(
        &self,
        table_path: &str,
        branch: Option<&str>,
        version: u64,
    ) -> Result<SnapshotHandle>;

    /// Forks `target_branch` from a source state and returns a handle.
    ///
    /// The source state is given by `dataset_uri`, `source_branch` and
    /// `source_version`; `table_key` identifies the table.
    ///
    /// NOTE(review): presumably the returned handle points at the new branch —
    /// confirm against the implementation.
    async fn fork_branch_from_state(
        &self,
        dataset_uri: &str,
        source_branch: Option<&str>,
        table_key: &str,
        source_version: u64,
        target_branch: &str,
    ) -> Result<SnapshotHandle>;

    /// Deletes `branch` of the dataset at `dataset_uri`.
    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;

    /// Reopens the dataset at `dataset_uri` (optionally on `branch`) for a
    /// mutation, given the version the caller expects.
    ///
    /// NOTE(review): how a mismatch with `expected_version` is handled is not
    /// visible from this declaration.
    async fn reopen_for_mutation(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
        table_key: &str,
        expected_version: u64,
    ) -> Result<SnapshotHandle>;

    /// Synchronously checks `snapshot` against `expected_version`.
    ///
    /// NOTE(review): presumably returns an error mentioning `table_key` on
    /// mismatch — confirm against the implementation.
    fn ensure_expected_version(
        &self,
        snapshot: &SnapshotHandle,
        table_key: &str,
        expected_version: u64,
    ) -> Result<()>;

    /// Scans `snapshot` and returns the result as record batches.
    ///
    /// `projection`, `filter` and `order_by` are all optional.
    async fn scan(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
    ) -> Result<Vec<RecordBatch>>;

    /// Same parameters as [`scan`](Self::scan) plus a `with_row_id` flag.
    ///
    /// NOTE(review): presumably the flag adds the row ID column to the output —
    /// confirm against the implementation.
    async fn scan_with_row_id(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<Vec<RecordBatch>>;

    /// Returns record batches from `snapshot`; takes no projection, filter or
    /// ordering.
    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>>;

    /// Variant of [`scan_batches`](Self::scan_batches) for rewrites.
    ///
    /// NOTE(review): how the output differs from `scan_batches` is not visible
    /// from this declaration.
    async fn scan_batches_for_rewrite(
        &self,
        snapshot: &SnapshotHandle,
    ) -> Result<Vec<RecordBatch>>;

    /// Counts rows in `snapshot`, with an optional `filter`.
    async fn count_rows(
        &self,
        snapshot: &SnapshotHandle,
        filter: Option<String>,
    ) -> Result<usize>;

    /// Like [`count_rows`](Self::count_rows), additionally given `staged`
    /// writes.
    ///
    /// NOTE(review): presumably the staged writes are included in the count —
    /// confirm against the implementation.
    async fn count_rows_with_staged(
        &self,
        snapshot: &SnapshotHandle,
        staged: &[StagedHandle],
        filter: Option<String>,
    ) -> Result<usize>;

    /// Scans `snapshot` together with `staged` writes, with optional
    /// `projection` and `filter`.
    async fn scan_with_staged(
        &self,
        snapshot: &SnapshotHandle,
        staged: &[StagedHandle],
        projection: Option<&[&str]>,
        filter: Option<&str>,
    ) -> Result<Vec<RecordBatch>>;

    /// Scans `snapshot` together with in-memory `pending` batches.
    ///
    /// NOTE(review): the roles of `pending_schema` and `key_column` (for
    /// example, whether the key is used to de-duplicate pending rows against
    /// stored ones) are not visible from this declaration.
    async fn scan_with_pending(
        &self,
        snapshot: &SnapshotHandle,
        pending: &[RecordBatch],
        pending_schema: Option<SchemaRef>,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        key_column: Option<&str>,
    ) -> Result<Vec<RecordBatch>>;

    /// Looks up a row ID for `filter` in `snapshot`.
    ///
    /// NOTE(review): presumably the first matching row, with `None` when
    /// nothing matches — confirm against the implementation.
    async fn first_row_id_for_filter(
        &self,
        snapshot: &SnapshotHandle,
        filter: &str,
    ) -> Result<Option<u64>>;

    /// Produces the [`TableState`] for `snapshot`, given its `dataset_uri`.
    async fn table_state(
        &self,
        dataset_uri: &str,
        snapshot: &SnapshotHandle,
    ) -> Result<TableState>;

    /// Stages an append of `batch` against `snapshot`, given the stages that
    /// came before it, and returns the new staged write.
    async fn stage_append(
        &self,
        snapshot: &SnapshotHandle,
        batch: RecordBatch,
        prior_stages: &[StagedHandle],
    ) -> Result<StagedHandle>;

    /// Stages a merge-insert of `batch` keyed on `key_columns`.
    ///
    /// Consumes `snapshot`.
    async fn stage_merge_insert(
        &self,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<StagedHandle>;

    /// Commits `staged` against `snapshot` and returns a handle.
    ///
    /// Consumes both arguments.
    async fn commit_staged(
        &self,
        snapshot: SnapshotHandle,
        staged: StagedHandle,
    ) -> Result<SnapshotHandle>;

    /// Stages an overwrite of the table with `batch`.
    async fn stage_overwrite(
        &self,
        snapshot: &SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<StagedHandle>;

    /// Stages creation of a B-tree index over `columns`.
    async fn stage_create_btree_index(
        &self,
        snapshot: &SnapshotHandle,
        columns: &[&str],
    ) -> Result<StagedHandle>;

    /// Stages creation of an inverted index on `column`.
    async fn stage_create_inverted_index(
        &self,
        snapshot: &SnapshotHandle,
        column: &str,
    ) -> Result<StagedHandle>;

    /// Appends `batch` to the dataset.
    ///
    /// Consumes `snapshot` and returns a handle together with the resulting
    /// [`TableState`].
    async fn append_batch(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<(SnapshotHandle, TableState)>;

    /// Merge-inserts `batches` keyed on `key_columns`.
    ///
    /// Consumes `snapshot`. Unlike the other mutating methods, only the
    /// [`TableState`] is returned, with no handle.
    async fn merge_insert_batches(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batches: Vec<RecordBatch>,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<TableState>;

    /// Overwrites the dataset with `batch`.
    ///
    /// Consumes `snapshot` and returns a handle together with the resulting
    /// [`TableState`].
    async fn overwrite_batch(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<(SnapshotHandle, TableState)>;

    /// Deletes rows selected by `filter`.
    ///
    /// Consumes `snapshot` and returns a handle together with a
    /// [`DeleteState`].
    async fn delete_where(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        filter: &str,
    ) -> Result<(SnapshotHandle, DeleteState)>;

    /// Reports whether `column` has a B-tree index in `snapshot`.
    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    /// Reports whether `column` has a full-text-search (FTS) index in
    /// `snapshot`.
    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    /// Reports whether `column` has a vector index in `snapshot`.
    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;

    /// Creates a B-tree index over `columns`.
    ///
    /// Consumes `snapshot` and returns a handle.
    async fn create_btree_index(
        &self,
        snapshot: SnapshotHandle,
        columns: &[&str],
    ) -> Result<SnapshotHandle>;

    /// Creates an inverted index on `column`.
    ///
    /// Consumes `snapshot` and returns a handle.
    async fn create_inverted_index(
        &self,
        snapshot: SnapshotHandle,
        column: &str,
    ) -> Result<SnapshotHandle>;

    /// Creates a vector index on `column`.
    ///
    /// Consumes `snapshot` and returns a handle.
    async fn create_vector_index(
        &self,
        snapshot: SnapshotHandle,
        column: &str,
    ) -> Result<SnapshotHandle>;

    /// Root URI of the store.
    fn root_uri(&self) -> &str;
    /// Builds the dataset URI for `table_path`.
    fn dataset_uri(&self, table_path: &str) -> String;

    /// Streaming counterpart of [`scan_with_row_id`](Self::scan_with_row_id).
    ///
    /// Takes the same parameters but returns a [`DatasetRecordBatchStream`]
    /// instead of a collected `Vec<RecordBatch>`.
    async fn scan_stream(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<DatasetRecordBatchStream>;
}
431
432#[async_trait]
435impl TableStorage for TableStore {
436 async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle> {
437 self.open_at_entry(entry).await.map(SnapshotHandle::new)
438 }
439
440 async fn open_snapshot_at_table(
441 &self,
442 snapshot: &Snapshot,
443 table_key: &str,
444 ) -> Result<SnapshotHandle> {
445 self.open_snapshot_table(snapshot, table_key)
446 .await
447 .map(SnapshotHandle::new)
448 }
449
450 async fn open_dataset_head(
451 &self,
452 dataset_uri: &str,
453 branch: Option<&str>,
454 ) -> Result<SnapshotHandle> {
455 TableStore::open_dataset_head(self, dataset_uri, branch)
456 .await
457 .map(SnapshotHandle::new)
458 }
459
460 async fn open_dataset_head_for_write(
461 &self,
462 table_key: &str,
463 dataset_uri: &str,
464 branch: Option<&str>,
465 ) -> Result<SnapshotHandle> {
466 TableStore::open_dataset_head_for_write(self, table_key, dataset_uri, branch)
467 .await
468 .map(SnapshotHandle::new)
469 }
470
471 async fn open_dataset_at_state(
472 &self,
473 table_path: &str,
474 branch: Option<&str>,
475 version: u64,
476 ) -> Result<SnapshotHandle> {
477 TableStore::open_dataset_at_state(self, table_path, branch, version)
478 .await
479 .map(SnapshotHandle::new)
480 }
481
482 async fn fork_branch_from_state(
483 &self,
484 dataset_uri: &str,
485 source_branch: Option<&str>,
486 table_key: &str,
487 source_version: u64,
488 target_branch: &str,
489 ) -> Result<SnapshotHandle> {
490 TableStore::fork_branch_from_state(
491 self,
492 dataset_uri,
493 source_branch,
494 table_key,
495 source_version,
496 target_branch,
497 )
498 .await
499 .map(SnapshotHandle::new)
500 }
501
502 async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
503 TableStore::delete_branch(self, dataset_uri, branch).await
504 }
505
506 async fn reopen_for_mutation(
507 &self,
508 dataset_uri: &str,
509 branch: Option<&str>,
510 table_key: &str,
511 expected_version: u64,
512 ) -> Result<SnapshotHandle> {
513 TableStore::reopen_for_mutation(self, dataset_uri, branch, table_key, expected_version)
514 .await
515 .map(SnapshotHandle::new)
516 }
517
518 fn ensure_expected_version(
519 &self,
520 snapshot: &SnapshotHandle,
521 table_key: &str,
522 expected_version: u64,
523 ) -> Result<()> {
524 TableStore::ensure_expected_version(self, snapshot.dataset(), table_key, expected_version)
525 }
526
527 async fn scan(
528 &self,
529 snapshot: &SnapshotHandle,
530 projection: Option<&[&str]>,
531 filter: Option<&str>,
532 order_by: Option<Vec<ColumnOrdering>>,
533 ) -> Result<Vec<RecordBatch>> {
534 TableStore::scan(self, snapshot.dataset(), projection, filter, order_by).await
535 }
536
537 async fn scan_with_row_id(
538 &self,
539 snapshot: &SnapshotHandle,
540 projection: Option<&[&str]>,
541 filter: Option<&str>,
542 order_by: Option<Vec<ColumnOrdering>>,
543 with_row_id: bool,
544 ) -> Result<Vec<RecordBatch>> {
545 TableStore::scan_with(
546 self,
547 snapshot.dataset(),
548 projection,
549 filter,
550 order_by,
551 with_row_id,
552 |_| Ok(()),
553 )
554 .await
555 }
556
557 async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>> {
558 TableStore::scan_batches(self, snapshot.dataset()).await
559 }
560
561 async fn scan_batches_for_rewrite(
562 &self,
563 snapshot: &SnapshotHandle,
564 ) -> Result<Vec<RecordBatch>> {
565 TableStore::scan_batches_for_rewrite(self, snapshot.dataset()).await
566 }
567
568 async fn count_rows(
569 &self,
570 snapshot: &SnapshotHandle,
571 filter: Option<String>,
572 ) -> Result<usize> {
573 TableStore::count_rows(self, snapshot.dataset(), filter).await
574 }
575
576 async fn count_rows_with_staged(
577 &self,
578 snapshot: &SnapshotHandle,
579 staged: &[StagedHandle],
580 filter: Option<String>,
581 ) -> Result<usize> {
582 let staged_writes = staged_handles_as_writes(staged);
583 TableStore::count_rows_with_staged(self, snapshot.dataset(), &staged_writes, filter).await
584 }
585
586 async fn scan_with_staged(
587 &self,
588 snapshot: &SnapshotHandle,
589 staged: &[StagedHandle],
590 projection: Option<&[&str]>,
591 filter: Option<&str>,
592 ) -> Result<Vec<RecordBatch>> {
593 let staged_writes = staged_handles_as_writes(staged);
594 TableStore::scan_with_staged(
595 self,
596 snapshot.dataset(),
597 &staged_writes,
598 projection,
599 filter,
600 )
601 .await
602 }
603
604 async fn scan_with_pending(
605 &self,
606 snapshot: &SnapshotHandle,
607 pending: &[RecordBatch],
608 pending_schema: Option<SchemaRef>,
609 projection: Option<&[&str]>,
610 filter: Option<&str>,
611 key_column: Option<&str>,
612 ) -> Result<Vec<RecordBatch>> {
613 TableStore::scan_with_pending(
614 self,
615 snapshot.dataset(),
616 pending,
617 pending_schema,
618 projection,
619 filter,
620 key_column,
621 )
622 .await
623 }
624
625 async fn first_row_id_for_filter(
626 &self,
627 snapshot: &SnapshotHandle,
628 filter: &str,
629 ) -> Result<Option<u64>> {
630 TableStore::first_row_id_for_filter(self, snapshot.dataset(), filter).await
631 }
632
633 async fn table_state(
634 &self,
635 dataset_uri: &str,
636 snapshot: &SnapshotHandle,
637 ) -> Result<TableState> {
638 TableStore::table_state(self, dataset_uri, snapshot.dataset()).await
639 }
640
641 async fn stage_append(
642 &self,
643 snapshot: &SnapshotHandle,
644 batch: RecordBatch,
645 prior_stages: &[StagedHandle],
646 ) -> Result<StagedHandle> {
647 let staged_writes = staged_handles_as_writes(prior_stages);
648 TableStore::stage_append(self, snapshot.dataset(), batch, &staged_writes)
649 .await
650 .map(StagedHandle::new)
651 }
652
653 async fn stage_merge_insert(
654 &self,
655 snapshot: SnapshotHandle,
656 batch: RecordBatch,
657 key_columns: Vec<String>,
658 when_matched: WhenMatched,
659 when_not_matched: WhenNotMatched,
660 ) -> Result<StagedHandle> {
661 let ds = Arc::try_unwrap(snapshot.into_arc())
662 .unwrap_or_else(|arc| (*arc).clone());
663 TableStore::stage_merge_insert(
664 self,
665 ds,
666 batch,
667 key_columns,
668 when_matched,
669 when_not_matched,
670 )
671 .await
672 .map(StagedHandle::new)
673 }
674
675 async fn commit_staged(
676 &self,
677 snapshot: SnapshotHandle,
678 staged: StagedHandle,
679 ) -> Result<SnapshotHandle> {
680 let ds_arc = snapshot.into_arc();
681 let transaction = staged.into_staged().transaction;
682 TableStore::commit_staged(self, ds_arc, transaction)
683 .await
684 .map(SnapshotHandle::new)
685 }
686
687 async fn stage_overwrite(
688 &self,
689 snapshot: &SnapshotHandle,
690 batch: RecordBatch,
691 ) -> Result<StagedHandle> {
692 TableStore::stage_overwrite(self, snapshot.dataset(), batch)
693 .await
694 .map(StagedHandle::new)
695 }
696
697 async fn stage_create_btree_index(
698 &self,
699 snapshot: &SnapshotHandle,
700 columns: &[&str],
701 ) -> Result<StagedHandle> {
702 TableStore::stage_create_btree_index(self, snapshot.dataset(), columns)
703 .await
704 .map(StagedHandle::new)
705 }
706
707 async fn stage_create_inverted_index(
708 &self,
709 snapshot: &SnapshotHandle,
710 column: &str,
711 ) -> Result<StagedHandle> {
712 TableStore::stage_create_inverted_index(self, snapshot.dataset(), column)
713 .await
714 .map(StagedHandle::new)
715 }
716
717 async fn append_batch(
718 &self,
719 dataset_uri: &str,
720 snapshot: SnapshotHandle,
721 batch: RecordBatch,
722 ) -> Result<(SnapshotHandle, TableState)> {
723 let mut ds = Arc::try_unwrap(snapshot.into_arc())
724 .unwrap_or_else(|arc| (*arc).clone());
725 let state = TableStore::append_batch(self, dataset_uri, &mut ds, batch).await?;
726 Ok((SnapshotHandle::new(ds), state))
727 }
728
729 async fn merge_insert_batches(
730 &self,
731 dataset_uri: &str,
732 snapshot: SnapshotHandle,
733 batches: Vec<RecordBatch>,
734 key_columns: Vec<String>,
735 when_matched: WhenMatched,
736 when_not_matched: WhenNotMatched,
737 ) -> Result<TableState> {
738 let ds = Arc::try_unwrap(snapshot.into_arc())
739 .unwrap_or_else(|arc| (*arc).clone());
740 TableStore::merge_insert_batches(
741 self,
742 dataset_uri,
743 ds,
744 batches,
745 key_columns,
746 when_matched,
747 when_not_matched,
748 )
749 .await
750 }
751
752 async fn overwrite_batch(
753 &self,
754 dataset_uri: &str,
755 snapshot: SnapshotHandle,
756 batch: RecordBatch,
757 ) -> Result<(SnapshotHandle, TableState)> {
758 let mut ds = Arc::try_unwrap(snapshot.into_arc())
759 .unwrap_or_else(|arc| (*arc).clone());
760 let state = TableStore::overwrite_batch(self, dataset_uri, &mut ds, batch).await?;
761 Ok((SnapshotHandle::new(ds), state))
762 }
763
764 async fn delete_where(
765 &self,
766 dataset_uri: &str,
767 snapshot: SnapshotHandle,
768 filter: &str,
769 ) -> Result<(SnapshotHandle, DeleteState)> {
770 let mut ds = Arc::try_unwrap(snapshot.into_arc())
771 .unwrap_or_else(|arc| (*arc).clone());
772 let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
773 Ok((SnapshotHandle::new(ds), state))
774 }
775
776 async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
777 TableStore::has_btree_index(self, snapshot.dataset(), column).await
778 }
779
780 async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
781 TableStore::has_fts_index(self, snapshot.dataset(), column).await
782 }
783
784 async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
785 TableStore::has_vector_index(self, snapshot.dataset(), column).await
786 }
787
788 async fn create_btree_index(
789 &self,
790 snapshot: SnapshotHandle,
791 columns: &[&str],
792 ) -> Result<SnapshotHandle> {
793 let mut ds = Arc::try_unwrap(snapshot.into_arc())
794 .unwrap_or_else(|arc| (*arc).clone());
795 TableStore::create_btree_index(self, &mut ds, columns).await?;
796 Ok(SnapshotHandle::new(ds))
797 }
798
799 async fn create_inverted_index(
800 &self,
801 snapshot: SnapshotHandle,
802 column: &str,
803 ) -> Result<SnapshotHandle> {
804 let mut ds = Arc::try_unwrap(snapshot.into_arc())
805 .unwrap_or_else(|arc| (*arc).clone());
806 TableStore::create_inverted_index(self, &mut ds, column).await?;
807 Ok(SnapshotHandle::new(ds))
808 }
809
810 async fn create_vector_index(
811 &self,
812 snapshot: SnapshotHandle,
813 column: &str,
814 ) -> Result<SnapshotHandle> {
815 let mut ds = Arc::try_unwrap(snapshot.into_arc())
816 .unwrap_or_else(|arc| (*arc).clone());
817 TableStore::create_vector_index(self, &mut ds, column).await?;
818 Ok(SnapshotHandle::new(ds))
819 }
820
821 fn root_uri(&self) -> &str {
822 TableStore::root_uri(self)
823 }
824
825 fn dataset_uri(&self, table_path: &str) -> String {
826 TableStore::dataset_uri(self, table_path)
827 }
828
829 async fn scan_stream(
830 &self,
831 snapshot: &SnapshotHandle,
832 projection: Option<&[&str]>,
833 filter: Option<&str>,
834 order_by: Option<Vec<ColumnOrdering>>,
835 with_row_id: bool,
836 ) -> Result<DatasetRecordBatchStream> {
837 TableStore::scan_stream(snapshot.dataset(), projection, filter, order_by, with_row_id).await
841 }
842}