1use std::fmt::Debug;
56use std::sync::Arc;
57
58use arrow_array::RecordBatch;
59use arrow_schema::SchemaRef;
60use async_trait::async_trait;
61use lance::Dataset;
62use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream};
63use lance::dataset::{WhenMatched, WhenNotMatched};
64
65use crate::db::{Snapshot, SubTableEntry};
66use crate::error::Result;
67use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore};
68
69pub(crate) mod sealed {
72 pub trait Sealed {}
77
78 impl Sealed for crate::table_store::TableStore {}
79}
80
81#[derive(Debug, Clone)]
91pub struct SnapshotHandle {
92 pub(crate) inner: Arc<Dataset>,
93}
94
95impl SnapshotHandle {
96 pub(crate) fn new(ds: Dataset) -> Self {
99 Self {
100 inner: Arc::new(ds),
101 }
102 }
103
104 pub(crate) fn dataset(&self) -> &Dataset {
107 &self.inner
108 }
109
110 pub(crate) fn into_arc(self) -> Arc<Dataset> {
123 self.inner
124 }
125
126 pub(crate) fn into_dataset(self) -> Dataset {
138 Arc::try_unwrap(self.inner).unwrap_or_else(|arc| (*arc).clone())
139 }
140
141 pub fn version(&self) -> u64 {
145 self.inner.version().version
146 }
147
148 pub fn uses_stable_row_ids(&self) -> bool {
150 self.inner.manifest.uses_stable_row_ids()
151 }
152}
153
154#[derive(Debug, Clone)]
161pub struct StagedHandle {
162 pub(crate) inner: StagedWrite,
163}
164
165impl StagedHandle {
166 pub(crate) fn new(staged: StagedWrite) -> Self {
167 Self { inner: staged }
168 }
169
170 pub(crate) fn into_staged(self) -> StagedWrite {
173 self.inner
174 }
175}
176
177pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
184 handles.iter().map(|h| h.inner.clone()).collect()
185}
186
/// Result of forking a branch from an existing table state.
///
/// Generic over the dataset representation. The inherent `TableStore` API
/// yields it with a raw dataset, and the `TableStorage` trait re-wraps that
/// dataset in a `SnapshotHandle`.
///
/// `Debug` is derived (whenever `D: Debug`) so results can be logged and
/// inspected like the other public types in this module.
#[derive(Debug)]
pub enum ForkOutcome<D> {
    /// The branch was created; carries the dataset/handle for the new branch.
    Created(D),
    /// The store did not create the branch. Per the variant name, the target
    /// ref already exists; callers decide whether that is an error.
    RefAlreadyExists,
}
206
/// Crate-facing abstraction over per-table dataset storage.
///
/// Callers work with `SnapshotHandle` (an opened dataset) and `StagedHandle`
/// (a staged write) instead of raw Lance `Dataset` / `StagedWrite` values. The
/// `sealed::Sealed` supertrait is reachable only inside this crate, so other
/// crates cannot implement this trait; `TableStore` is the implementation
/// provided in this file.
///
/// String `filter` arguments are handed through to the store unchanged; their
/// expression syntax is defined by the underlying implementation.
#[async_trait]
pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
    /// Opens the dataset described by a sub-table entry.
    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle>;

    /// Opens the table `table_key` as recorded in `snapshot`.
    async fn open_snapshot_at_table(
        &self,
        snapshot: &Snapshot,
        table_key: &str,
    ) -> Result<SnapshotHandle>;

    /// Opens the latest version of the dataset at `dataset_uri`, on `branch`
    /// when given (presumably the default branch for `None` — confirm).
    async fn open_dataset_head(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<SnapshotHandle>;

    /// Opens the head of `dataset_uri` (optionally on `branch`) for a write to
    /// `table_key`.
    ///
    /// NOTE(review): how this differs from `open_dataset_head`, beyond taking
    /// `table_key`, is decided by the implementation — confirm before relying
    /// on it.
    async fn open_dataset_head_for_write(
        &self,
        table_key: &str,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<SnapshotHandle>;

    /// Opens the dataset at `table_path` (on `branch` if given) at the specific
    /// `version`.
    async fn open_dataset_at_state(
        &self,
        table_path: &str,
        branch: Option<&str>,
        version: u64,
    ) -> Result<SnapshotHandle>;

    /// Forks `target_branch` from `source_version` of `source_branch` for
    /// table `table_key`.
    ///
    /// Returns `ForkOutcome::Created` with a handle for the new branch, or
    /// `ForkOutcome::RefAlreadyExists` when the store reports that outcome.
    async fn fork_branch_from_state(
        &self,
        dataset_uri: &str,
        source_branch: Option<&str>,
        table_key: &str,
        source_version: u64,
        target_branch: &str,
    ) -> Result<ForkOutcome<SnapshotHandle>>;

    /// Deletes `branch` from the dataset at `dataset_uri`.
    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;

    /// Forcing variant of `delete_branch`.
    ///
    /// NOTE(review): which checks this skips compared with `delete_branch` is
    /// defined by the implementation — confirm.
    async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;

    /// Lists the branch names of the dataset at `dataset_uri`.
    async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>>;

    /// Reopens the dataset (optionally on `branch`) for mutating `table_key`.
    ///
    /// `expected_version` is forwarded to the store; presumably it is checked
    /// against the reopened head as an optimistic-concurrency guard — confirm.
    async fn reopen_for_mutation(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
        table_key: &str,
        expected_version: u64,
    ) -> Result<SnapshotHandle>;

    /// Checks `snapshot` against `expected_version` for `table_key` and
    /// returns `Err` when the check fails. This method is synchronous (not
    /// `async`).
    fn ensure_expected_version(
        &self,
        snapshot: &SnapshotHandle,
        table_key: &str,
        expected_version: u64,
    ) -> Result<()>;

    /// Scans `snapshot` and collects every resulting batch into a `Vec`.
    ///
    /// `projection` selects columns, `filter` restricts rows, and `order_by`
    /// sorts the output; each is optional.
    async fn scan(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
    ) -> Result<Vec<RecordBatch>>;

    /// Same as `scan`, with `with_row_id` forwarded to the store (presumably
    /// controls whether the row-id column is included — confirm).
    async fn scan_with_row_id(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<Vec<RecordBatch>>;

    /// Reads all batches of `snapshot`; this method takes no projection or
    /// filter.
    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>>;

    /// Variant of `scan_batches` intended for data that is about to be
    /// rewritten.
    ///
    /// NOTE(review): the exact difference from `scan_batches` is defined by the
    /// implementation — confirm.
    async fn scan_batches_for_rewrite(&self, snapshot: &SnapshotHandle)
        -> Result<Vec<RecordBatch>>;

    /// Counts the rows of `snapshot`, optionally only those matching `filter`.
    async fn count_rows(&self, snapshot: &SnapshotHandle, filter: Option<String>) -> Result<usize>;

    /// Counts rows of `snapshot` with the `staged` writes taken into account
    /// (presumably without committing them — confirm).
    async fn count_rows_with_staged(
        &self,
        snapshot: &SnapshotHandle,
        staged: &[StagedHandle],
        filter: Option<String>,
    ) -> Result<usize>;

    /// Scans `snapshot` with the `staged` writes taken into account
    /// (presumably without committing them — confirm).
    async fn scan_with_staged(
        &self,
        snapshot: &SnapshotHandle,
        staged: &[StagedHandle],
        projection: Option<&[&str]>,
        filter: Option<&str>,
    ) -> Result<Vec<RecordBatch>>;

    /// Scans `snapshot` together with the in-memory `pending` batches.
    ///
    /// `pending_schema` describes the pending data (presumably needed when
    /// `pending` is empty). `key_column` presumably identifies stored rows that
    /// pending rows replace — confirm both against the implementation.
    async fn scan_with_pending(
        &self,
        snapshot: &SnapshotHandle,
        pending: &[RecordBatch],
        pending_schema: Option<SchemaRef>,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        key_column: Option<&str>,
    ) -> Result<Vec<RecordBatch>>;

    /// Returns the row id of a row matching `filter` (the first one, per the
    /// name), or `None` when nothing matches.
    async fn first_row_id_for_filter(
        &self,
        snapshot: &SnapshotHandle,
        filter: &str,
    ) -> Result<Option<u64>>;

    /// Computes the `TableState` of the table at `dataset_uri` from `snapshot`.
    async fn table_state(&self, dataset_uri: &str, snapshot: &SnapshotHandle)
        -> Result<TableState>;

    /// Stages (does not commit) an append of `batch` on `snapshot`.
    ///
    /// `prior_stages` are earlier stages the new one must account for
    /// (presumably from the same logical operation — confirm).
    async fn stage_append(
        &self,
        snapshot: &SnapshotHandle,
        batch: RecordBatch,
        prior_stages: &[StagedHandle],
    ) -> Result<StagedHandle>;

    /// Stages an append of the rows read from `source` onto `snapshot`, taking
    /// `prior_stages` into account.
    async fn stage_append_stream(
        &self,
        snapshot: &SnapshotHandle,
        source: &SnapshotHandle,
        prior_stages: &[StagedHandle],
    ) -> Result<StagedHandle>;

    /// Stages a merge-insert of `batch` keyed on `key_columns`, applying
    /// `when_matched` / `when_not_matched`. Takes `snapshot` by value.
    async fn stage_merge_insert(
        &self,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<StagedHandle>;

    /// Commits `staged` on top of `snapshot` and returns a handle to the
    /// resulting dataset.
    async fn commit_staged(
        &self,
        snapshot: SnapshotHandle,
        staged: StagedHandle,
    ) -> Result<SnapshotHandle>;

    /// Stages an overwrite of `snapshot` with `batch`.
    async fn stage_overwrite(
        &self,
        snapshot: &SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<StagedHandle>;

    /// Stages creation of a B-tree index over `columns`.
    async fn stage_create_btree_index(
        &self,
        snapshot: &SnapshotHandle,
        columns: &[&str],
    ) -> Result<StagedHandle>;

    /// Stages creation of an inverted (full-text) index on `column`.
    async fn stage_create_inverted_index(
        &self,
        snapshot: &SnapshotHandle,
        column: &str,
    ) -> Result<StagedHandle>;

    /// Reports whether `column` has a B-tree index in `snapshot`.
    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    /// Reports whether `column` has a full-text-search index in `snapshot`.
    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    /// Reports whether `column` has a vector index in `snapshot`.
    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;

    /// URI of the store's root.
    fn root_uri(&self) -> &str;
    /// Builds the dataset URI for `table_path` (presumably under `root_uri()`).
    fn dataset_uri(&self, table_path: &str) -> String;

    /// Streaming counterpart of `scan_with_row_id`: returns the record-batch
    /// stream instead of collecting it into a `Vec`.
    async fn scan_stream(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<DatasetRecordBatchStream>;
}
439
/// Crate-internal storage operations that do not follow `TableStorage`'s
/// stage-then-`commit_staged` flow.
///
/// Each method takes the snapshot by value and returns a new `SnapshotHandle`
/// rather than a `StagedHandle`: the updated dataset is produced by the call
/// itself. The trait name suggests these operations commit inline.
#[async_trait]
pub(crate) trait InlineCommitResidual: sealed::Sealed + Send + Sync + Debug {
    /// Deletes the rows of `snapshot` matching `filter` for the dataset at
    /// `dataset_uri`, returning the updated handle and the `DeleteState`.
    async fn delete_where(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        filter: &str,
    ) -> Result<(SnapshotHandle, DeleteState)>;

    /// Builds a vector index on `column` and returns the updated handle.
    async fn create_vector_index(
        &self,
        snapshot: SnapshotHandle,
        column: &str,
    ) -> Result<SnapshotHandle>;
}
471
472#[async_trait]
475impl TableStorage for TableStore {
476 async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle> {
477 self.open_at_entry(entry).await.map(SnapshotHandle::new)
478 }
479
480 async fn open_snapshot_at_table(
481 &self,
482 snapshot: &Snapshot,
483 table_key: &str,
484 ) -> Result<SnapshotHandle> {
485 self.open_snapshot_table(snapshot, table_key)
486 .await
487 .map(SnapshotHandle::new)
488 }
489
490 async fn open_dataset_head(
491 &self,
492 dataset_uri: &str,
493 branch: Option<&str>,
494 ) -> Result<SnapshotHandle> {
495 TableStore::open_dataset_head(self, dataset_uri, branch)
496 .await
497 .map(SnapshotHandle::new)
498 }
499
500 async fn open_dataset_head_for_write(
501 &self,
502 table_key: &str,
503 dataset_uri: &str,
504 branch: Option<&str>,
505 ) -> Result<SnapshotHandle> {
506 TableStore::open_dataset_head_for_write(self, table_key, dataset_uri, branch)
507 .await
508 .map(SnapshotHandle::new)
509 }
510
511 async fn open_dataset_at_state(
512 &self,
513 table_path: &str,
514 branch: Option<&str>,
515 version: u64,
516 ) -> Result<SnapshotHandle> {
517 TableStore::open_dataset_at_state(self, table_path, branch, version)
518 .await
519 .map(SnapshotHandle::new)
520 }
521
522 async fn fork_branch_from_state(
523 &self,
524 dataset_uri: &str,
525 source_branch: Option<&str>,
526 table_key: &str,
527 source_version: u64,
528 target_branch: &str,
529 ) -> Result<ForkOutcome<SnapshotHandle>> {
530 Ok(
531 match TableStore::fork_branch_from_state(
532 self,
533 dataset_uri,
534 source_branch,
535 table_key,
536 source_version,
537 target_branch,
538 )
539 .await?
540 {
541 ForkOutcome::Created(ds) => ForkOutcome::Created(SnapshotHandle::new(ds)),
542 ForkOutcome::RefAlreadyExists => ForkOutcome::RefAlreadyExists,
543 },
544 )
545 }
546
547 async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
548 TableStore::delete_branch(self, dataset_uri, branch).await
549 }
550
551 async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
552 TableStore::force_delete_branch(self, dataset_uri, branch).await
553 }
554
555 async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>> {
556 TableStore::list_branches(self, dataset_uri).await
557 }
558
559 async fn reopen_for_mutation(
560 &self,
561 dataset_uri: &str,
562 branch: Option<&str>,
563 table_key: &str,
564 expected_version: u64,
565 ) -> Result<SnapshotHandle> {
566 TableStore::reopen_for_mutation(self, dataset_uri, branch, table_key, expected_version)
567 .await
568 .map(SnapshotHandle::new)
569 }
570
571 fn ensure_expected_version(
572 &self,
573 snapshot: &SnapshotHandle,
574 table_key: &str,
575 expected_version: u64,
576 ) -> Result<()> {
577 TableStore::ensure_expected_version(self, snapshot.dataset(), table_key, expected_version)
578 }
579
580 async fn scan(
581 &self,
582 snapshot: &SnapshotHandle,
583 projection: Option<&[&str]>,
584 filter: Option<&str>,
585 order_by: Option<Vec<ColumnOrdering>>,
586 ) -> Result<Vec<RecordBatch>> {
587 TableStore::scan(self, snapshot.dataset(), projection, filter, order_by).await
588 }
589
590 async fn scan_with_row_id(
591 &self,
592 snapshot: &SnapshotHandle,
593 projection: Option<&[&str]>,
594 filter: Option<&str>,
595 order_by: Option<Vec<ColumnOrdering>>,
596 with_row_id: bool,
597 ) -> Result<Vec<RecordBatch>> {
598 TableStore::scan_with(
599 self,
600 snapshot.dataset(),
601 projection,
602 filter,
603 order_by,
604 with_row_id,
605 |_| Ok(()),
606 )
607 .await
608 }
609
610 async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>> {
611 TableStore::scan_batches(self, snapshot.dataset()).await
612 }
613
614 async fn scan_batches_for_rewrite(
615 &self,
616 snapshot: &SnapshotHandle,
617 ) -> Result<Vec<RecordBatch>> {
618 TableStore::scan_batches_for_rewrite(self, snapshot.dataset()).await
619 }
620
621 async fn count_rows(&self, snapshot: &SnapshotHandle, filter: Option<String>) -> Result<usize> {
622 TableStore::count_rows(self, snapshot.dataset(), filter).await
623 }
624
625 async fn count_rows_with_staged(
626 &self,
627 snapshot: &SnapshotHandle,
628 staged: &[StagedHandle],
629 filter: Option<String>,
630 ) -> Result<usize> {
631 let staged_writes = staged_handles_as_writes(staged);
632 TableStore::count_rows_with_staged(self, snapshot.dataset(), &staged_writes, filter).await
633 }
634
635 async fn scan_with_staged(
636 &self,
637 snapshot: &SnapshotHandle,
638 staged: &[StagedHandle],
639 projection: Option<&[&str]>,
640 filter: Option<&str>,
641 ) -> Result<Vec<RecordBatch>> {
642 let staged_writes = staged_handles_as_writes(staged);
643 TableStore::scan_with_staged(self, snapshot.dataset(), &staged_writes, projection, filter)
644 .await
645 }
646
647 async fn scan_with_pending(
648 &self,
649 snapshot: &SnapshotHandle,
650 pending: &[RecordBatch],
651 pending_schema: Option<SchemaRef>,
652 projection: Option<&[&str]>,
653 filter: Option<&str>,
654 key_column: Option<&str>,
655 ) -> Result<Vec<RecordBatch>> {
656 TableStore::scan_with_pending(
657 self,
658 snapshot.dataset(),
659 pending,
660 pending_schema,
661 projection,
662 filter,
663 key_column,
664 )
665 .await
666 }
667
668 async fn first_row_id_for_filter(
669 &self,
670 snapshot: &SnapshotHandle,
671 filter: &str,
672 ) -> Result<Option<u64>> {
673 TableStore::first_row_id_for_filter(self, snapshot.dataset(), filter).await
674 }
675
676 async fn table_state(
677 &self,
678 dataset_uri: &str,
679 snapshot: &SnapshotHandle,
680 ) -> Result<TableState> {
681 TableStore::table_state(self, dataset_uri, snapshot.dataset()).await
682 }
683
684 async fn stage_append(
685 &self,
686 snapshot: &SnapshotHandle,
687 batch: RecordBatch,
688 prior_stages: &[StagedHandle],
689 ) -> Result<StagedHandle> {
690 let staged_writes = staged_handles_as_writes(prior_stages);
691 TableStore::stage_append(self, snapshot.dataset(), batch, &staged_writes)
692 .await
693 .map(StagedHandle::new)
694 }
695
696 async fn stage_append_stream(
697 &self,
698 snapshot: &SnapshotHandle,
699 source: &SnapshotHandle,
700 prior_stages: &[StagedHandle],
701 ) -> Result<StagedHandle> {
702 let staged_writes = staged_handles_as_writes(prior_stages);
703 TableStore::stage_append_stream(self, snapshot.dataset(), source.dataset(), &staged_writes)
704 .await
705 .map(StagedHandle::new)
706 }
707
708 async fn stage_merge_insert(
709 &self,
710 snapshot: SnapshotHandle,
711 batch: RecordBatch,
712 key_columns: Vec<String>,
713 when_matched: WhenMatched,
714 when_not_matched: WhenNotMatched,
715 ) -> Result<StagedHandle> {
716 let ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
717 TableStore::stage_merge_insert(self, ds, batch, key_columns, when_matched, when_not_matched)
718 .await
719 .map(StagedHandle::new)
720 }
721
722 async fn commit_staged(
723 &self,
724 snapshot: SnapshotHandle,
725 staged: StagedHandle,
726 ) -> Result<SnapshotHandle> {
727 let ds_arc = snapshot.into_arc();
728 let transaction = staged.into_staged().transaction;
729 TableStore::commit_staged(self, ds_arc, transaction)
730 .await
731 .map(SnapshotHandle::new)
732 }
733
734 async fn stage_overwrite(
735 &self,
736 snapshot: &SnapshotHandle,
737 batch: RecordBatch,
738 ) -> Result<StagedHandle> {
739 TableStore::stage_overwrite(self, snapshot.dataset(), batch)
740 .await
741 .map(StagedHandle::new)
742 }
743
744 async fn stage_create_btree_index(
745 &self,
746 snapshot: &SnapshotHandle,
747 columns: &[&str],
748 ) -> Result<StagedHandle> {
749 TableStore::stage_create_btree_index(self, snapshot.dataset(), columns)
750 .await
751 .map(StagedHandle::new)
752 }
753
754 async fn stage_create_inverted_index(
755 &self,
756 snapshot: &SnapshotHandle,
757 column: &str,
758 ) -> Result<StagedHandle> {
759 TableStore::stage_create_inverted_index(self, snapshot.dataset(), column)
760 .await
761 .map(StagedHandle::new)
762 }
763
764 async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
765 TableStore::has_btree_index(self, snapshot.dataset(), column).await
766 }
767
768 async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
769 TableStore::has_fts_index(self, snapshot.dataset(), column).await
770 }
771
772 async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
773 TableStore::has_vector_index(self, snapshot.dataset(), column).await
774 }
775
776 fn root_uri(&self) -> &str {
777 TableStore::root_uri(self)
778 }
779
780 fn dataset_uri(&self, table_path: &str) -> String {
781 TableStore::dataset_uri(self, table_path)
782 }
783
784 async fn scan_stream(
785 &self,
786 snapshot: &SnapshotHandle,
787 projection: Option<&[&str]>,
788 filter: Option<&str>,
789 order_by: Option<Vec<ColumnOrdering>>,
790 with_row_id: bool,
791 ) -> Result<DatasetRecordBatchStream> {
792 TableStore::scan_stream(
796 snapshot.dataset(),
797 projection,
798 filter,
799 order_by,
800 with_row_id,
801 )
802 .await
803 }
804}
805
806#[async_trait]
807impl InlineCommitResidual for TableStore {
808 async fn delete_where(
809 &self,
810 dataset_uri: &str,
811 snapshot: SnapshotHandle,
812 filter: &str,
813 ) -> Result<(SnapshotHandle, DeleteState)> {
814 let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
815 let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
816 Ok((SnapshotHandle::new(ds), state))
817 }
818
819 async fn create_vector_index(
820 &self,
821 snapshot: SnapshotHandle,
822 column: &str,
823 ) -> Result<SnapshotHandle> {
824 let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
825 TableStore::create_vector_index(self, &mut ds, column).await?;
826 Ok(SnapshotHandle::new(ds))
827 }
828}