// omnigraph/storage_layer.rs
1//! Storage trait surface — MR-793.
2//!
3//! `TableStorage` is the engine-internal trait that exposes the
4//! staged-write primitives (`stage_append`, `stage_merge_insert`,
5//! `stage_overwrite`, `stage_create_btree_index`,
6//! `stage_create_inverted_index`) plus `commit_staged` as the canonical
7//! way for new engine writers to advance Lance HEAD without coupling
8//! "write bytes" with "advance HEAD" in one Lance API call.
9//!
10//! ## Transitional residuals on the trait
11//!
12//! Several inline-commit methods remain on the trait surface as
13//! documented residuals: `delete_where` (Lance 4.0.0's `DeleteJob` is
14//! `pub(crate)` — see [#6658](https://github.com/lance-format/lance/issues/6658)),
15//! `create_vector_index` (segment-commit-path requires
16//! `build_index_metadata_from_segments` which is `pub(crate)` — see
17//! [#6666](https://github.com/lance-format/lance/issues/6666)), and the
18//! legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` /
19//! `create_btree_index` / `create_inverted_index` paths kept while
20//! engine call sites finish migrating off of them (Phase 1b / Phase 9
21//! of MR-793). These are named honestly at every call site; the
22//! forbidden-API guard test catches direct lance::* misuse outside the
23//! storage layer.
24//!
25//! ## Sealed
26//!
27//! `TableStorage: sealed::Sealed`. Only types in this crate can implement
28//! the trait, so a downstream crate cannot subvert the contract by
29//! providing its own impl.
30//!
31//! ## Opaque handles
32//!
33//! `SnapshotHandle` and `StagedHandle` wrap `lance::Dataset` and
34//! `StagedWrite` respectively. Their inner Lance types are
35//! `pub(crate)` — engine code outside `table_store` cannot reach
36//! through. This is the §III.9 alignment: `lance::Dataset` does not
37//! appear in trait signatures.
38//!
39//! ## Migration status (MR-793 PR #70)
40//!
41//! Phases 1a / 2 / 4 / 5 / 6 are landed: trait scaffolding, three new
42//! staged primitives (`stage_overwrite`, scalar index staging), and
43//! migration of `ensure_indices`, `branch_merge`, `schema_apply` onto
44//! the staged surface. Phase 1b (call-site conversion to
45//! `Arc<dyn TableStorage>`), Phase 9 (demote unused inline-commit
46//! methods to `pub(crate)`), Phase 7 (recovery reconciler — MR-847),
47//! and Phase 8 (index reconciler — MR-848) are deferred to follow-ups.
48
49use std::fmt::Debug;
50use std::sync::Arc;
51
52use arrow_array::RecordBatch;
53use arrow_schema::SchemaRef;
54use async_trait::async_trait;
55use lance::Dataset;
56use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream};
57use lance::dataset::{WhenMatched, WhenNotMatched};
58
59use crate::db::{Snapshot, SubTableEntry};
60use crate::error::Result;
61use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore};
62
63// ─── sealed module ──────────────────────────────────────────────────────────
64
pub(crate) mod sealed {
    /// Sealed marker — only types defined in `omnigraph-engine` can
    /// implement `TableStorage`. Combined with the trait being the only
    /// route to write APIs from engine code, this gives type-system
    /// enforcement of the staged-write invariant.
    pub trait Sealed {}

