1use std::fmt::Debug;
54use std::sync::Arc;
55
56use arrow_array::RecordBatch;
57use arrow_schema::SchemaRef;
58use async_trait::async_trait;
59use lance::Dataset;
60use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream};
61use lance::dataset::{WhenMatched, WhenNotMatched};
62
63use crate::db::{Snapshot, SubTableEntry};
64use crate::error::Result;
65use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore};
66
67pub(crate) mod sealed {
70 pub trait Sealed {}
75
76 impl Sealed for crate::table_store::TableStore {}
77}
78
79#[derive(Debug, Clone)]
89pub struct SnapshotHandle {
90 pub(crate) inner: Arc<Dataset>,
91}
92
93impl SnapshotHandle {
94 pub(crate) fn new(ds: Dataset) -> Self {
97 Self {
98 inner: Arc::new(ds),
99 }
100 }
101
102 pub(crate) fn dataset(&self) -> &Dataset {
105 &self.inner
106 }
107
108 pub(crate) fn into_arc(self) -> Arc<Dataset> {
111 self.inner
112 }
113
114 pub fn version(&self) -> u64 {
118 self.inner.version().version
119 }
120
121 pub fn uses_stable_row_ids(&self) -> bool {
123 self.inner.manifest.uses_stable_row_ids()
124 }
125}
126
127#[derive(Debug, Clone)]
134pub struct StagedHandle {
135 pub(crate) inner: StagedWrite,
136}
137
138impl StagedHandle {
139 pub(crate) fn new(staged: StagedWrite) -> Self {
140 Self { inner: staged }
141 }
142
143 pub(crate) fn into_staged(self) -> StagedWrite {
146 self.inner
147 }
148}
149
150pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
157 handles.iter().map(|h| h.inner.clone()).collect()
158}
159
/// Async storage interface over Lance datasets.
///
/// The interface is expressed in terms of the opaque [`SnapshotHandle`] and
/// [`StagedHandle`] types rather than raw `Dataset` / `StagedWrite` values.
///
/// The trait is sealed. Its supertrait `sealed::Sealed` is declared in a
/// `pub(crate)` module, so only types inside this crate can implement it; in this
/// file that is `TableStore`. The `Send + Sync + Debug` supertraits are required
/// of every implementation. The trait is declared with `#[async_trait]`, so each
/// `async fn` returns a boxed future.
///
/// Handle ownership in the signatures follows two patterns:
/// - Inspection and staging methods borrow `&SnapshotHandle`.
/// - Methods that take `SnapshotHandle` by value and return a `SnapshotHandle`
///   hand back a handle on the dataset after the operation. Callers should
///   continue with the returned handle.
#[async_trait]
pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
    /// Opens the dataset referenced by `entry` and returns a handle to it.
    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle>;

    /// Opens the dataset for `table_key` as recorded in the database `snapshot`.
    ///
    /// NOTE(review): presumably opens it at the version `snapshot` pins for that
    /// table. Confirm in `TableStore::open_snapshot_table`.
    async fn open_snapshot_at_table(
        &self,
        snapshot: &Snapshot,
        table_key: &str,
    ) -> Result<SnapshotHandle>;

    /// Opens the head of the dataset at `dataset_uri`, on `branch` when given.
    ///
    /// NOTE(review): `None` presumably selects the default/main branch. Confirm.
    async fn open_dataset_head(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<SnapshotHandle>;

    /// Like [`open_dataset_head`](Self::open_dataset_head), but for a caller that
    /// intends to write.
    ///
    /// `table_key` identifies the logical table. NOTE(review): presumably it is
    /// used for checks or error messages. Confirm in `TableStore`.
    async fn open_dataset_head_for_write(
        &self,
        table_key: &str,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<SnapshotHandle>;

    /// Opens the dataset at `table_path` checked out at `version`, on `branch`
    /// when given.
    ///
    /// Takes a table path rather than a URI; compare
    /// [`dataset_uri`](Self::dataset_uri).
    async fn open_dataset_at_state(
        &self,
        table_path: &str,
        branch: Option<&str>,
        version: u64,
    ) -> Result<SnapshotHandle>;

    /// Creates `target_branch` from `source_version` of `source_branch` (going by
    /// the parameter names) for the dataset at `dataset_uri`.
    ///
    /// NOTE(review): the returned handle is presumably on the new branch. Confirm.
    async fn fork_branch_from_state(
        &self,
        dataset_uri: &str,
        source_branch: Option<&str>,
        table_key: &str,
        source_version: u64,
        target_branch: &str,
    ) -> Result<SnapshotHandle>;

    /// Deletes `branch` from the dataset at `dataset_uri`.
    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;

    /// Reopens the dataset at `dataset_uri` (on `branch` when given) for mutation.
    ///
    /// NOTE(review): presumably errors when the dataset is no longer at
    /// `expected_version`. Confirm in `TableStore::reopen_for_mutation`.
    async fn reopen_for_mutation(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
        table_key: &str,
        expected_version: u64,
    ) -> Result<SnapshotHandle>;

    /// Checks `snapshot` against `expected_version` for `table_key`.
    ///
    /// Synchronous. Returns `Ok(())` when the check passes; presumably returns an
    /// error when the versions differ. Confirm in `TableStore`.
    fn ensure_expected_version(
        &self,
        snapshot: &SnapshotHandle,
        table_key: &str,
        expected_version: u64,
    ) -> Result<()>;

    /// Scans `snapshot` and collects the result into memory.
    ///
    /// Parameters:
    /// - `projection` selects columns.
    /// - `filter` is a filter expression string.
    /// - `order_by` sorts the output.
    ///
    /// Passing `None` presumably leaves that aspect unconstrained.
    async fn scan(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
    ) -> Result<Vec<RecordBatch>>;

    /// Same as [`scan`](Self::scan), plus `with_row_id`.
    ///
    /// `with_row_id` presumably adds the row-id column to the output.
    async fn scan_with_row_id(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<Vec<RecordBatch>>;

    /// Reads the batches of `snapshot`. Takes no projection, filter or ordering
    /// parameters.
    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>>;

    /// Reads the batches of `snapshot` for a caller that is about to rewrite the
    /// table.
    ///
    /// NOTE(review): how this differs from [`scan_batches`](Self::scan_batches)
    /// is decided in `TableStore`. Document it there.
    async fn scan_batches_for_rewrite(&self, snapshot: &SnapshotHandle)
        -> Result<Vec<RecordBatch>>;

    /// Counts the rows of `snapshot`, restricted to `filter` when given.
    async fn count_rows(&self, snapshot: &SnapshotHandle, filter: Option<String>) -> Result<usize>;

    /// Counts the rows of `snapshot` with the uncommitted `staged` writes taken
    /// into account, restricted to `filter` when given.
    async fn count_rows_with_staged(
        &self,
        snapshot: &SnapshotHandle,
        staged: &[StagedHandle],
        filter: Option<String>,
    ) -> Result<usize>;

    /// Scans `snapshot` with the uncommitted `staged` writes taken into account.
    ///
    /// Accepts an optional column `projection` and `filter`.
    async fn scan_with_staged(
        &self,
        snapshot: &SnapshotHandle,
        staged: &[StagedHandle],
        projection: Option<&[&str]>,
        filter: Option<&str>,
    ) -> Result<Vec<RecordBatch>>;

    /// Scans `snapshot` combined with the in-memory `pending` batches.
    ///
    /// `pending_schema` describes the pending rows. NOTE(review): `key_column`
    /// presumably lets pending rows shadow committed rows with the same key.
    /// Confirm in `TableStore::scan_with_pending`.
    async fn scan_with_pending(
        &self,
        snapshot: &SnapshotHandle,
        pending: &[RecordBatch],
        pending_schema: Option<SchemaRef>,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        key_column: Option<&str>,
    ) -> Result<Vec<RecordBatch>>;

    /// Returns the row id of the first row of `snapshot` matching `filter`.
    ///
    /// Returns `None` presumably when no row matches.
    async fn first_row_id_for_filter(
        &self,
        snapshot: &SnapshotHandle,
        filter: &str,
    ) -> Result<Option<u64>>;

    /// Builds the [`TableState`] for `snapshot`, whose dataset lives at
    /// `dataset_uri`.
    async fn table_state(&self, dataset_uri: &str, snapshot: &SnapshotHandle)
        -> Result<TableState>;

    /// Stages an append of `batch` on top of `snapshot` without committing it.
    ///
    /// `prior_stages` are earlier staged writes against the same snapshot.
    /// NOTE(review): presumably passed so the new stage can account for them.
    /// Confirm in `TableStore::stage_append`.
    async fn stage_append(
        &self,
        snapshot: &SnapshotHandle,
        batch: RecordBatch,
        prior_stages: &[StagedHandle],
    ) -> Result<StagedHandle>;

    /// Stages a merge-insert of `batch` matched on `key_columns`, without
    /// committing it.
    ///
    /// `when_matched` / `when_not_matched` choose what happens to each row. The
    /// snapshot is taken by value.
    async fn stage_merge_insert(
        &self,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<StagedHandle>;

    /// Commits the `staged` write on top of `snapshot`.
    ///
    /// Returns a handle on the resulting dataset.
    async fn commit_staged(
        &self,
        snapshot: SnapshotHandle,
        staged: StagedHandle,
    ) -> Result<SnapshotHandle>;

    /// Stages an overwrite of the table with `batch`, without committing it.
    async fn stage_overwrite(
        &self,
        snapshot: &SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<StagedHandle>;

    /// Stages creation of a B-tree index over `columns`, without committing it.
    async fn stage_create_btree_index(
        &self,
        snapshot: &SnapshotHandle,
        columns: &[&str],
    ) -> Result<StagedHandle>;

    /// Stages creation of an inverted (full-text) index on `column`, without
    /// committing it.
    async fn stage_create_inverted_index(
        &self,
        snapshot: &SnapshotHandle,
        column: &str,
    ) -> Result<StagedHandle>;

    /// Appends `batch` to the dataset at `dataset_uri`.
    ///
    /// Returns the updated handle together with the resulting [`TableState`].
    async fn append_batch(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<(SnapshotHandle, TableState)>;

    /// Merge-inserts `batches`, matched on `key_columns`, into the dataset at
    /// `dataset_uri`.
    ///
    /// Unlike the other direct-write methods, this returns only the resulting
    /// [`TableState`] and no updated handle.
    async fn merge_insert_batches(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batches: Vec<RecordBatch>,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<TableState>;

    /// Overwrites the dataset at `dataset_uri` with `batch`.
    ///
    /// Returns the updated handle together with the resulting [`TableState`].
    async fn overwrite_batch(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<(SnapshotHandle, TableState)>;

    /// Deletes the rows matching `filter` from the dataset at `dataset_uri`.
    ///
    /// Returns the updated handle together with a [`DeleteState`].
    async fn delete_where(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        filter: &str,
    ) -> Result<(SnapshotHandle, DeleteState)>;

    /// Whether `snapshot` has a B-tree index on `column`.
    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    /// Whether `snapshot` has a full-text-search index on `column`.
    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    /// Whether `snapshot` has a vector index on `column`.
    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;

    /// Builds a B-tree index over `columns` and returns the updated handle.
    async fn create_btree_index(
        &self,
        snapshot: SnapshotHandle,
        columns: &[&str],
    ) -> Result<SnapshotHandle>;

    /// Builds an inverted (full-text) index on `column` and returns the updated
    /// handle.
    async fn create_inverted_index(
        &self,
        snapshot: SnapshotHandle,
        column: &str,
    ) -> Result<SnapshotHandle>;

    /// Builds a vector index on `column` and returns the updated handle.
    async fn create_vector_index(
        &self,
        snapshot: SnapshotHandle,
        column: &str,
    ) -> Result<SnapshotHandle>;

    /// Root URI of the store.
    fn root_uri(&self) -> &str;
    /// Resolves `table_path` to the URI of its dataset.
    ///
    /// NOTE(review): presumably resolved relative to [`root_uri`](Self::root_uri).
    fn dataset_uri(&self, table_path: &str) -> String;

    /// Streaming counterpart of [`scan_with_row_id`](Self::scan_with_row_id).
    ///
    /// Takes the same parameters, but returns a [`DatasetRecordBatchStream`]
    /// instead of collecting the batches into a `Vec`.
    async fn scan_stream(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<DatasetRecordBatchStream>;
}
424
425#[async_trait]
428impl TableStorage for TableStore {
429 async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle> {
430 self.open_at_entry(entry).await.map(SnapshotHandle::new)
431 }
432
433 async fn open_snapshot_at_table(
434 &self,
435 snapshot: &Snapshot,
436 table_key: &str,
437 ) -> Result<SnapshotHandle> {
438 self.open_snapshot_table(snapshot, table_key)
439 .await
440 .map(SnapshotHandle::new)
441 }
442
443 async fn open_dataset_head(
444 &self,
445 dataset_uri: &str,
446 branch: Option<&str>,
447 ) -> Result<SnapshotHandle> {
448 TableStore::open_dataset_head(self, dataset_uri, branch)
449 .await
450 .map(SnapshotHandle::new)
451 }
452
453 async fn open_dataset_head_for_write(
454 &self,
455 table_key: &str,
456 dataset_uri: &str,
457 branch: Option<&str>,
458 ) -> Result<SnapshotHandle> {
459 TableStore::open_dataset_head_for_write(self, table_key, dataset_uri, branch)
460 .await
461 .map(SnapshotHandle::new)
462 }
463
464 async fn open_dataset_at_state(
465 &self,
466 table_path: &str,
467 branch: Option<&str>,
468 version: u64,
469 ) -> Result<SnapshotHandle> {
470 TableStore::open_dataset_at_state(self, table_path, branch, version)
471 .await
472 .map(SnapshotHandle::new)
473 }
474
475 async fn fork_branch_from_state(
476 &self,
477 dataset_uri: &str,
478 source_branch: Option<&str>,
479 table_key: &str,
480 source_version: u64,
481 target_branch: &str,
482 ) -> Result<SnapshotHandle> {
483 TableStore::fork_branch_from_state(
484 self,
485 dataset_uri,
486 source_branch,
487 table_key,
488 source_version,
489 target_branch,
490 )
491 .await
492 .map(SnapshotHandle::new)
493 }
494
495 async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
496 TableStore::delete_branch(self, dataset_uri, branch).await
497 }
498
499 async fn reopen_for_mutation(
500 &self,
501 dataset_uri: &str,
502 branch: Option<&str>,
503 table_key: &str,
504 expected_version: u64,
505 ) -> Result<SnapshotHandle> {
506 TableStore::reopen_for_mutation(self, dataset_uri, branch, table_key, expected_version)
507 .await
508 .map(SnapshotHandle::new)
509 }
510
511 fn ensure_expected_version(
512 &self,
513 snapshot: &SnapshotHandle,
514 table_key: &str,
515 expected_version: u64,
516 ) -> Result<()> {
517 TableStore::ensure_expected_version(self, snapshot.dataset(), table_key, expected_version)
518 }
519
520 async fn scan(
521 &self,
522 snapshot: &SnapshotHandle,
523 projection: Option<&[&str]>,
524 filter: Option<&str>,
525 order_by: Option<Vec<ColumnOrdering>>,
526 ) -> Result<Vec<RecordBatch>> {
527 TableStore::scan(self, snapshot.dataset(), projection, filter, order_by).await
528 }
529
530 async fn scan_with_row_id(
531 &self,
532 snapshot: &SnapshotHandle,
533 projection: Option<&[&str]>,
534 filter: Option<&str>,
535 order_by: Option<Vec<ColumnOrdering>>,
536 with_row_id: bool,
537 ) -> Result<Vec<RecordBatch>> {
538 TableStore::scan_with(
539 self,
540 snapshot.dataset(),
541 projection,
542 filter,
543 order_by,
544 with_row_id,
545 |_| Ok(()),
546 )
547 .await
548 }
549
550 async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>> {
551 TableStore::scan_batches(self, snapshot.dataset()).await
552 }
553
554 async fn scan_batches_for_rewrite(
555 &self,
556 snapshot: &SnapshotHandle,
557 ) -> Result<Vec<RecordBatch>> {
558 TableStore::scan_batches_for_rewrite(self, snapshot.dataset()).await
559 }
560
561 async fn count_rows(&self, snapshot: &SnapshotHandle, filter: Option<String>) -> Result<usize> {
562 TableStore::count_rows(self, snapshot.dataset(), filter).await
563 }
564
565 async fn count_rows_with_staged(
566 &self,
567 snapshot: &SnapshotHandle,
568 staged: &[StagedHandle],
569 filter: Option<String>,
570 ) -> Result<usize> {
571 let staged_writes = staged_handles_as_writes(staged);
572 TableStore::count_rows_with_staged(self, snapshot.dataset(), &staged_writes, filter).await
573 }
574
575 async fn scan_with_staged(
576 &self,
577 snapshot: &SnapshotHandle,
578 staged: &[StagedHandle],
579 projection: Option<&[&str]>,
580 filter: Option<&str>,
581 ) -> Result<Vec<RecordBatch>> {
582 let staged_writes = staged_handles_as_writes(staged);
583 TableStore::scan_with_staged(self, snapshot.dataset(), &staged_writes, projection, filter)
584 .await
585 }
586
587 async fn scan_with_pending(
588 &self,
589 snapshot: &SnapshotHandle,
590 pending: &[RecordBatch],
591 pending_schema: Option<SchemaRef>,
592 projection: Option<&[&str]>,
593 filter: Option<&str>,
594 key_column: Option<&str>,
595 ) -> Result<Vec<RecordBatch>> {
596 TableStore::scan_with_pending(
597 self,
598 snapshot.dataset(),
599 pending,
600 pending_schema,
601 projection,
602 filter,
603 key_column,
604 )
605 .await
606 }
607
608 async fn first_row_id_for_filter(
609 &self,
610 snapshot: &SnapshotHandle,
611 filter: &str,
612 ) -> Result<Option<u64>> {
613 TableStore::first_row_id_for_filter(self, snapshot.dataset(), filter).await
614 }
615
616 async fn table_state(
617 &self,
618 dataset_uri: &str,
619 snapshot: &SnapshotHandle,
620 ) -> Result<TableState> {
621 TableStore::table_state(self, dataset_uri, snapshot.dataset()).await
622 }
623
624 async fn stage_append(
625 &self,
626 snapshot: &SnapshotHandle,
627 batch: RecordBatch,
628 prior_stages: &[StagedHandle],
629 ) -> Result<StagedHandle> {
630 let staged_writes = staged_handles_as_writes(prior_stages);
631 TableStore::stage_append(self, snapshot.dataset(), batch, &staged_writes)
632 .await
633 .map(StagedHandle::new)
634 }
635
636 async fn stage_merge_insert(
637 &self,
638 snapshot: SnapshotHandle,
639 batch: RecordBatch,
640 key_columns: Vec<String>,
641 when_matched: WhenMatched,
642 when_not_matched: WhenNotMatched,
643 ) -> Result<StagedHandle> {
644 let ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
645 TableStore::stage_merge_insert(self, ds, batch, key_columns, when_matched, when_not_matched)
646 .await
647 .map(StagedHandle::new)
648 }
649
650 async fn commit_staged(
651 &self,
652 snapshot: SnapshotHandle,
653 staged: StagedHandle,
654 ) -> Result<SnapshotHandle> {
655 let ds_arc = snapshot.into_arc();
656 let transaction = staged.into_staged().transaction;
657 TableStore::commit_staged(self, ds_arc, transaction)
658 .await
659 .map(SnapshotHandle::new)
660 }
661
662 async fn stage_overwrite(
663 &self,
664 snapshot: &SnapshotHandle,
665 batch: RecordBatch,
666 ) -> Result<StagedHandle> {
667 TableStore::stage_overwrite(self, snapshot.dataset(), batch)
668 .await
669 .map(StagedHandle::new)
670 }
671
672 async fn stage_create_btree_index(
673 &self,
674 snapshot: &SnapshotHandle,
675 columns: &[&str],
676 ) -> Result<StagedHandle> {
677 TableStore::stage_create_btree_index(self, snapshot.dataset(), columns)
678 .await
679 .map(StagedHandle::new)
680 }
681
682 async fn stage_create_inverted_index(
683 &self,
684 snapshot: &SnapshotHandle,
685 column: &str,
686 ) -> Result<StagedHandle> {
687 TableStore::stage_create_inverted_index(self, snapshot.dataset(), column)
688 .await
689 .map(StagedHandle::new)
690 }
691
692 async fn append_batch(
693 &self,
694 dataset_uri: &str,
695 snapshot: SnapshotHandle,
696 batch: RecordBatch,
697 ) -> Result<(SnapshotHandle, TableState)> {
698 let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
699 let state = TableStore::append_batch(self, dataset_uri, &mut ds, batch).await?;
700 Ok((SnapshotHandle::new(ds), state))
701 }
702
703 async fn merge_insert_batches(
704 &self,
705 dataset_uri: &str,
706 snapshot: SnapshotHandle,
707 batches: Vec<RecordBatch>,
708 key_columns: Vec<String>,
709 when_matched: WhenMatched,
710 when_not_matched: WhenNotMatched,
711 ) -> Result<TableState> {
712 let ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
713 TableStore::merge_insert_batches(
714 self,
715 dataset_uri,
716 ds,
717 batches,
718 key_columns,
719 when_matched,
720 when_not_matched,
721 )
722 .await
723 }
724
725 async fn overwrite_batch(
726 &self,
727 dataset_uri: &str,
728 snapshot: SnapshotHandle,
729 batch: RecordBatch,
730 ) -> Result<(SnapshotHandle, TableState)> {
731 let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
732 let state = TableStore::overwrite_batch(self, dataset_uri, &mut ds, batch).await?;
733 Ok((SnapshotHandle::new(ds), state))
734 }
735
736 async fn delete_where(
737 &self,
738 dataset_uri: &str,
739 snapshot: SnapshotHandle,
740 filter: &str,
741 ) -> Result<(SnapshotHandle, DeleteState)> {
742 let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
743 let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
744 Ok((SnapshotHandle::new(ds), state))
745 }
746
747 async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
748 TableStore::has_btree_index(self, snapshot.dataset(), column).await
749 }
750
751 async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
752 TableStore::has_fts_index(self, snapshot.dataset(), column).await
753 }
754
755 async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
756 TableStore::has_vector_index(self, snapshot.dataset(), column).await
757 }
758
759 async fn create_btree_index(
760 &self,
761 snapshot: SnapshotHandle,
762 columns: &[&str],
763 ) -> Result<SnapshotHandle> {
764 let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
765 TableStore::create_btree_index(self, &mut ds, columns).await?;
766 Ok(SnapshotHandle::new(ds))
767 }
768
769 async fn create_inverted_index(
770 &self,
771 snapshot: SnapshotHandle,
772 column: &str,
773 ) -> Result<SnapshotHandle> {
774 let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
775 TableStore::create_inverted_index(self, &mut ds, column).await?;
776 Ok(SnapshotHandle::new(ds))
777 }
778
779 async fn create_vector_index(
780 &self,
781 snapshot: SnapshotHandle,
782 column: &str,
783 ) -> Result<SnapshotHandle> {
784 let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
785 TableStore::create_vector_index(self, &mut ds, column).await?;
786 Ok(SnapshotHandle::new(ds))
787 }
788
789 fn root_uri(&self) -> &str {
790 TableStore::root_uri(self)
791 }
792
793 fn dataset_uri(&self, table_path: &str) -> String {
794 TableStore::dataset_uri(self, table_path)
795 }
796
797 async fn scan_stream(
798 &self,
799 snapshot: &SnapshotHandle,
800 projection: Option<&[&str]>,
801 filter: Option<&str>,
802 order_by: Option<Vec<ColumnOrdering>>,
803 with_row_id: bool,
804 ) -> Result<DatasetRecordBatchStream> {
805 TableStore::scan_stream(
809 snapshot.dataset(),
810 projection,
811 filter,
812 order_by,
813 with_row_id,
814 )
815 .await
816 }
817}