1use std::fmt::Debug;
56use std::sync::Arc;
57
58use arrow_array::RecordBatch;
59use arrow_schema::SchemaRef;
60use async_trait::async_trait;
61use lance::Dataset;
62use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream};
63use lance::dataset::{WhenMatched, WhenNotMatched};
64
65use crate::db::{Snapshot, SubTableEntry};
66use crate::error::Result;
67use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore};
68
69pub(crate) mod sealed {
72 pub trait Sealed {}
77
78 impl Sealed for crate::table_store::TableStore {}
79}
80
/// Handle to an opened Lance `Dataset`, passed to the `TableStorage` methods.
///
/// The dataset is held behind an `Arc`, so cloning the handle bumps a
/// reference count rather than copying the `Dataset`.
#[derive(Debug, Clone)]
pub struct SnapshotHandle {
    /// Shared dataset this handle refers to.
    pub(crate) inner: Arc<Dataset>,
}
94
95impl SnapshotHandle {
96 pub(crate) fn new(ds: Dataset) -> Self {
99 Self {
100 inner: Arc::new(ds),
101 }
102 }
103
104 pub(crate) fn dataset(&self) -> &Dataset {
107 &self.inner
108 }
109
110 pub(crate) fn into_arc(self) -> Arc<Dataset> {
123 self.inner
124 }
125
126 pub(crate) fn into_dataset(self) -> Dataset {
138 Arc::try_unwrap(self.inner).unwrap_or_else(|arc| (*arc).clone())
139 }
140
141 pub fn version(&self) -> u64 {
145 self.inner.version().version
146 }
147
148 pub fn uses_stable_row_ids(&self) -> bool {
150 self.inner.manifest.uses_stable_row_ids()
151 }
152}
153
/// Wrapper around a `StagedWrite`, as returned by the `stage_*` methods of
/// `TableStorage`.
///
/// It is consumed by `TableStorage::commit_staged`.
#[derive(Debug, Clone)]
pub struct StagedHandle {
    /// The wrapped staged write.
    pub(crate) inner: StagedWrite,
}
164
165impl StagedHandle {
166 pub(crate) fn new(staged: StagedWrite) -> Self {
167 Self { inner: staged }
168 }
169
170 pub(crate) fn into_staged(self) -> StagedWrite {
173 self.inner
174 }
175}
176
177pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
184 handles.iter().map(|h| h.inner.clone()).collect()
185}
186
/// Result of forking a branch (see `TableStorage::fork_branch_from_state`).
///
/// `D` is the payload for a successful fork. It is `SnapshotHandle` at the
/// trait level; the inner `TableStore` call yields something
/// `SnapshotHandle::new` accepts.
pub enum ForkOutcome<D> {
    /// The target branch was created; carries the opened result.
    Created(D),
    /// The target ref already exists. Presumably nothing was created —
    /// NOTE(review): confirm against `TableStore::fork_branch_from_state`.
    RefAlreadyExists,
}
206
/// Storage operations the crate performs against per-table datasets.
///
/// The trait is sealed via `sealed::Sealed`, so only this crate can implement
/// it; `TableStore` is the implementation visible here. Read methods take a
/// `&SnapshotHandle`. The `stage_*` write methods return a `StagedHandle`,
/// which `commit_staged` later applies to produce a new `SnapshotHandle`.
#[async_trait]
pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
    /// Opens a snapshot for the given sub-table entry.
    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle>;

    /// Opens table `table_key` as recorded in the database `snapshot`.
    async fn open_snapshot_at_table(
        &self,
        snapshot: &Snapshot,
        table_key: &str,
    ) -> Result<SnapshotHandle>;

    /// Opens the head of the dataset at `dataset_uri`, on `branch` when given.
    async fn open_dataset_head(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<SnapshotHandle>;

    /// Opens the head of `dataset_uri` (on `branch` when given) ahead of a
    /// write to `table_key`.
    async fn open_dataset_head_for_write(
        &self,
        table_key: &str,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<SnapshotHandle>;

    /// Opens `table_path` at a specific `version`, on `branch` when given.
    async fn open_dataset_at_state(
        &self,
        table_path: &str,
        branch: Option<&str>,
        version: u64,
    ) -> Result<SnapshotHandle>;

    /// Forks `target_branch` from `source_version` of `source_branch`.
    ///
    /// Returns `ForkOutcome::RefAlreadyExists` when the target ref already
    /// exists.
    async fn fork_branch_from_state(
        &self,
        dataset_uri: &str,
        source_branch: Option<&str>,
        table_key: &str,
        source_version: u64,
        target_branch: &str,
    ) -> Result<ForkOutcome<SnapshotHandle>>;

    /// Deletes `branch` from the dataset at `dataset_uri`.
    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;

    /// Forced variant of `delete_branch`.
    ///
    /// NOTE(review): which checks are skipped is defined by
    /// `TableStore::force_delete_branch`.
    async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;

    /// Lists the branch names of the dataset at `dataset_uri`.
    async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>>;

    /// Reopens a dataset for mutation, expecting it to be at `expected_version`.
    async fn reopen_for_mutation(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
        table_key: &str,
        expected_version: u64,
    ) -> Result<SnapshotHandle>;

    /// Checks `snapshot` against `expected_version` for `table_key`.
    ///
    /// Presumably returns an error on mismatch — confirm in `TableStore`.
    fn ensure_expected_version(
        &self,
        snapshot: &SnapshotHandle,
        table_key: &str,
        expected_version: u64,
    ) -> Result<()>;

    /// Scans `snapshot` into memory, with optional projection, filter, and ordering.
    async fn scan(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
    ) -> Result<Vec<RecordBatch>>;

    /// Like `scan`, additionally requesting row ids when `with_row_id` is true.
    async fn scan_with_row_id(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<Vec<RecordBatch>>;

    /// Reads the batches of `snapshot` without projection or filter arguments.
    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>>;

    /// Reads the batches of `snapshot` for a table rewrite.
    async fn scan_batches_for_rewrite(&self, snapshot: &SnapshotHandle)
        -> Result<Vec<RecordBatch>>;

    /// Counts rows in `snapshot`, optionally restricted by `filter`.
    async fn count_rows(&self, snapshot: &SnapshotHandle, filter: Option<String>) -> Result<usize>;

    /// Counts rows in `snapshot` together with the `staged` writes.
    async fn count_rows_with_staged(
        &self,
        snapshot: &SnapshotHandle,
        staged: &[StagedHandle],
        filter: Option<String>,
    ) -> Result<usize>;

    /// Scans `snapshot` together with the `staged` writes.
    async fn scan_with_staged(
        &self,
        snapshot: &SnapshotHandle,
        staged: &[StagedHandle],
        projection: Option<&[&str]>,
        filter: Option<&str>,
    ) -> Result<Vec<RecordBatch>>;

    /// Scans `snapshot` combined with the in-memory `pending` batches.
    ///
    /// `key_column` is presumably used to reconcile pending rows with stored
    /// rows — confirm in `TableStore::scan_with_pending`.
    async fn scan_with_pending(
        &self,
        snapshot: &SnapshotHandle,
        pending: &[RecordBatch],
        pending_schema: Option<SchemaRef>,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        key_column: Option<&str>,
    ) -> Result<Vec<RecordBatch>>;

    /// Returns the row id of the first row matching `filter`, or `None`.
    async fn first_row_id_for_filter(
        &self,
        snapshot: &SnapshotHandle,
        filter: &str,
    ) -> Result<Option<u64>>;

    /// Builds the `TableState` of `snapshot` for the dataset at `dataset_uri`.
    async fn table_state(&self, dataset_uri: &str, snapshot: &SnapshotHandle)
        -> Result<TableState>;

    /// Stages an append of `batch` on `snapshot`, after any `prior_stages`.
    async fn stage_append(
        &self,
        snapshot: &SnapshotHandle,
        batch: RecordBatch,
        prior_stages: &[StagedHandle],
    ) -> Result<StagedHandle>;

    /// Stages a merge-insert of `batch` keyed on `key_columns`.
    ///
    /// Takes `snapshot` by value.
    async fn stage_merge_insert(
        &self,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<StagedHandle>;

    /// Commits `staged` against `snapshot` and returns the resulting snapshot.
    async fn commit_staged(
        &self,
        snapshot: SnapshotHandle,
        staged: StagedHandle,
    ) -> Result<SnapshotHandle>;

    /// Stages an overwrite of `snapshot` with `batch`.
    async fn stage_overwrite(
        &self,
        snapshot: &SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<StagedHandle>;

    /// Stages creation of a BTree index over `columns`.
    async fn stage_create_btree_index(
        &self,
        snapshot: &SnapshotHandle,
        columns: &[&str],
    ) -> Result<StagedHandle>;

    /// Stages creation of an inverted (full-text) index on `column`.
    async fn stage_create_inverted_index(
        &self,
        snapshot: &SnapshotHandle,
        column: &str,
    ) -> Result<StagedHandle>;

    /// Whether `column` has a BTree index in `snapshot`.
    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    /// Whether `column` has a full-text index in `snapshot`.
    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    /// Whether `column` has a vector index in `snapshot`.
    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;

    /// Root URI of the storage.
    fn root_uri(&self) -> &str;
    /// Builds the dataset URI for `table_path`.
    fn dataset_uri(&self, table_path: &str) -> String;

    /// Streaming counterpart of `scan_with_row_id`.
    ///
    /// Returns a `DatasetRecordBatchStream` instead of collecting batches
    /// into a `Vec`.
    async fn scan_stream(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<DatasetRecordBatchStream>;
}
430
/// Crate-internal operations that have no `stage_*` counterpart in
/// `TableStorage`.
///
/// Instead of returning a `StagedHandle`, each method consumes the snapshot
/// and returns a handle to the resulting dataset. Per the trait name, they
/// presumably commit inline. The trait is sealed via `sealed::Sealed`.
#[async_trait]
pub(crate) trait InlineCommitResidual: sealed::Sealed + Send + Sync + Debug {
    /// Deletes rows matching `filter` and returns the new snapshot plus a `DeleteState`.
    async fn delete_where(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        filter: &str,
    ) -> Result<(SnapshotHandle, DeleteState)>;

    /// Creates a vector index on `column` and returns the updated snapshot.
    async fn create_vector_index(
        &self,
        snapshot: SnapshotHandle,
        column: &str,
    ) -> Result<SnapshotHandle>;
}
462
463#[async_trait]
466impl TableStorage for TableStore {
467 async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle> {
468 self.open_at_entry(entry).await.map(SnapshotHandle::new)
469 }
470
471 async fn open_snapshot_at_table(
472 &self,
473 snapshot: &Snapshot,
474 table_key: &str,
475 ) -> Result<SnapshotHandle> {
476 self.open_snapshot_table(snapshot, table_key)
477 .await
478 .map(SnapshotHandle::new)
479 }
480
481 async fn open_dataset_head(
482 &self,
483 dataset_uri: &str,
484 branch: Option<&str>,
485 ) -> Result<SnapshotHandle> {
486 TableStore::open_dataset_head(self, dataset_uri, branch)
487 .await
488 .map(SnapshotHandle::new)
489 }
490
491 async fn open_dataset_head_for_write(
492 &self,
493 table_key: &str,
494 dataset_uri: &str,
495 branch: Option<&str>,
496 ) -> Result<SnapshotHandle> {
497 TableStore::open_dataset_head_for_write(self, table_key, dataset_uri, branch)
498 .await
499 .map(SnapshotHandle::new)
500 }
501
502 async fn open_dataset_at_state(
503 &self,
504 table_path: &str,
505 branch: Option<&str>,
506 version: u64,
507 ) -> Result<SnapshotHandle> {
508 TableStore::open_dataset_at_state(self, table_path, branch, version)
509 .await
510 .map(SnapshotHandle::new)
511 }
512
513 async fn fork_branch_from_state(
514 &self,
515 dataset_uri: &str,
516 source_branch: Option<&str>,
517 table_key: &str,
518 source_version: u64,
519 target_branch: &str,
520 ) -> Result<ForkOutcome<SnapshotHandle>> {
521 Ok(
522 match TableStore::fork_branch_from_state(
523 self,
524 dataset_uri,
525 source_branch,
526 table_key,
527 source_version,
528 target_branch,
529 )
530 .await?
531 {
532 ForkOutcome::Created(ds) => ForkOutcome::Created(SnapshotHandle::new(ds)),
533 ForkOutcome::RefAlreadyExists => ForkOutcome::RefAlreadyExists,
534 },
535 )
536 }
537
538 async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
539 TableStore::delete_branch(self, dataset_uri, branch).await
540 }
541
542 async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
543 TableStore::force_delete_branch(self, dataset_uri, branch).await
544 }
545
546 async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>> {
547 TableStore::list_branches(self, dataset_uri).await
548 }
549
550 async fn reopen_for_mutation(
551 &self,
552 dataset_uri: &str,
553 branch: Option<&str>,
554 table_key: &str,
555 expected_version: u64,
556 ) -> Result<SnapshotHandle> {
557 TableStore::reopen_for_mutation(self, dataset_uri, branch, table_key, expected_version)
558 .await
559 .map(SnapshotHandle::new)
560 }
561
562 fn ensure_expected_version(
563 &self,
564 snapshot: &SnapshotHandle,
565 table_key: &str,
566 expected_version: u64,
567 ) -> Result<()> {
568 TableStore::ensure_expected_version(self, snapshot.dataset(), table_key, expected_version)
569 }
570
571 async fn scan(
572 &self,
573 snapshot: &SnapshotHandle,
574 projection: Option<&[&str]>,
575 filter: Option<&str>,
576 order_by: Option<Vec<ColumnOrdering>>,
577 ) -> Result<Vec<RecordBatch>> {
578 TableStore::scan(self, snapshot.dataset(), projection, filter, order_by).await
579 }
580
581 async fn scan_with_row_id(
582 &self,
583 snapshot: &SnapshotHandle,
584 projection: Option<&[&str]>,
585 filter: Option<&str>,
586 order_by: Option<Vec<ColumnOrdering>>,
587 with_row_id: bool,
588 ) -> Result<Vec<RecordBatch>> {
589 TableStore::scan_with(
590 self,
591 snapshot.dataset(),
592 projection,
593 filter,
594 order_by,
595 with_row_id,
596 |_| Ok(()),
597 )
598 .await
599 }
600
601 async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>> {
602 TableStore::scan_batches(self, snapshot.dataset()).await
603 }
604
605 async fn scan_batches_for_rewrite(
606 &self,
607 snapshot: &SnapshotHandle,
608 ) -> Result<Vec<RecordBatch>> {
609 TableStore::scan_batches_for_rewrite(self, snapshot.dataset()).await
610 }
611
612 async fn count_rows(&self, snapshot: &SnapshotHandle, filter: Option<String>) -> Result<usize> {
613 TableStore::count_rows(self, snapshot.dataset(), filter).await
614 }
615
616 async fn count_rows_with_staged(
617 &self,
618 snapshot: &SnapshotHandle,
619 staged: &[StagedHandle],
620 filter: Option<String>,
621 ) -> Result<usize> {
622 let staged_writes = staged_handles_as_writes(staged);
623 TableStore::count_rows_with_staged(self, snapshot.dataset(), &staged_writes, filter).await
624 }
625
626 async fn scan_with_staged(
627 &self,
628 snapshot: &SnapshotHandle,
629 staged: &[StagedHandle],
630 projection: Option<&[&str]>,
631 filter: Option<&str>,
632 ) -> Result<Vec<RecordBatch>> {
633 let staged_writes = staged_handles_as_writes(staged);
634 TableStore::scan_with_staged(self, snapshot.dataset(), &staged_writes, projection, filter)
635 .await
636 }
637
638 async fn scan_with_pending(
639 &self,
640 snapshot: &SnapshotHandle,
641 pending: &[RecordBatch],
642 pending_schema: Option<SchemaRef>,
643 projection: Option<&[&str]>,
644 filter: Option<&str>,
645 key_column: Option<&str>,
646 ) -> Result<Vec<RecordBatch>> {
647 TableStore::scan_with_pending(
648 self,
649 snapshot.dataset(),
650 pending,
651 pending_schema,
652 projection,
653 filter,
654 key_column,
655 )
656 .await
657 }
658
659 async fn first_row_id_for_filter(
660 &self,
661 snapshot: &SnapshotHandle,
662 filter: &str,
663 ) -> Result<Option<u64>> {
664 TableStore::first_row_id_for_filter(self, snapshot.dataset(), filter).await
665 }
666
667 async fn table_state(
668 &self,
669 dataset_uri: &str,
670 snapshot: &SnapshotHandle,
671 ) -> Result<TableState> {
672 TableStore::table_state(self, dataset_uri, snapshot.dataset()).await
673 }
674
675 async fn stage_append(
676 &self,
677 snapshot: &SnapshotHandle,
678 batch: RecordBatch,
679 prior_stages: &[StagedHandle],
680 ) -> Result<StagedHandle> {
681 let staged_writes = staged_handles_as_writes(prior_stages);
682 TableStore::stage_append(self, snapshot.dataset(), batch, &staged_writes)
683 .await
684 .map(StagedHandle::new)
685 }
686
687 async fn stage_merge_insert(
688 &self,
689 snapshot: SnapshotHandle,
690 batch: RecordBatch,
691 key_columns: Vec<String>,
692 when_matched: WhenMatched,
693 when_not_matched: WhenNotMatched,
694 ) -> Result<StagedHandle> {
695 let ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
696 TableStore::stage_merge_insert(self, ds, batch, key_columns, when_matched, when_not_matched)
697 .await
698 .map(StagedHandle::new)
699 }
700
701 async fn commit_staged(
702 &self,
703 snapshot: SnapshotHandle,
704 staged: StagedHandle,
705 ) -> Result<SnapshotHandle> {
706 let ds_arc = snapshot.into_arc();
707 let transaction = staged.into_staged().transaction;
708 TableStore::commit_staged(self, ds_arc, transaction)
709 .await
710 .map(SnapshotHandle::new)
711 }
712
713 async fn stage_overwrite(
714 &self,
715 snapshot: &SnapshotHandle,
716 batch: RecordBatch,
717 ) -> Result<StagedHandle> {
718 TableStore::stage_overwrite(self, snapshot.dataset(), batch)
719 .await
720 .map(StagedHandle::new)
721 }
722
723 async fn stage_create_btree_index(
724 &self,
725 snapshot: &SnapshotHandle,
726 columns: &[&str],
727 ) -> Result<StagedHandle> {
728 TableStore::stage_create_btree_index(self, snapshot.dataset(), columns)
729 .await
730 .map(StagedHandle::new)
731 }
732
733 async fn stage_create_inverted_index(
734 &self,
735 snapshot: &SnapshotHandle,
736 column: &str,
737 ) -> Result<StagedHandle> {
738 TableStore::stage_create_inverted_index(self, snapshot.dataset(), column)
739 .await
740 .map(StagedHandle::new)
741 }
742
743 async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
744 TableStore::has_btree_index(self, snapshot.dataset(), column).await
745 }
746
747 async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
748 TableStore::has_fts_index(self, snapshot.dataset(), column).await
749 }
750
751 async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
752 TableStore::has_vector_index(self, snapshot.dataset(), column).await
753 }
754
755 fn root_uri(&self) -> &str {
756 TableStore::root_uri(self)
757 }
758
759 fn dataset_uri(&self, table_path: &str) -> String {
760 TableStore::dataset_uri(self, table_path)
761 }
762
763 async fn scan_stream(
764 &self,
765 snapshot: &SnapshotHandle,
766 projection: Option<&[&str]>,
767 filter: Option<&str>,
768 order_by: Option<Vec<ColumnOrdering>>,
769 with_row_id: bool,
770 ) -> Result<DatasetRecordBatchStream> {
771 TableStore::scan_stream(
775 snapshot.dataset(),
776 projection,
777 filter,
778 order_by,
779 with_row_id,
780 )
781 .await
782 }
783}
784
785#[async_trait]
786impl InlineCommitResidual for TableStore {
787 async fn delete_where(
788 &self,
789 dataset_uri: &str,
790 snapshot: SnapshotHandle,
791 filter: &str,
792 ) -> Result<(SnapshotHandle, DeleteState)> {
793 let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
794 let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
795 Ok((SnapshotHandle::new(ds), state))
796 }
797
798 async fn create_vector_index(
799 &self,
800 snapshot: SnapshotHandle,
801 column: &str,
802 ) -> Result<SnapshotHandle> {
803 let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
804 TableStore::create_vector_index(self, &mut ds, column).await?;
805 Ok(SnapshotHandle::new(ds))
806 }
807}