    // The one and only permitted implementor of `TableStorage`.
    impl Sealed for crate::table_store::TableStore {}
}
74
75// ─── opaque handles ────────────────────────────────────────────────────────
76
/// Opaque handle to a snapshot of a single sub-table dataset at a
/// specific version.
///
/// Engine code never sees `lance::Dataset` directly; it holds
/// `SnapshotHandle` and passes it back to `TableStorage` methods.
/// Inside this crate, `pub(crate)` accessors expose the inner
/// `Arc<Dataset>` to the `TableStorage` impl.
#[derive(Debug, Clone)]
pub struct SnapshotHandle {
    // Shared ownership of the pinned Lance dataset. `pub(crate)` keeps
    // the Lance type reachable only from inside this crate (§III.9).
    pub(crate) inner: Arc<Dataset>,
}
88
89impl SnapshotHandle {
90    /// Construct from a Lance dataset. `pub(crate)` — only
91    /// `TableStore` should produce these.
92    pub(crate) fn new(ds: Dataset) -> Self {
93        Self { inner: Arc::new(ds) }
94    }
95
96    /// Borrow the underlying Lance dataset. `pub(crate)` so only the
97    /// `TableStorage` impl in this crate can reach through.
98    pub(crate) fn dataset(&self) -> &Dataset {
99        &self.inner
100    }
101
102    /// Take ownership of the inner `Arc<Dataset>`. Used when committing
103    /// staged writes (the call needs to consume the snapshot).
104    pub(crate) fn into_arc(self) -> Arc<Dataset> {
105        self.inner
106    }
107
108    // ── public, lance-free accessors ──
109
110    /// Current Lance manifest version of the snapshot.
111    pub fn version(&self) -> u64 {
112        self.inner.version().version
113    }
114
115    /// Whether the underlying dataset uses stable row IDs.
116    pub fn uses_stable_row_ids(&self) -> bool {
117        self.inner.manifest.uses_stable_row_ids()
118    }
119}
120
/// Opaque handle to a staged Lance transaction (data write or scalar
/// index build) that has not yet advanced HEAD.
///
/// Produced by `TableStorage::stage_*`, consumed by
/// `TableStorage::commit_staged`. Carries the underlying `StagedWrite`
/// (transaction + read-your-writes deltas) behind `pub(crate)`.
#[derive(Debug, Clone)]
pub struct StagedHandle {
    // The staged transaction; `pub(crate)` so only this crate can
    // unwrap it back into Lance-facing types.
    pub(crate) inner: StagedWrite,
}
131
132impl StagedHandle {
133    pub(crate) fn new(staged: StagedWrite) -> Self {
134        Self { inner: staged }
135    }
136
137    /// Take ownership of the inner `StagedWrite`. Used by
138    /// `commit_staged`.
139    pub(crate) fn into_staged(self) -> StagedWrite {
140        self.inner
141    }
142}
143
144/// Helper: clone the inner `StagedWrite` out of each `StagedHandle` and
145/// collect into a `Vec<StagedWrite>` for handing to
146/// `TableStore::stage_append`'s `prior_stages` parameter. The result is
147/// owned (not borrowed) — callers that already had a `&[StagedHandle]`
148/// pay a clone cost per element. `StagedWrite::clone` is cheap because
149/// `Transaction` and `Vec<Fragment>` are shallow-clone friendly.
150pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
151    handles.iter().map(|h| h.inner.clone()).collect()
152}
153
154// ─── TableStorage trait ────────────────────────────────────────────────────
155
156/// Engine-internal trait covering every Lance dataset operation an
157/// `omnigraph` engine call site might perform.
158///
159/// `TableStore` is the only `impl`. The trait is sealed; the inline
160/// Lance APIs are not reachable through trait dispatch. New writers that
161/// might advance Lance HEAD MUST add a staged-shape method here.
#[async_trait]
pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
    // ── Snapshot opens (no HEAD advance) ────────────────────────────────

    /// Open a snapshot pinned to the version recorded in `entry`.
    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle>;

    /// Open sub-table `table_key` as captured by `snapshot`.
    async fn open_snapshot_at_table(
        &self,
        snapshot: &Snapshot,
        table_key: &str,
    ) -> Result<SnapshotHandle>;

    /// Open the current HEAD of `dataset_uri` (optionally on `branch`)
    /// for reading.
    async fn open_dataset_head(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<SnapshotHandle>;

    /// Open the current HEAD of `dataset_uri` for a subsequent write.
    async fn open_dataset_head_for_write(
        &self,
        table_key: &str,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<SnapshotHandle>;

    /// Open `table_path` pinned at a specific manifest `version`.
    async fn open_dataset_at_state(
        &self,
        table_path: &str,
        branch: Option<&str>,
        version: u64,
    ) -> Result<SnapshotHandle>;

    /// Create `target_branch` starting from `source_version` of
    /// `source_branch` and return a snapshot of the new branch head.
    async fn fork_branch_from_state(
        &self,
        dataset_uri: &str,
        source_branch: Option<&str>,
        table_key: &str,
        source_version: u64,
        target_branch: &str,
    ) -> Result<SnapshotHandle>;

    /// Delete `branch` of the dataset at `dataset_uri`.
    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;

    /// Re-open the dataset for mutation, verifying it is still at
    /// `expected_version`.
    async fn reopen_for_mutation(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
        table_key: &str,
        expected_version: u64,
    ) -> Result<SnapshotHandle>;

    /// Check (synchronously) that `snapshot` is at `expected_version`;
    /// errors otherwise.
    fn ensure_expected_version(
        &self,
        snapshot: &SnapshotHandle,
        table_key: &str,
        expected_version: u64,
    ) -> Result<()>;

    // ── Reads (no HEAD advance) ────────────────────────────────────────

    /// Scan with optional projection / filter / ordering, collecting
    /// all result batches.
    async fn scan(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
    ) -> Result<Vec<RecordBatch>>;

    /// Like [`TableStorage::scan`], optionally materializing the Lance
    /// row-id column when `with_row_id` is set.
    async fn scan_with_row_id(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<Vec<RecordBatch>>;

    /// Collect every batch of the snapshot, unfiltered and unprojected.
    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>>;

    /// Collect every batch in the form the rewrite path expects.
    async fn scan_batches_for_rewrite(
        &self,
        snapshot: &SnapshotHandle,
    ) -> Result<Vec<RecordBatch>>;

    /// Count rows, optionally restricted by `filter`.
    async fn count_rows(
        &self,
        snapshot: &SnapshotHandle,
        filter: Option<String>,
    ) -> Result<usize>;

    /// Count rows including the deltas of not-yet-committed `staged`
    /// writes (read-your-writes).
    async fn count_rows_with_staged(
        &self,
        snapshot: &SnapshotHandle,
        staged: &[StagedHandle],
        filter: Option<String>,
    ) -> Result<usize>;

    /// Scan including the deltas of not-yet-committed `staged` writes.
    async fn scan_with_staged(
        &self,
        snapshot: &SnapshotHandle,
        staged: &[StagedHandle],
        projection: Option<&[&str]>,
        filter: Option<&str>,
    ) -> Result<Vec<RecordBatch>>;

    /// Scan, merging in in-memory `pending` batches that have not been
    /// staged yet (keyed by `key_column` when provided).
    async fn scan_with_pending(
        &self,
        snapshot: &SnapshotHandle,
        pending: &[RecordBatch],
        pending_schema: Option<SchemaRef>,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        key_column: Option<&str>,
    ) -> Result<Vec<RecordBatch>>;

    /// Return the row id of the first row matching `filter`, if any.
    async fn first_row_id_for_filter(
        &self,
        snapshot: &SnapshotHandle,
        filter: &str,
    ) -> Result<Option<u64>>;

    /// Compute the engine-facing `TableState` for the snapshot.
    async fn table_state(
        &self,
        dataset_uri: &str,
        snapshot: &SnapshotHandle,
    ) -> Result<TableState>;

    // ── Staged writes (no HEAD advance) ────────────────────────────────

    /// Stage an append of `batch` on top of `snapshot` plus any
    /// `prior_stages` (so multiple stages can chain before one commit).
    async fn stage_append(
        &self,
        snapshot: &SnapshotHandle,
        batch: RecordBatch,
        prior_stages: &[StagedHandle],
    ) -> Result<StagedHandle>;

    /// Stage a merge-insert (upsert) of `batch` keyed on `key_columns`.
    async fn stage_merge_insert(
        &self,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<StagedHandle>;

    /// Commit a staged transaction, advancing Lance HEAD, and return
    /// the post-commit snapshot. The canonical write path of MR-793.
    async fn commit_staged(
        &self,
        snapshot: SnapshotHandle,
        staged: StagedHandle,
    ) -> Result<SnapshotHandle>;

    /// Stage an overwrite (Operation::Overwrite). MR-793 Phase 2.
    async fn stage_overwrite(
        &self,
        snapshot: &SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<StagedHandle>;

    /// Stage a BTREE scalar index build. MR-793 Phase 2.
    async fn stage_create_btree_index(
        &self,
        snapshot: &SnapshotHandle,
        columns: &[&str],
    ) -> Result<StagedHandle>;

    /// Stage an INVERTED (FTS) scalar index build. MR-793 Phase 2.
    async fn stage_create_inverted_index(
        &self,
        snapshot: &SnapshotHandle,
        column: &str,
    ) -> Result<StagedHandle>;

    // ── Inline-commit residuals (named honestly per MR-793 §3.2) ──────
    //
    // These methods advance Lance HEAD as a side effect of writing.
    // They stay on the trait until the corresponding upstream Lance API
    // ships:
    //
    // * `delete_where` — Lance #6658 (two-phase delete).
    // * `create_*_index` — `build_index_metadata_from_segments` is
    //   `pub(crate)` for vector indices in lance-4.0.0; scalar indices
    //   migrate to staged in MR-793 Phase 2.
    // * `append_batch`, `merge_insert_batches`, `overwrite_batch` —
    //   legacy paths that will be demoted to `pub(crate)` in MR-793
    //   Phase 9 once all engine sites route through the staged
    //   primitives.

    /// Legacy inline-commit append: writes `batch` AND advances HEAD.
    async fn append_batch(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<(SnapshotHandle, TableState)>;

    /// Legacy inline-commit merge-insert: upserts AND advances HEAD.
    async fn merge_insert_batches(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batches: Vec<RecordBatch>,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<TableState>;

    /// Legacy inline-commit overwrite: replaces data AND advances HEAD.
    async fn overwrite_batch(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<(SnapshotHandle, TableState)>;

    /// Inline-commit delete of rows matching `filter` (Lance #6658
    /// blocks a staged variant).
    async fn delete_where(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        filter: &str,
    ) -> Result<(SnapshotHandle, DeleteState)>;

    /// Whether a BTREE index exists on `column`.
    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    /// Whether an INVERTED (FTS) index exists on `column`.
    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    /// Whether a vector index exists on `column`.
    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;

    /// Inline-commit BTREE index build over `columns`.
    async fn create_btree_index(
        &self,
        snapshot: SnapshotHandle,
        columns: &[&str],
    ) -> Result<SnapshotHandle>;

    /// Inline-commit INVERTED (FTS) index build on `column`.
    async fn create_inverted_index(
        &self,
        snapshot: SnapshotHandle,
        column: &str,
    ) -> Result<SnapshotHandle>;

    /// Inline-commit vector index build on `column` (Lance #6666 blocks
    /// a staged variant).
    async fn create_vector_index(
        &self,
        snapshot: SnapshotHandle,
        column: &str,
    ) -> Result<SnapshotHandle>;

    // ── URI helpers ────────────────────────────────────────────────────
    //
    // These are pure string formatting; they live on the trait so engine
    // code holding `Arc<dyn TableStorage>` can compute dataset URIs
    // without importing the concrete struct.

    /// Root URI of the table store.
    fn root_uri(&self) -> &str;
    /// Dataset URI for the given table path.
    fn dataset_uri(&self, table_path: &str) -> String;

    // ── Streaming access (used by the export path) ────────────────────
    //
    // Engine code that needs a `DatasetRecordBatchStream` (rather than a
    // collected `Vec<RecordBatch>`) goes through this trait method.
    // Useful for the JSONL exporter that streams rows to a writer
    // without materializing the whole result.

    /// Scan returning a record-batch stream instead of a collected
    /// `Vec<RecordBatch>`.
    async fn scan_stream(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<DatasetRecordBatchStream>;
}
427
428// ─── single impl: TableStore ──────────────────────────────────────────────
429
430#[async_trait]
431impl TableStorage for TableStore {
432    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle> {
433        self.open_at_entry(entry).await.map(SnapshotHandle::new)
434    }
435
436    async fn open_snapshot_at_table(
437        &self,
438        snapshot: &Snapshot,
439        table_key: &str,
440    ) -> Result<SnapshotHandle> {
441        self.open_snapshot_table(snapshot, table_key)
442            .await
443            .map(SnapshotHandle::new)
444    }
445
446    async fn open_dataset_head(
447        &self,
448        dataset_uri: &str,
449        branch: Option<&str>,
450    ) -> Result<SnapshotHandle> {
451        TableStore::open_dataset_head(self, dataset_uri, branch)
452            .await
453            .map(SnapshotHandle::new)
454    }
455
456    async fn open_dataset_head_for_write(
457        &self,
458        table_key: &str,
459        dataset_uri: &str,
460        branch: Option<&str>,
461    ) -> Result<SnapshotHandle> {
462        TableStore::open_dataset_head_for_write(self, table_key, dataset_uri, branch)
463            .await
464            .map(SnapshotHandle::new)
465    }
466
467    async fn open_dataset_at_state(
468        &self,
469        table_path: &str,
470        branch: Option<&str>,
471        version: u64,
472    ) -> Result<SnapshotHandle> {
473        TableStore::open_dataset_at_state(self, table_path, branch, version)
474            .await
475            .map(SnapshotHandle::new)
476    }
477
478    async fn fork_branch_from_state(
479        &self,
480        dataset_uri: &str,
481        source_branch: Option<&str>,
482        table_key: &str,
483        source_version: u64,
484        target_branch: &str,
485    ) -> Result<SnapshotHandle> {
486        TableStore::fork_branch_from_state(
487            self,
488            dataset_uri,
489            source_branch,
490            table_key,
491            source_version,
492            target_branch,
493        )
494        .await
495        .map(SnapshotHandle::new)
496    }
497
498    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
499        TableStore::delete_branch(self, dataset_uri, branch).await
500    }
501
502    async fn reopen_for_mutation(
503        &self,
504        dataset_uri: &str,
505        branch: Option<&str>,
506        table_key: &str,
507        expected_version: u64,
508    ) -> Result<SnapshotHandle> {
509        TableStore::reopen_for_mutation(self, dataset_uri, branch, table_key, expected_version)
510            .await
511            .map(SnapshotHandle::new)
512    }
513
514    fn ensure_expected_version(
515        &self,
516        snapshot: &SnapshotHandle,
517        table_key: &str,
518        expected_version: u64,
519    ) -> Result<()> {
520        TableStore::ensure_expected_version(self, snapshot.dataset(), table_key, expected_version)
521    }
522
523    async fn scan(
524        &self,
525        snapshot: &SnapshotHandle,
526        projection: Option<&[&str]>,
527        filter: Option<&str>,
528        order_by: Option<Vec<ColumnOrdering>>,
529    ) -> Result<Vec<RecordBatch>> {
530        TableStore::scan(self, snapshot.dataset(), projection, filter, order_by).await
531    }
532
533    async fn scan_with_row_id(
534        &self,
535        snapshot: &SnapshotHandle,
536        projection: Option<&[&str]>,
537        filter: Option<&str>,
538        order_by: Option<Vec<ColumnOrdering>>,
539        with_row_id: bool,
540    ) -> Result<Vec<RecordBatch>> {
541        TableStore::scan_with(
542            self,
543            snapshot.dataset(),
544            projection,
545            filter,
546            order_by,
547            with_row_id,
548            |_| Ok(()),
549        )
550        .await
551    }
552
553    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>> {
554        TableStore::scan_batches(self, snapshot.dataset()).await
555    }
556
557    async fn scan_batches_for_rewrite(
558        &self,
559        snapshot: &SnapshotHandle,
560    ) -> Result<Vec<RecordBatch>> {
561        TableStore::scan_batches_for_rewrite(self, snapshot.dataset()).await
562    }
563
564    async fn count_rows(
565        &self,
566        snapshot: &SnapshotHandle,
567        filter: Option<String>,
568    ) -> Result<usize> {
569        TableStore::count_rows(self, snapshot.dataset(), filter).await
570    }
571
572    async fn count_rows_with_staged(
573        &self,
574        snapshot: &SnapshotHandle,
575        staged: &[StagedHandle],
576        filter: Option<String>,
577    ) -> Result<usize> {
578        let staged_writes = staged_handles_as_writes(staged);
579        TableStore::count_rows_with_staged(self, snapshot.dataset(), &staged_writes, filter).await
580    }
581
582    async fn scan_with_staged(
583        &self,
584        snapshot: &SnapshotHandle,
585        staged: &[StagedHandle],
586        projection: Option<&[&str]>,
587        filter: Option<&str>,
588    ) -> Result<Vec<RecordBatch>> {
589        let staged_writes = staged_handles_as_writes(staged);
590        TableStore::scan_with_staged(
591            self,
592            snapshot.dataset(),
593            &staged_writes,
594            projection,
595            filter,
596        )
597        .await
598    }
599
600    async fn scan_with_pending(
601        &self,
602        snapshot: &SnapshotHandle,
603        pending: &[RecordBatch],
604        pending_schema: Option<SchemaRef>,
605        projection: Option<&[&str]>,
606        filter: Option<&str>,
607        key_column: Option<&str>,
608    ) -> Result<Vec<RecordBatch>> {
609        TableStore::scan_with_pending(
610            self,
611            snapshot.dataset(),
612            pending,
613            pending_schema,
614            projection,
615            filter,
616            key_column,
617        )
618        .await
619    }
620
621    async fn first_row_id_for_filter(
622        &self,
623        snapshot: &SnapshotHandle,
624        filter: &str,
625    ) -> Result<Option<u64>> {
626        TableStore::first_row_id_for_filter(self, snapshot.dataset(), filter).await
627    }
628
629    async fn table_state(
630        &self,
631        dataset_uri: &str,
632        snapshot: &SnapshotHandle,
633    ) -> Result<TableState> {
634        TableStore::table_state(self, dataset_uri, snapshot.dataset()).await
635    }
636
637    async fn stage_append(
638        &self,
639        snapshot: &SnapshotHandle,
640        batch: RecordBatch,
641        prior_stages: &[StagedHandle],
642    ) -> Result<StagedHandle> {
643        let staged_writes = staged_handles_as_writes(prior_stages);
644        TableStore::stage_append(self, snapshot.dataset(), batch, &staged_writes)
645            .await
646            .map(StagedHandle::new)
647    }
648
649    async fn stage_merge_insert(
650        &self,
651        snapshot: SnapshotHandle,
652        batch: RecordBatch,
653        key_columns: Vec<String>,
654        when_matched: WhenMatched,
655        when_not_matched: WhenNotMatched,
656    ) -> Result<StagedHandle> {
657        let ds = Arc::try_unwrap(snapshot.into_arc())
658            .unwrap_or_else(|arc| (*arc).clone());
659        TableStore::stage_merge_insert(
660            self,
661            ds,
662            batch,
663            key_columns,
664            when_matched,
665            when_not_matched,
666        )
667        .await
668        .map(StagedHandle::new)
669    }
670
671    async fn commit_staged(
672        &self,
673        snapshot: SnapshotHandle,
674        staged: StagedHandle,
675    ) -> Result<SnapshotHandle> {
676        let ds_arc = snapshot.into_arc();
677        let transaction = staged.into_staged().transaction;
678        TableStore::commit_staged(self, ds_arc, transaction)
679            .await
680            .map(SnapshotHandle::new)
681    }
682
683    async fn stage_overwrite(
684        &self,
685        snapshot: &SnapshotHandle,
686        batch: RecordBatch,
687    ) -> Result<StagedHandle> {
688        TableStore::stage_overwrite(self, snapshot.dataset(), batch)
689            .await
690            .map(StagedHandle::new)
691    }
692
693    async fn stage_create_btree_index(
694        &self,
695        snapshot: &SnapshotHandle,
696        columns: &[&str],
697    ) -> Result<StagedHandle> {
698        TableStore::stage_create_btree_index(self, snapshot.dataset(), columns)
699            .await
700            .map(StagedHandle::new)
701    }
702
703    async fn stage_create_inverted_index(
704        &self,
705        snapshot: &SnapshotHandle,
706        column: &str,
707    ) -> Result<StagedHandle> {
708        TableStore::stage_create_inverted_index(self, snapshot.dataset(), column)
709            .await
710            .map(StagedHandle::new)
711    }
712
713    async fn append_batch(
714        &self,
715        dataset_uri: &str,
716        snapshot: SnapshotHandle,
717        batch: RecordBatch,
718    ) -> Result<(SnapshotHandle, TableState)> {
719        let mut ds = Arc::try_unwrap(snapshot.into_arc())
720            .unwrap_or_else(|arc| (*arc).clone());
721        let state = TableStore::append_batch(self, dataset_uri, &mut ds, batch).await?;
722        Ok((SnapshotHandle::new(ds), state))
723    }
724
725    async fn merge_insert_batches(
726        &self,
727        dataset_uri: &str,
728        snapshot: SnapshotHandle,
729        batches: Vec<RecordBatch>,
730        key_columns: Vec<String>,
731        when_matched: WhenMatched,
732        when_not_matched: WhenNotMatched,
733    ) -> Result<TableState> {
734        let ds = Arc::try_unwrap(snapshot.into_arc())
735            .unwrap_or_else(|arc| (*arc).clone());
736        TableStore::merge_insert_batches(
737            self,
738            dataset_uri,
739            ds,
740            batches,
741            key_columns,
742            when_matched,
743            when_not_matched,
744        )
745        .await
746    }
747
748    async fn overwrite_batch(
749        &self,
750        dataset_uri: &str,
751        snapshot: SnapshotHandle,
752        batch: RecordBatch,
753    ) -> Result<(SnapshotHandle, TableState)> {
754        let mut ds = Arc::try_unwrap(snapshot.into_arc())
755            .unwrap_or_else(|arc| (*arc).clone());
756        let state = TableStore::overwrite_batch(self, dataset_uri, &mut ds, batch).await?;
757        Ok((SnapshotHandle::new(ds), state))
758    }
759
760    async fn delete_where(
761        &self,
762        dataset_uri: &str,
763        snapshot: SnapshotHandle,
764        filter: &str,
765    ) -> Result<(SnapshotHandle, DeleteState)> {
766        let mut ds = Arc::try_unwrap(snapshot.into_arc())
767            .unwrap_or_else(|arc| (*arc).clone());
768        let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
769        Ok((SnapshotHandle::new(ds), state))
770    }
771
772    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
773        TableStore::has_btree_index(self, snapshot.dataset(), column).await
774    }
775
776    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
777        TableStore::has_fts_index(self, snapshot.dataset(), column).await
778    }
779
780    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
781        TableStore::has_vector_index(self, snapshot.dataset(), column).await
782    }
783
784    async fn create_btree_index(
785        &self,
786        snapshot: SnapshotHandle,
787        columns: &[&str],
788    ) -> Result<SnapshotHandle> {
789        let mut ds = Arc::try_unwrap(snapshot.into_arc())
790            .unwrap_or_else(|arc| (*arc).clone());
791        TableStore::create_btree_index(self, &mut ds, columns).await?;
792        Ok(SnapshotHandle::new(ds))
793    }
794
795    async fn create_inverted_index(
796        &self,
797        snapshot: SnapshotHandle,
798        column: &str,
799    ) -> Result<SnapshotHandle> {
800        let mut ds = Arc::try_unwrap(snapshot.into_arc())
801            .unwrap_or_else(|arc| (*arc).clone());
802        TableStore::create_inverted_index(self, &mut ds, column).await?;
803        Ok(SnapshotHandle::new(ds))
804    }
805
806    async fn create_vector_index(
807        &self,
808        snapshot: SnapshotHandle,
809        column: &str,
810    ) -> Result<SnapshotHandle> {
811        let mut ds = Arc::try_unwrap(snapshot.into_arc())
812            .unwrap_or_else(|arc| (*arc).clone());
813        TableStore::create_vector_index(self, &mut ds, column).await?;
814        Ok(SnapshotHandle::new(ds))
815    }
816
817    fn root_uri(&self) -> &str {
818        TableStore::root_uri(self)
819    }
820
821    fn dataset_uri(&self, table_path: &str) -> String {
822        TableStore::dataset_uri(self, table_path)
823    }
824
825    async fn scan_stream(
826        &self,
827        snapshot: &SnapshotHandle,
828        projection: Option<&[&str]>,
829        filter: Option<&str>,
830        order_by: Option<Vec<ColumnOrdering>>,
831        with_row_id: bool,
832    ) -> Result<DatasetRecordBatchStream> {
833        // Note: existing TableStore::scan_stream is an associated fn that
834        // takes &Dataset, so we delegate via the dataset reference held by
835        // the snapshot.
836        TableStore::scan_stream(snapshot.dataset(), projection, filter, order_by, with_row_id).await
837    }
838}