Skip to main content

omnigraph/
storage_layer.rs

1//! Storage trait surface — MR-793.
2//!
3//! `TableStorage` is the engine-internal trait that exposes the
4//! staged-write primitives (`stage_append`, `stage_merge_insert`,
5//! `stage_overwrite`, `stage_create_btree_index`,
6//! `stage_create_inverted_index`) plus `commit_staged` as the canonical
7//! way for new engine writers to advance Lance HEAD without coupling
8//! "write bytes" with "advance HEAD" in one Lance API call.
9//!
10//! ## Transitional residuals on the trait
11//!
12//! Several inline-commit methods remain on the trait surface as
13//! documented residuals: `delete_where`
14//! ([#6658](https://github.com/lance-format/lance/issues/6658) closed
15//! 2026-05-14, but the public `DeleteBuilder::execute_uncommitted` API
16//! did not backport to the 6.x release line — it first ships in
17//! `v7.0.0-beta.10`. Migration to staged two-phase delete is tracked as
18//! MR-A and is gated on the Lance v7.x bump, not the current v6.0.1 pin),
19//! `create_vector_index` (segment-commit-path requires
20//! `build_index_metadata_from_segments` which is `pub(crate)` — see
21//! [#6666](https://github.com/lance-format/lance/issues/6666), still open), and the
22//! legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` /
23//! `create_btree_index` / `create_inverted_index` paths kept while
24//! engine call sites finish migrating off of them (Phase 1b / Phase 9
25//! of MR-793). These are named honestly at every call site; the
26//! forbidden-API guard test catches direct lance::* misuse outside the
27//! storage layer.
28//!
29//! ## Sealed
30//!
31//! `TableStorage: sealed::Sealed`. Only types in this crate can implement
32//! the trait, so a downstream crate cannot subvert the contract by
33//! providing its own impl.
34//!
35//! ## Opaque handles
36//!
37//! `SnapshotHandle` and `StagedHandle` wrap `lance::Dataset` and
38//! `StagedWrite` respectively. Their inner Lance types are
39//! `pub(crate)` — engine code outside `table_store` cannot reach
40//! through. This aligns with the storage-boundary invariant:
41//! `lance::Dataset` does not appear in trait signatures.
42//!
43//! ## Migration status (MR-793 PR #70)
44//!
45//! Phases 1a / 2 / 4 / 5 / 6 are landed: trait scaffolding, three new
46//! staged primitives (`stage_overwrite`, scalar index staging), and
47//! migration of `ensure_indices`, `branch_merge`, `schema_apply` onto
48//! the staged surface. Phase 1b (call-site conversion to
49//! `Arc<dyn TableStorage>`), Phase 9 (demote unused inline-commit
50//! methods to `pub(crate)`), Phase 7 (recovery reconciler — MR-847),
51//! and Phase 8 (index reconciler — MR-848) are deferred to follow-ups.
52
53use std::fmt::Debug;
54use std::sync::Arc;
55
56use arrow_array::RecordBatch;
57use arrow_schema::SchemaRef;
58use async_trait::async_trait;
59use lance::Dataset;
60use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream};
61use lance::dataset::{WhenMatched, WhenNotMatched};
62
63use crate::db::{Snapshot, SubTableEntry};
64use crate::error::Result;
65use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore};
66
67// ─── sealed module ──────────────────────────────────────────────────────────
68
69pub(crate) mod sealed {
70    /// Sealed marker — only types defined in `omnigraph-engine` can
71    /// implement `TableStorage`. Combined with the trait being the only
72    /// route to write APIs from engine code, this gives type-system
73    /// enforcement of the staged-write invariant.
74    pub trait Sealed {}
75
76    impl Sealed for crate::table_store::TableStore {}
77}
78
79// ─── opaque handles ────────────────────────────────────────────────────────
80
81/// Opaque handle to a snapshot of a single sub-table dataset at a
82/// specific version.
83///
84/// Engine code never sees `lance::Dataset` directly; it holds
85/// `SnapshotHandle` and passes it back to `TableStorage` methods.
86/// Inside this crate, `pub(crate)` accessors expose the inner
87/// `Arc<Dataset>` to the `TableStorage` impl.
88#[derive(Debug, Clone)]
89pub struct SnapshotHandle {
90    pub(crate) inner: Arc<Dataset>,
91}
92
93impl SnapshotHandle {
94    /// Construct from a Lance dataset. `pub(crate)` — only
95    /// `TableStore` should produce these.
96    pub(crate) fn new(ds: Dataset) -> Self {
97        Self {
98            inner: Arc::new(ds),
99        }
100    }
101
102    /// Borrow the underlying Lance dataset. `pub(crate)` so only the
103    /// `TableStorage` impl in this crate can reach through.
104    pub(crate) fn dataset(&self) -> &Dataset {
105        &self.inner
106    }
107
108    /// Take ownership of the inner `Arc<Dataset>`. Used when committing
109    /// staged writes (the call needs to consume the snapshot).
110    pub(crate) fn into_arc(self) -> Arc<Dataset> {
111        self.inner
112    }
113
114    // ── public, lance-free accessors ──
115
116    /// Current Lance manifest version of the snapshot.
117    pub fn version(&self) -> u64 {
118        self.inner.version().version
119    }
120
121    /// Whether the underlying dataset uses stable row IDs.
122    pub fn uses_stable_row_ids(&self) -> bool {
123        self.inner.manifest.uses_stable_row_ids()
124    }
125}
126
127/// Opaque handle to a staged Lance transaction (data write or scalar
128/// index build) that has not yet advanced HEAD.
129///
130/// Produced by `TableStorage::stage_*`, consumed by
131/// `TableStorage::commit_staged`. Carries the underlying `StagedWrite`
132/// (transaction + read-your-writes deltas) behind `pub(crate)`.
133#[derive(Debug, Clone)]
134pub struct StagedHandle {
135    pub(crate) inner: StagedWrite,
136}
137
138impl StagedHandle {
139    pub(crate) fn new(staged: StagedWrite) -> Self {
140        Self { inner: staged }
141    }
142
143    /// Take ownership of the inner `StagedWrite`. Used by
144    /// `commit_staged`.
145    pub(crate) fn into_staged(self) -> StagedWrite {
146        self.inner
147    }
148}
149
150/// Helper: clone the inner `StagedWrite` out of each `StagedHandle` and
151/// collect into a `Vec<StagedWrite>` for handing to
152/// `TableStore::stage_append`'s `prior_stages` parameter. The result is
153/// owned (not borrowed) — callers that already had a `&[StagedHandle]`
154/// pay a clone cost per element. `StagedWrite::clone` is cheap because
155/// `Transaction` and `Vec<Fragment>` are shallow-clone friendly.
156pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
157    handles.iter().map(|h| h.inner.clone()).collect()
158}
159
160// ─── TableStorage trait ────────────────────────────────────────────────────
161
162/// Engine-internal trait covering every Lance dataset operation an
163/// `omnigraph` engine call site might perform.
164///
165/// `TableStore` is the only `impl`. The trait is sealed; the inline
166/// Lance APIs are not reachable through trait dispatch. New writers that
167/// might advance Lance HEAD MUST add a staged-shape method here.
168#[async_trait]
169pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
170    // ── Snapshot opens (no HEAD advance) ────────────────────────────────
171
172    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle>;
173
174    async fn open_snapshot_at_table(
175        &self,
176        snapshot: &Snapshot,
177        table_key: &str,
178    ) -> Result<SnapshotHandle>;
179
180    async fn open_dataset_head(
181        &self,
182        dataset_uri: &str,
183        branch: Option<&str>,
184    ) -> Result<SnapshotHandle>;
185
186    async fn open_dataset_head_for_write(
187        &self,
188        table_key: &str,
189        dataset_uri: &str,
190        branch: Option<&str>,
191    ) -> Result<SnapshotHandle>;
192
193    async fn open_dataset_at_state(
194        &self,
195        table_path: &str,
196        branch: Option<&str>,
197        version: u64,
198    ) -> Result<SnapshotHandle>;
199
200    async fn fork_branch_from_state(
201        &self,
202        dataset_uri: &str,
203        source_branch: Option<&str>,
204        table_key: &str,
205        source_version: u64,
206        target_branch: &str,
207    ) -> Result<SnapshotHandle>;
208
209    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;
210
211    async fn reopen_for_mutation(
212        &self,
213        dataset_uri: &str,
214        branch: Option<&str>,
215        table_key: &str,
216        expected_version: u64,
217    ) -> Result<SnapshotHandle>;
218
219    fn ensure_expected_version(
220        &self,
221        snapshot: &SnapshotHandle,
222        table_key: &str,
223        expected_version: u64,
224    ) -> Result<()>;
225
226    // ── Reads (no HEAD advance) ────────────────────────────────────────
227
228    async fn scan(
229        &self,
230        snapshot: &SnapshotHandle,
231        projection: Option<&[&str]>,
232        filter: Option<&str>,
233        order_by: Option<Vec<ColumnOrdering>>,
234    ) -> Result<Vec<RecordBatch>>;
235
236    async fn scan_with_row_id(
237        &self,
238        snapshot: &SnapshotHandle,
239        projection: Option<&[&str]>,
240        filter: Option<&str>,
241        order_by: Option<Vec<ColumnOrdering>>,
242        with_row_id: bool,
243    ) -> Result<Vec<RecordBatch>>;
244
245    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>>;
246
247    async fn scan_batches_for_rewrite(&self, snapshot: &SnapshotHandle)
248    -> Result<Vec<RecordBatch>>;
249
250    async fn count_rows(&self, snapshot: &SnapshotHandle, filter: Option<String>) -> Result<usize>;
251
252    async fn count_rows_with_staged(
253        &self,
254        snapshot: &SnapshotHandle,
255        staged: &[StagedHandle],
256        filter: Option<String>,
257    ) -> Result<usize>;
258
259    async fn scan_with_staged(
260        &self,
261        snapshot: &SnapshotHandle,
262        staged: &[StagedHandle],
263        projection: Option<&[&str]>,
264        filter: Option<&str>,
265    ) -> Result<Vec<RecordBatch>>;
266
267    async fn scan_with_pending(
268        &self,
269        snapshot: &SnapshotHandle,
270        pending: &[RecordBatch],
271        pending_schema: Option<SchemaRef>,
272        projection: Option<&[&str]>,
273        filter: Option<&str>,
274        key_column: Option<&str>,
275    ) -> Result<Vec<RecordBatch>>;
276
277    async fn first_row_id_for_filter(
278        &self,
279        snapshot: &SnapshotHandle,
280        filter: &str,
281    ) -> Result<Option<u64>>;
282
283    async fn table_state(&self, dataset_uri: &str, snapshot: &SnapshotHandle)
284    -> Result<TableState>;
285
286    // ── Staged writes (no HEAD advance) ────────────────────────────────
287
288    async fn stage_append(
289        &self,
290        snapshot: &SnapshotHandle,
291        batch: RecordBatch,
292        prior_stages: &[StagedHandle],
293    ) -> Result<StagedHandle>;
294
295    async fn stage_merge_insert(
296        &self,
297        snapshot: SnapshotHandle,
298        batch: RecordBatch,
299        key_columns: Vec<String>,
300        when_matched: WhenMatched,
301        when_not_matched: WhenNotMatched,
302    ) -> Result<StagedHandle>;
303
304    async fn commit_staged(
305        &self,
306        snapshot: SnapshotHandle,
307        staged: StagedHandle,
308    ) -> Result<SnapshotHandle>;
309
310    /// Stage an overwrite (Operation::Overwrite). MR-793 Phase 2.
311    async fn stage_overwrite(
312        &self,
313        snapshot: &SnapshotHandle,
314        batch: RecordBatch,
315    ) -> Result<StagedHandle>;
316
317    /// Stage a BTREE scalar index build. MR-793 Phase 2.
318    async fn stage_create_btree_index(
319        &self,
320        snapshot: &SnapshotHandle,
321        columns: &[&str],
322    ) -> Result<StagedHandle>;
323
324    /// Stage an INVERTED (FTS) scalar index build. MR-793 Phase 2.
325    async fn stage_create_inverted_index(
326        &self,
327        snapshot: &SnapshotHandle,
328        column: &str,
329    ) -> Result<StagedHandle>;
330
331    // ── Inline-commit residuals (named honestly per MR-793 §3.2) ──────
332    //
333    // These methods advance Lance HEAD as a side effect of writing.
334    // They stay on the trait until the corresponding upstream Lance API
335    // ships:
336    //
337    // * `delete_where` — Lance #6658 (two-phase delete).
338    // * `create_*_index` — `build_index_metadata_from_segments` is
339    //   `pub(crate)` for vector indices in lance-4.0.0; scalar indices
340    //   migrate to staged in MR-793 Phase 2.
341    // * `append_batch`, `merge_insert_batches`, `overwrite_batch` —
342    //   legacy paths that will be demoted to `pub(crate)` in MR-793
343    //   Phase 9 once all engine sites route through the staged
344    //   primitives.
345
346    async fn append_batch(
347        &self,
348        dataset_uri: &str,
349        snapshot: SnapshotHandle,
350        batch: RecordBatch,
351    ) -> Result<(SnapshotHandle, TableState)>;
352
353    async fn merge_insert_batches(
354        &self,
355        dataset_uri: &str,
356        snapshot: SnapshotHandle,
357        batches: Vec<RecordBatch>,
358        key_columns: Vec<String>,
359        when_matched: WhenMatched,
360        when_not_matched: WhenNotMatched,
361    ) -> Result<TableState>;
362
363    async fn overwrite_batch(
364        &self,
365        dataset_uri: &str,
366        snapshot: SnapshotHandle,
367        batch: RecordBatch,
368    ) -> Result<(SnapshotHandle, TableState)>;
369
370    async fn delete_where(
371        &self,
372        dataset_uri: &str,
373        snapshot: SnapshotHandle,
374        filter: &str,
375    ) -> Result<(SnapshotHandle, DeleteState)>;
376
377    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
378    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
379    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
380
381    async fn create_btree_index(
382        &self,
383        snapshot: SnapshotHandle,
384        columns: &[&str],
385    ) -> Result<SnapshotHandle>;
386
387    async fn create_inverted_index(
388        &self,
389        snapshot: SnapshotHandle,
390        column: &str,
391    ) -> Result<SnapshotHandle>;
392
393    async fn create_vector_index(
394        &self,
395        snapshot: SnapshotHandle,
396        column: &str,
397    ) -> Result<SnapshotHandle>;
398
399    // ── URI helpers ────────────────────────────────────────────────────
400    //
401    // These are pure string formatting; they live on the trait so engine
402    // code holding `Arc<dyn TableStorage>` can compute dataset URIs
403    // without importing the concrete struct.
404
405    fn root_uri(&self) -> &str;
406    fn dataset_uri(&self, table_path: &str) -> String;
407
408    // ── Streaming access (used by the export path) ────────────────────
409    //
410    // Engine code that needs a `DatasetRecordBatchStream` (rather than a
411    // collected `Vec<RecordBatch>`) goes through this trait method.
412    // Useful for the JSONL exporter that streams rows to a writer
413    // without materializing the whole result.
414
415    async fn scan_stream(
416        &self,
417        snapshot: &SnapshotHandle,
418        projection: Option<&[&str]>,
419        filter: Option<&str>,
420        order_by: Option<Vec<ColumnOrdering>>,
421        with_row_id: bool,
422    ) -> Result<DatasetRecordBatchStream>;
423}
424
425// ─── single impl: TableStore ──────────────────────────────────────────────
426
427#[async_trait]
428impl TableStorage for TableStore {
429    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle> {
430        self.open_at_entry(entry).await.map(SnapshotHandle::new)
431    }
432
433    async fn open_snapshot_at_table(
434        &self,
435        snapshot: &Snapshot,
436        table_key: &str,
437    ) -> Result<SnapshotHandle> {
438        self.open_snapshot_table(snapshot, table_key)
439            .await
440            .map(SnapshotHandle::new)
441    }
442
443    async fn open_dataset_head(
444        &self,
445        dataset_uri: &str,
446        branch: Option<&str>,
447    ) -> Result<SnapshotHandle> {
448        TableStore::open_dataset_head(self, dataset_uri, branch)
449            .await
450            .map(SnapshotHandle::new)
451    }
452
453    async fn open_dataset_head_for_write(
454        &self,
455        table_key: &str,
456        dataset_uri: &str,
457        branch: Option<&str>,
458    ) -> Result<SnapshotHandle> {
459        TableStore::open_dataset_head_for_write(self, table_key, dataset_uri, branch)
460            .await
461            .map(SnapshotHandle::new)
462    }
463
464    async fn open_dataset_at_state(
465        &self,
466        table_path: &str,
467        branch: Option<&str>,
468        version: u64,
469    ) -> Result<SnapshotHandle> {
470        TableStore::open_dataset_at_state(self, table_path, branch, version)
471            .await
472            .map(SnapshotHandle::new)
473    }
474
475    async fn fork_branch_from_state(
476        &self,
477        dataset_uri: &str,
478        source_branch: Option<&str>,
479        table_key: &str,
480        source_version: u64,
481        target_branch: &str,
482    ) -> Result<SnapshotHandle> {
483        TableStore::fork_branch_from_state(
484            self,
485            dataset_uri,
486            source_branch,
487            table_key,
488            source_version,
489            target_branch,
490        )
491        .await
492        .map(SnapshotHandle::new)
493    }
494
495    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
496        TableStore::delete_branch(self, dataset_uri, branch).await
497    }
498
499    async fn reopen_for_mutation(
500        &self,
501        dataset_uri: &str,
502        branch: Option<&str>,
503        table_key: &str,
504        expected_version: u64,
505    ) -> Result<SnapshotHandle> {
506        TableStore::reopen_for_mutation(self, dataset_uri, branch, table_key, expected_version)
507            .await
508            .map(SnapshotHandle::new)
509    }
510
511    fn ensure_expected_version(
512        &self,
513        snapshot: &SnapshotHandle,
514        table_key: &str,
515        expected_version: u64,
516    ) -> Result<()> {
517        TableStore::ensure_expected_version(self, snapshot.dataset(), table_key, expected_version)
518    }
519
520    async fn scan(
521        &self,
522        snapshot: &SnapshotHandle,
523        projection: Option<&[&str]>,
524        filter: Option<&str>,
525        order_by: Option<Vec<ColumnOrdering>>,
526    ) -> Result<Vec<RecordBatch>> {
527        TableStore::scan(self, snapshot.dataset(), projection, filter, order_by).await
528    }
529
530    async fn scan_with_row_id(
531        &self,
532        snapshot: &SnapshotHandle,
533        projection: Option<&[&str]>,
534        filter: Option<&str>,
535        order_by: Option<Vec<ColumnOrdering>>,
536        with_row_id: bool,
537    ) -> Result<Vec<RecordBatch>> {
538        TableStore::scan_with(
539            self,
540            snapshot.dataset(),
541            projection,
542            filter,
543            order_by,
544            with_row_id,
545            |_| Ok(()),
546        )
547        .await
548    }
549
550    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>> {
551        TableStore::scan_batches(self, snapshot.dataset()).await
552    }
553
554    async fn scan_batches_for_rewrite(
555        &self,
556        snapshot: &SnapshotHandle,
557    ) -> Result<Vec<RecordBatch>> {
558        TableStore::scan_batches_for_rewrite(self, snapshot.dataset()).await
559    }
560
561    async fn count_rows(&self, snapshot: &SnapshotHandle, filter: Option<String>) -> Result<usize> {
562        TableStore::count_rows(self, snapshot.dataset(), filter).await
563    }
564
565    async fn count_rows_with_staged(
566        &self,
567        snapshot: &SnapshotHandle,
568        staged: &[StagedHandle],
569        filter: Option<String>,
570    ) -> Result<usize> {
571        let staged_writes = staged_handles_as_writes(staged);
572        TableStore::count_rows_with_staged(self, snapshot.dataset(), &staged_writes, filter).await
573    }
574
575    async fn scan_with_staged(
576        &self,
577        snapshot: &SnapshotHandle,
578        staged: &[StagedHandle],
579        projection: Option<&[&str]>,
580        filter: Option<&str>,
581    ) -> Result<Vec<RecordBatch>> {
582        let staged_writes = staged_handles_as_writes(staged);
583        TableStore::scan_with_staged(self, snapshot.dataset(), &staged_writes, projection, filter)
584            .await
585    }
586
587    async fn scan_with_pending(
588        &self,
589        snapshot: &SnapshotHandle,
590        pending: &[RecordBatch],
591        pending_schema: Option<SchemaRef>,
592        projection: Option<&[&str]>,
593        filter: Option<&str>,
594        key_column: Option<&str>,
595    ) -> Result<Vec<RecordBatch>> {
596        TableStore::scan_with_pending(
597            self,
598            snapshot.dataset(),
599            pending,
600            pending_schema,
601            projection,
602            filter,
603            key_column,
604        )
605        .await
606    }
607
608    async fn first_row_id_for_filter(
609        &self,
610        snapshot: &SnapshotHandle,
611        filter: &str,
612    ) -> Result<Option<u64>> {
613        TableStore::first_row_id_for_filter(self, snapshot.dataset(), filter).await
614    }
615
616    async fn table_state(
617        &self,
618        dataset_uri: &str,
619        snapshot: &SnapshotHandle,
620    ) -> Result<TableState> {
621        TableStore::table_state(self, dataset_uri, snapshot.dataset()).await
622    }
623
624    async fn stage_append(
625        &self,
626        snapshot: &SnapshotHandle,
627        batch: RecordBatch,
628        prior_stages: &[StagedHandle],
629    ) -> Result<StagedHandle> {
630        let staged_writes = staged_handles_as_writes(prior_stages);
631        TableStore::stage_append(self, snapshot.dataset(), batch, &staged_writes)
632            .await
633            .map(StagedHandle::new)
634    }
635
636    async fn stage_merge_insert(
637        &self,
638        snapshot: SnapshotHandle,
639        batch: RecordBatch,
640        key_columns: Vec<String>,
641        when_matched: WhenMatched,
642        when_not_matched: WhenNotMatched,
643    ) -> Result<StagedHandle> {
644        let ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
645        TableStore::stage_merge_insert(self, ds, batch, key_columns, when_matched, when_not_matched)
646            .await
647            .map(StagedHandle::new)
648    }
649
650    async fn commit_staged(
651        &self,
652        snapshot: SnapshotHandle,
653        staged: StagedHandle,
654    ) -> Result<SnapshotHandle> {
655        let ds_arc = snapshot.into_arc();
656        let transaction = staged.into_staged().transaction;
657        TableStore::commit_staged(self, ds_arc, transaction)
658            .await
659            .map(SnapshotHandle::new)
660    }
661
662    async fn stage_overwrite(
663        &self,
664        snapshot: &SnapshotHandle,
665        batch: RecordBatch,
666    ) -> Result<StagedHandle> {
667        TableStore::stage_overwrite(self, snapshot.dataset(), batch)
668            .await
669            .map(StagedHandle::new)
670    }
671
672    async fn stage_create_btree_index(
673        &self,
674        snapshot: &SnapshotHandle,
675        columns: &[&str],
676    ) -> Result<StagedHandle> {
677        TableStore::stage_create_btree_index(self, snapshot.dataset(), columns)
678            .await
679            .map(StagedHandle::new)
680    }
681
682    async fn stage_create_inverted_index(
683        &self,
684        snapshot: &SnapshotHandle,
685        column: &str,
686    ) -> Result<StagedHandle> {
687        TableStore::stage_create_inverted_index(self, snapshot.dataset(), column)
688            .await
689            .map(StagedHandle::new)
690    }
691
692    async fn append_batch(
693        &self,
694        dataset_uri: &str,
695        snapshot: SnapshotHandle,
696        batch: RecordBatch,
697    ) -> Result<(SnapshotHandle, TableState)> {
698        let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
699        let state = TableStore::append_batch(self, dataset_uri, &mut ds, batch).await?;
700        Ok((SnapshotHandle::new(ds), state))
701    }
702
703    async fn merge_insert_batches(
704        &self,
705        dataset_uri: &str,
706        snapshot: SnapshotHandle,
707        batches: Vec<RecordBatch>,
708        key_columns: Vec<String>,
709        when_matched: WhenMatched,
710        when_not_matched: WhenNotMatched,
711    ) -> Result<TableState> {
712        let ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
713        TableStore::merge_insert_batches(
714            self,
715            dataset_uri,
716            ds,
717            batches,
718            key_columns,
719            when_matched,
720            when_not_matched,
721        )
722        .await
723    }
724
725    async fn overwrite_batch(
726        &self,
727        dataset_uri: &str,
728        snapshot: SnapshotHandle,
729        batch: RecordBatch,
730    ) -> Result<(SnapshotHandle, TableState)> {
731        let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
732        let state = TableStore::overwrite_batch(self, dataset_uri, &mut ds, batch).await?;
733        Ok((SnapshotHandle::new(ds), state))
734    }
735
736    async fn delete_where(
737        &self,
738        dataset_uri: &str,
739        snapshot: SnapshotHandle,
740        filter: &str,
741    ) -> Result<(SnapshotHandle, DeleteState)> {
742        let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
743        let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
744        Ok((SnapshotHandle::new(ds), state))
745    }
746
747    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
748        TableStore::has_btree_index(self, snapshot.dataset(), column).await
749    }
750
751    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
752        TableStore::has_fts_index(self, snapshot.dataset(), column).await
753    }
754
755    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
756        TableStore::has_vector_index(self, snapshot.dataset(), column).await
757    }
758
759    async fn create_btree_index(
760        &self,
761        snapshot: SnapshotHandle,
762        columns: &[&str],
763    ) -> Result<SnapshotHandle> {
764        let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
765        TableStore::create_btree_index(self, &mut ds, columns).await?;
766        Ok(SnapshotHandle::new(ds))
767    }
768
769    async fn create_inverted_index(
770        &self,
771        snapshot: SnapshotHandle,
772        column: &str,
773    ) -> Result<SnapshotHandle> {
774        let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
775        TableStore::create_inverted_index(self, &mut ds, column).await?;
776        Ok(SnapshotHandle::new(ds))
777    }
778
779    async fn create_vector_index(
780        &self,
781        snapshot: SnapshotHandle,
782        column: &str,
783    ) -> Result<SnapshotHandle> {
784        let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
785        TableStore::create_vector_index(self, &mut ds, column).await?;
786        Ok(SnapshotHandle::new(ds))
787    }
788
789    fn root_uri(&self) -> &str {
790        TableStore::root_uri(self)
791    }
792
793    fn dataset_uri(&self, table_path: &str) -> String {
794        TableStore::dataset_uri(self, table_path)
795    }
796
797    async fn scan_stream(
798        &self,
799        snapshot: &SnapshotHandle,
800        projection: Option<&[&str]>,
801        filter: Option<&str>,
802        order_by: Option<Vec<ColumnOrdering>>,
803        with_row_id: bool,
804    ) -> Result<DatasetRecordBatchStream> {
805        // Note: existing TableStore::scan_stream is an associated fn that
806        // takes &Dataset, so we delegate via the dataset reference held by
807        // the snapshot.
808        TableStore::scan_stream(
809            snapshot.dataset(),
810            projection,
811            filter,
812            order_by,
813            with_row_id,
814        )
815        .await
816    }
817}