Skip to main content

omnigraph/
storage_layer.rs

//! Storage trait surface — MR-793.
//!
//! `TableStorage` is the engine-internal trait that exposes the
//! staged-write primitives (`stage_append`, `stage_merge_insert`,
//! `stage_overwrite`, `stage_create_btree_index`,
//! `stage_create_inverted_index`) plus `commit_staged` as the canonical
//! way for new engine writers to advance Lance HEAD without coupling
//! "write bytes" and "advance HEAD" in one Lance API call.
//!
//! ## Transitional residuals on the trait
//!
//! Several inline-commit methods remain on the trait surface as
//! documented residuals:
//!
//! * `delete_where` — [#6658](https://github.com/lance-format/lance/issues/6658)
//!   closed 2026-05-14, but the public `DeleteBuilder::execute_uncommitted`
//!   API was not backported to the 6.x release line; it first ships in
//!   `v7.0.0-beta.10`. Migration to a staged two-phase delete is tracked as
//!   MR-A and is gated on the Lance v7.x bump, not the current v6.0.1 pin.
//! * `create_vector_index` — the segment-commit path requires
//!   `build_index_metadata_from_segments`, which is `pub(crate)` in Lance
//!   (see [#6666](https://github.com/lance-format/lance/issues/6666), still open).
//! * The legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` /
//!   `create_btree_index` / `create_inverted_index` paths, kept while
//!   engine call sites finish migrating off them (Phase 1b / Phase 9
//!   of MR-793).
//!
//! These are named honestly at every call site; the forbidden-API guard
//! test catches direct `lance::*` misuse outside the storage layer.
//!
//! ## Sealed
//!
//! `TableStorage` has `sealed::Sealed` as a supertrait, and `sealed` is a
//! `pub(crate)` module, so only types in this crate can implement the
//! trait. A downstream crate cannot subvert the contract by providing its
//! own impl.
//!
//! ## Opaque handles
//!
//! `SnapshotHandle` and `StagedHandle` wrap `lance::Dataset` (behind an
//! `Arc`) and `StagedWrite` respectively. The wrapped values live in
//! `pub(crate)` fields, so code outside this crate cannot reach through
//! them. Inside the crate the fields are visible; keeping raw Lance types
//! confined to the storage layer is a convention there, not something the
//! compiler enforces. This aligns with the storage-boundary invariant:
//! `lance::Dataset` does not appear in trait signatures.
//!
//! ## Migration status (MR-793 PR #70)
//!
//! Phases 1a / 2 / 4 / 5 / 6 have landed: trait scaffolding, three new
//! staged primitives (`stage_overwrite`, `stage_create_btree_index`,
//! `stage_create_inverted_index`), and migration of `ensure_indices`,
//! `branch_merge` and `schema_apply` onto the staged surface. Phase 1b
//! (call-site conversion to `Arc<dyn TableStorage>`), Phase 9 (demote
//! unused inline-commit methods to `pub(crate)`), Phase 7 (recovery
//! reconciler — MR-847), and Phase 8 (index reconciler — MR-848) are
//! deferred to follow-ups.

53use std::fmt::Debug;
54use std::sync::Arc;
55
56use arrow_array::RecordBatch;
57use arrow_schema::SchemaRef;
58use async_trait::async_trait;
59use lance::Dataset;
60use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream};
61use lance::dataset::{WhenMatched, WhenNotMatched};
62
63use crate::db::{Snapshot, SubTableEntry};
64use crate::error::Result;
65use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore};
66
67// ─── sealed module ──────────────────────────────────────────────────────────
68
69pub(crate) mod sealed {
70    /// Sealed marker — only types defined in `omnigraph-engine` can
71    /// implement `TableStorage`. Combined with the trait being the only
72    /// route to write APIs from engine code, this gives type-system
73    /// enforcement of the staged-write invariant.
74    pub trait Sealed {}
75
76    impl Sealed for crate::table_store::TableStore {}
77}
78
79// ─── opaque handles ────────────────────────────────────────────────────────
80
81/// Opaque handle to a snapshot of a single sub-table dataset at a
82/// specific version.
83///
84/// Engine code never sees `lance::Dataset` directly; it holds
85/// `SnapshotHandle` and passes it back to `TableStorage` methods.
86/// Inside this crate, `pub(crate)` accessors expose the inner
87/// `Arc<Dataset>` to the `TableStorage` impl.
88#[derive(Debug, Clone)]
89pub struct SnapshotHandle {
90    pub(crate) inner: Arc<Dataset>,
91}
92
93impl SnapshotHandle {
94    /// Construct from a Lance dataset. `pub(crate)` — only
95    /// `TableStore` should produce these.
96    pub(crate) fn new(ds: Dataset) -> Self {
97        Self { inner: Arc::new(ds) }
98    }
99
100    /// Borrow the underlying Lance dataset. `pub(crate)` so only the
101    /// `TableStorage` impl in this crate can reach through.
102    pub(crate) fn dataset(&self) -> &Dataset {
103        &self.inner
104    }
105
106    /// Take ownership of the inner `Arc<Dataset>`. Used when committing
107    /// staged writes (the call needs to consume the snapshot).
108    pub(crate) fn into_arc(self) -> Arc<Dataset> {
109        self.inner
110    }
111
112    // ── public, lance-free accessors ──
113
114    /// Current Lance manifest version of the snapshot.
115    pub fn version(&self) -> u64 {
116        self.inner.version().version
117    }
118
119    /// Whether the underlying dataset uses stable row IDs.
120    pub fn uses_stable_row_ids(&self) -> bool {
121        self.inner.manifest.uses_stable_row_ids()
122    }
123}
124
125/// Opaque handle to a staged Lance transaction (data write or scalar
126/// index build) that has not yet advanced HEAD.
127///
128/// Produced by `TableStorage::stage_*`, consumed by
129/// `TableStorage::commit_staged`. Carries the underlying `StagedWrite`
130/// (transaction + read-your-writes deltas) behind `pub(crate)`.
131#[derive(Debug, Clone)]
132pub struct StagedHandle {
133    pub(crate) inner: StagedWrite,
134}
135
136impl StagedHandle {
137    pub(crate) fn new(staged: StagedWrite) -> Self {
138        Self { inner: staged }
139    }
140
141    /// Take ownership of the inner `StagedWrite`. Used by
142    /// `commit_staged`.
143    pub(crate) fn into_staged(self) -> StagedWrite {
144        self.inner
145    }
146}
147
148/// Helper: clone the inner `StagedWrite` out of each `StagedHandle` and
149/// collect into a `Vec<StagedWrite>` for handing to
150/// `TableStore::stage_append`'s `prior_stages` parameter. The result is
151/// owned (not borrowed) — callers that already had a `&[StagedHandle]`
152/// pay a clone cost per element. `StagedWrite::clone` is cheap because
153/// `Transaction` and `Vec<Fragment>` are shallow-clone friendly.
154pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
155    handles.iter().map(|h| h.inner.clone()).collect()
156}
157
158// ─── TableStorage trait ────────────────────────────────────────────────────
159
160/// Engine-internal trait covering every Lance dataset operation an
161/// `omnigraph` engine call site might perform.
162///
163/// `TableStore` is the only `impl`. The trait is sealed; the inline
164/// Lance APIs are not reachable through trait dispatch. New writers that
165/// might advance Lance HEAD MUST add a staged-shape method here.
166#[async_trait]
167pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
168    // ── Snapshot opens (no HEAD advance) ────────────────────────────────
169
170    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle>;
171
172    async fn open_snapshot_at_table(
173        &self,
174        snapshot: &Snapshot,
175        table_key: &str,
176    ) -> Result<SnapshotHandle>;
177
178    async fn open_dataset_head(
179        &self,
180        dataset_uri: &str,
181        branch: Option<&str>,
182    ) -> Result<SnapshotHandle>;
183
184    async fn open_dataset_head_for_write(
185        &self,
186        table_key: &str,
187        dataset_uri: &str,
188        branch: Option<&str>,
189    ) -> Result<SnapshotHandle>;
190
191    async fn open_dataset_at_state(
192        &self,
193        table_path: &str,
194        branch: Option<&str>,
195        version: u64,
196    ) -> Result<SnapshotHandle>;
197
198    async fn fork_branch_from_state(
199        &self,
200        dataset_uri: &str,
201        source_branch: Option<&str>,
202        table_key: &str,
203        source_version: u64,
204        target_branch: &str,
205    ) -> Result<SnapshotHandle>;
206
207    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;
208
209    async fn reopen_for_mutation(
210        &self,
211        dataset_uri: &str,
212        branch: Option<&str>,
213        table_key: &str,
214        expected_version: u64,
215    ) -> Result<SnapshotHandle>;
216
217    fn ensure_expected_version(
218        &self,
219        snapshot: &SnapshotHandle,
220        table_key: &str,
221        expected_version: u64,
222    ) -> Result<()>;
223
224    // ── Reads (no HEAD advance) ────────────────────────────────────────
225
226    async fn scan(
227        &self,
228        snapshot: &SnapshotHandle,
229        projection: Option<&[&str]>,
230        filter: Option<&str>,
231        order_by: Option<Vec<ColumnOrdering>>,
232    ) -> Result<Vec<RecordBatch>>;
233
234    async fn scan_with_row_id(
235        &self,
236        snapshot: &SnapshotHandle,
237        projection: Option<&[&str]>,
238        filter: Option<&str>,
239        order_by: Option<Vec<ColumnOrdering>>,
240        with_row_id: bool,
241    ) -> Result<Vec<RecordBatch>>;
242
243    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>>;
244
245    async fn scan_batches_for_rewrite(
246        &self,
247        snapshot: &SnapshotHandle,
248    ) -> Result<Vec<RecordBatch>>;
249
250    async fn count_rows(
251        &self,
252        snapshot: &SnapshotHandle,
253        filter: Option<String>,
254    ) -> Result<usize>;
255
256    async fn count_rows_with_staged(
257        &self,
258        snapshot: &SnapshotHandle,
259        staged: &[StagedHandle],
260        filter: Option<String>,
261    ) -> Result<usize>;
262
263    async fn scan_with_staged(
264        &self,
265        snapshot: &SnapshotHandle,
266        staged: &[StagedHandle],
267        projection: Option<&[&str]>,
268        filter: Option<&str>,
269    ) -> Result<Vec<RecordBatch>>;
270
271    async fn scan_with_pending(
272        &self,
273        snapshot: &SnapshotHandle,
274        pending: &[RecordBatch],
275        pending_schema: Option<SchemaRef>,
276        projection: Option<&[&str]>,
277        filter: Option<&str>,
278        key_column: Option<&str>,
279    ) -> Result<Vec<RecordBatch>>;
280
281    async fn first_row_id_for_filter(
282        &self,
283        snapshot: &SnapshotHandle,
284        filter: &str,
285    ) -> Result<Option<u64>>;
286
287    async fn table_state(
288        &self,
289        dataset_uri: &str,
290        snapshot: &SnapshotHandle,
291    ) -> Result<TableState>;
292
293    // ── Staged writes (no HEAD advance) ────────────────────────────────
294
295    async fn stage_append(
296        &self,
297        snapshot: &SnapshotHandle,
298        batch: RecordBatch,
299        prior_stages: &[StagedHandle],
300    ) -> Result<StagedHandle>;
301
302    async fn stage_merge_insert(
303        &self,
304        snapshot: SnapshotHandle,
305        batch: RecordBatch,
306        key_columns: Vec<String>,
307        when_matched: WhenMatched,
308        when_not_matched: WhenNotMatched,
309    ) -> Result<StagedHandle>;
310
311    async fn commit_staged(
312        &self,
313        snapshot: SnapshotHandle,
314        staged: StagedHandle,
315    ) -> Result<SnapshotHandle>;
316
317    /// Stage an overwrite (Operation::Overwrite). MR-793 Phase 2.
318    async fn stage_overwrite(
319        &self,
320        snapshot: &SnapshotHandle,
321        batch: RecordBatch,
322    ) -> Result<StagedHandle>;
323
324    /// Stage a BTREE scalar index build. MR-793 Phase 2.
325    async fn stage_create_btree_index(
326        &self,
327        snapshot: &SnapshotHandle,
328        columns: &[&str],
329    ) -> Result<StagedHandle>;
330
331    /// Stage an INVERTED (FTS) scalar index build. MR-793 Phase 2.
332    async fn stage_create_inverted_index(
333        &self,
334        snapshot: &SnapshotHandle,
335        column: &str,
336    ) -> Result<StagedHandle>;
337
338    // ── Inline-commit residuals (named honestly per MR-793 §3.2) ──────
339    //
340    // These methods advance Lance HEAD as a side effect of writing.
341    // They stay on the trait until the corresponding upstream Lance API
342    // ships:
343    //
344    // * `delete_where` — Lance #6658 (two-phase delete).
345    // * `create_*_index` — `build_index_metadata_from_segments` is
346    //   `pub(crate)` for vector indices in lance-4.0.0; scalar indices
347    //   migrate to staged in MR-793 Phase 2.
348    // * `append_batch`, `merge_insert_batches`, `overwrite_batch` —
349    //   legacy paths that will be demoted to `pub(crate)` in MR-793
350    //   Phase 9 once all engine sites route through the staged
351    //   primitives.
352
353    async fn append_batch(
354        &self,
355        dataset_uri: &str,
356        snapshot: SnapshotHandle,
357        batch: RecordBatch,
358    ) -> Result<(SnapshotHandle, TableState)>;
359
360    async fn merge_insert_batches(
361        &self,
362        dataset_uri: &str,
363        snapshot: SnapshotHandle,
364        batches: Vec<RecordBatch>,
365        key_columns: Vec<String>,
366        when_matched: WhenMatched,
367        when_not_matched: WhenNotMatched,
368    ) -> Result<TableState>;
369
370    async fn overwrite_batch(
371        &self,
372        dataset_uri: &str,
373        snapshot: SnapshotHandle,
374        batch: RecordBatch,
375    ) -> Result<(SnapshotHandle, TableState)>;
376
377    async fn delete_where(
378        &self,
379        dataset_uri: &str,
380        snapshot: SnapshotHandle,
381        filter: &str,
382    ) -> Result<(SnapshotHandle, DeleteState)>;
383
384    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
385    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
386    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
387
388    async fn create_btree_index(
389        &self,
390        snapshot: SnapshotHandle,
391        columns: &[&str],
392    ) -> Result<SnapshotHandle>;
393
394    async fn create_inverted_index(
395        &self,
396        snapshot: SnapshotHandle,
397        column: &str,
398    ) -> Result<SnapshotHandle>;
399
400    async fn create_vector_index(
401        &self,
402        snapshot: SnapshotHandle,
403        column: &str,
404    ) -> Result<SnapshotHandle>;
405
406    // ── URI helpers ────────────────────────────────────────────────────
407    //
408    // These are pure string formatting; they live on the trait so engine
409    // code holding `Arc<dyn TableStorage>` can compute dataset URIs
410    // without importing the concrete struct.
411
412    fn root_uri(&self) -> &str;
413    fn dataset_uri(&self, table_path: &str) -> String;
414
415    // ── Streaming access (used by the export path) ────────────────────
416    //
417    // Engine code that needs a `DatasetRecordBatchStream` (rather than a
418    // collected `Vec<RecordBatch>`) goes through this trait method.
419    // Useful for the JSONL exporter that streams rows to a writer
420    // without materializing the whole result.
421
422    async fn scan_stream(
423        &self,
424        snapshot: &SnapshotHandle,
425        projection: Option<&[&str]>,
426        filter: Option<&str>,
427        order_by: Option<Vec<ColumnOrdering>>,
428        with_row_id: bool,
429    ) -> Result<DatasetRecordBatchStream>;
430}
431
432// ─── single impl: TableStore ──────────────────────────────────────────────
433
434#[async_trait]
435impl TableStorage for TableStore {
436    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle> {
437        self.open_at_entry(entry).await.map(SnapshotHandle::new)
438    }
439
440    async fn open_snapshot_at_table(
441        &self,
442        snapshot: &Snapshot,
443        table_key: &str,
444    ) -> Result<SnapshotHandle> {
445        self.open_snapshot_table(snapshot, table_key)
446            .await
447            .map(SnapshotHandle::new)
448    }
449
450    async fn open_dataset_head(
451        &self,
452        dataset_uri: &str,
453        branch: Option<&str>,
454    ) -> Result<SnapshotHandle> {
455        TableStore::open_dataset_head(self, dataset_uri, branch)
456            .await
457            .map(SnapshotHandle::new)
458    }
459
460    async fn open_dataset_head_for_write(
461        &self,
462        table_key: &str,
463        dataset_uri: &str,
464        branch: Option<&str>,
465    ) -> Result<SnapshotHandle> {
466        TableStore::open_dataset_head_for_write(self, table_key, dataset_uri, branch)
467            .await
468            .map(SnapshotHandle::new)
469    }
470
471    async fn open_dataset_at_state(
472        &self,
473        table_path: &str,
474        branch: Option<&str>,
475        version: u64,
476    ) -> Result<SnapshotHandle> {
477        TableStore::open_dataset_at_state(self, table_path, branch, version)
478            .await
479            .map(SnapshotHandle::new)
480    }
481
482    async fn fork_branch_from_state(
483        &self,
484        dataset_uri: &str,
485        source_branch: Option<&str>,
486        table_key: &str,
487        source_version: u64,
488        target_branch: &str,
489    ) -> Result<SnapshotHandle> {
490        TableStore::fork_branch_from_state(
491            self,
492            dataset_uri,
493            source_branch,
494            table_key,
495            source_version,
496            target_branch,
497        )
498        .await
499        .map(SnapshotHandle::new)
500    }
501
502    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
503        TableStore::delete_branch(self, dataset_uri, branch).await
504    }
505
506    async fn reopen_for_mutation(
507        &self,
508        dataset_uri: &str,
509        branch: Option<&str>,
510        table_key: &str,
511        expected_version: u64,
512    ) -> Result<SnapshotHandle> {
513        TableStore::reopen_for_mutation(self, dataset_uri, branch, table_key, expected_version)
514            .await
515            .map(SnapshotHandle::new)
516    }
517
518    fn ensure_expected_version(
519        &self,
520        snapshot: &SnapshotHandle,
521        table_key: &str,
522        expected_version: u64,
523    ) -> Result<()> {
524        TableStore::ensure_expected_version(self, snapshot.dataset(), table_key, expected_version)
525    }
526
527    async fn scan(
528        &self,
529        snapshot: &SnapshotHandle,
530        projection: Option<&[&str]>,
531        filter: Option<&str>,
532        order_by: Option<Vec<ColumnOrdering>>,
533    ) -> Result<Vec<RecordBatch>> {
534        TableStore::scan(self, snapshot.dataset(), projection, filter, order_by).await
535    }
536
537    async fn scan_with_row_id(
538        &self,
539        snapshot: &SnapshotHandle,
540        projection: Option<&[&str]>,
541        filter: Option<&str>,
542        order_by: Option<Vec<ColumnOrdering>>,
543        with_row_id: bool,
544    ) -> Result<Vec<RecordBatch>> {
545        TableStore::scan_with(
546            self,
547            snapshot.dataset(),
548            projection,
549            filter,
550            order_by,
551            with_row_id,
552            |_| Ok(()),
553        )
554        .await
555    }
556
557    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>> {
558        TableStore::scan_batches(self, snapshot.dataset()).await
559    }
560
561    async fn scan_batches_for_rewrite(
562        &self,
563        snapshot: &SnapshotHandle,
564    ) -> Result<Vec<RecordBatch>> {
565        TableStore::scan_batches_for_rewrite(self, snapshot.dataset()).await
566    }
567
568    async fn count_rows(
569        &self,
570        snapshot: &SnapshotHandle,
571        filter: Option<String>,
572    ) -> Result<usize> {
573        TableStore::count_rows(self, snapshot.dataset(), filter).await
574    }
575
576    async fn count_rows_with_staged(
577        &self,
578        snapshot: &SnapshotHandle,
579        staged: &[StagedHandle],
580        filter: Option<String>,
581    ) -> Result<usize> {
582        let staged_writes = staged_handles_as_writes(staged);
583        TableStore::count_rows_with_staged(self, snapshot.dataset(), &staged_writes, filter).await
584    }
585
586    async fn scan_with_staged(
587        &self,
588        snapshot: &SnapshotHandle,
589        staged: &[StagedHandle],
590        projection: Option<&[&str]>,
591        filter: Option<&str>,
592    ) -> Result<Vec<RecordBatch>> {
593        let staged_writes = staged_handles_as_writes(staged);
594        TableStore::scan_with_staged(
595            self,
596            snapshot.dataset(),
597            &staged_writes,
598            projection,
599            filter,
600        )
601        .await
602    }
603
604    async fn scan_with_pending(
605        &self,
606        snapshot: &SnapshotHandle,
607        pending: &[RecordBatch],
608        pending_schema: Option<SchemaRef>,
609        projection: Option<&[&str]>,
610        filter: Option<&str>,
611        key_column: Option<&str>,
612    ) -> Result<Vec<RecordBatch>> {
613        TableStore::scan_with_pending(
614            self,
615            snapshot.dataset(),
616            pending,
617            pending_schema,
618            projection,
619            filter,
620            key_column,
621        )
622        .await
623    }
624
625    async fn first_row_id_for_filter(
626        &self,
627        snapshot: &SnapshotHandle,
628        filter: &str,
629    ) -> Result<Option<u64>> {
630        TableStore::first_row_id_for_filter(self, snapshot.dataset(), filter).await
631    }
632
633    async fn table_state(
634        &self,
635        dataset_uri: &str,
636        snapshot: &SnapshotHandle,
637    ) -> Result<TableState> {
638        TableStore::table_state(self, dataset_uri, snapshot.dataset()).await
639    }
640
641    async fn stage_append(
642        &self,
643        snapshot: &SnapshotHandle,
644        batch: RecordBatch,
645        prior_stages: &[StagedHandle],
646    ) -> Result<StagedHandle> {
647        let staged_writes = staged_handles_as_writes(prior_stages);
648        TableStore::stage_append(self, snapshot.dataset(), batch, &staged_writes)
649            .await
650            .map(StagedHandle::new)
651    }
652
653    async fn stage_merge_insert(
654        &self,
655        snapshot: SnapshotHandle,
656        batch: RecordBatch,
657        key_columns: Vec<String>,
658        when_matched: WhenMatched,
659        when_not_matched: WhenNotMatched,
660    ) -> Result<StagedHandle> {
661        let ds = Arc::try_unwrap(snapshot.into_arc())
662            .unwrap_or_else(|arc| (*arc).clone());
663        TableStore::stage_merge_insert(
664            self,
665            ds,
666            batch,
667            key_columns,
668            when_matched,
669            when_not_matched,
670        )
671        .await
672        .map(StagedHandle::new)
673    }
674
675    async fn commit_staged(
676        &self,
677        snapshot: SnapshotHandle,
678        staged: StagedHandle,
679    ) -> Result<SnapshotHandle> {
680        let ds_arc = snapshot.into_arc();
681        let transaction = staged.into_staged().transaction;
682        TableStore::commit_staged(self, ds_arc, transaction)
683            .await
684            .map(SnapshotHandle::new)
685    }
686
687    async fn stage_overwrite(
688        &self,
689        snapshot: &SnapshotHandle,
690        batch: RecordBatch,
691    ) -> Result<StagedHandle> {
692        TableStore::stage_overwrite(self, snapshot.dataset(), batch)
693            .await
694            .map(StagedHandle::new)
695    }
696
697    async fn stage_create_btree_index(
698        &self,
699        snapshot: &SnapshotHandle,
700        columns: &[&str],
701    ) -> Result<StagedHandle> {
702        TableStore::stage_create_btree_index(self, snapshot.dataset(), columns)
703            .await
704            .map(StagedHandle::new)
705    }
706
707    async fn stage_create_inverted_index(
708        &self,
709        snapshot: &SnapshotHandle,
710        column: &str,
711    ) -> Result<StagedHandle> {
712        TableStore::stage_create_inverted_index(self, snapshot.dataset(), column)
713            .await
714            .map(StagedHandle::new)
715    }
716
717    async fn append_batch(
718        &self,
719        dataset_uri: &str,
720        snapshot: SnapshotHandle,
721        batch: RecordBatch,
722    ) -> Result<(SnapshotHandle, TableState)> {
723        let mut ds = Arc::try_unwrap(snapshot.into_arc())
724            .unwrap_or_else(|arc| (*arc).clone());
725        let state = TableStore::append_batch(self, dataset_uri, &mut ds, batch).await?;
726        Ok((SnapshotHandle::new(ds), state))
727    }
728
729    async fn merge_insert_batches(
730        &self,
731        dataset_uri: &str,
732        snapshot: SnapshotHandle,
733        batches: Vec<RecordBatch>,
734        key_columns: Vec<String>,
735        when_matched: WhenMatched,
736        when_not_matched: WhenNotMatched,
737    ) -> Result<TableState> {
738        let ds = Arc::try_unwrap(snapshot.into_arc())
739            .unwrap_or_else(|arc| (*arc).clone());
740        TableStore::merge_insert_batches(
741            self,
742            dataset_uri,
743            ds,
744            batches,
745            key_columns,
746            when_matched,
747            when_not_matched,
748        )
749        .await
750    }
751
752    async fn overwrite_batch(
753        &self,
754        dataset_uri: &str,
755        snapshot: SnapshotHandle,
756        batch: RecordBatch,
757    ) -> Result<(SnapshotHandle, TableState)> {
758        let mut ds = Arc::try_unwrap(snapshot.into_arc())
759            .unwrap_or_else(|arc| (*arc).clone());
760        let state = TableStore::overwrite_batch(self, dataset_uri, &mut ds, batch).await?;
761        Ok((SnapshotHandle::new(ds), state))
762    }
763
764    async fn delete_where(
765        &self,
766        dataset_uri: &str,
767        snapshot: SnapshotHandle,
768        filter: &str,
769    ) -> Result<(SnapshotHandle, DeleteState)> {
770        let mut ds = Arc::try_unwrap(snapshot.into_arc())
771            .unwrap_or_else(|arc| (*arc).clone());
772        let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
773        Ok((SnapshotHandle::new(ds), state))
774    }
775
776    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
777        TableStore::has_btree_index(self, snapshot.dataset(), column).await
778    }
779
780    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
781        TableStore::has_fts_index(self, snapshot.dataset(), column).await
782    }
783
784    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
785        TableStore::has_vector_index(self, snapshot.dataset(), column).await
786    }
787
788    async fn create_btree_index(
789        &self,
790        snapshot: SnapshotHandle,
791        columns: &[&str],
792    ) -> Result<SnapshotHandle> {
793        let mut ds = Arc::try_unwrap(snapshot.into_arc())
794            .unwrap_or_else(|arc| (*arc).clone());
795        TableStore::create_btree_index(self, &mut ds, columns).await?;
796        Ok(SnapshotHandle::new(ds))
797    }
798
799    async fn create_inverted_index(
800        &self,
801        snapshot: SnapshotHandle,
802        column: &str,
803    ) -> Result<SnapshotHandle> {
804        let mut ds = Arc::try_unwrap(snapshot.into_arc())
805            .unwrap_or_else(|arc| (*arc).clone());
806        TableStore::create_inverted_index(self, &mut ds, column).await?;
807        Ok(SnapshotHandle::new(ds))
808    }
809
810    async fn create_vector_index(
811        &self,
812        snapshot: SnapshotHandle,
813        column: &str,
814    ) -> Result<SnapshotHandle> {
815        let mut ds = Arc::try_unwrap(snapshot.into_arc())
816            .unwrap_or_else(|arc| (*arc).clone());
817        TableStore::create_vector_index(self, &mut ds, column).await?;
818        Ok(SnapshotHandle::new(ds))
819    }
820
821    fn root_uri(&self) -> &str {
822        TableStore::root_uri(self)
823    }
824
825    fn dataset_uri(&self, table_path: &str) -> String {
826        TableStore::dataset_uri(self, table_path)
827    }
828
829    async fn scan_stream(
830        &self,
831        snapshot: &SnapshotHandle,
832        projection: Option<&[&str]>,
833        filter: Option<&str>,
834        order_by: Option<Vec<ColumnOrdering>>,
835        with_row_id: bool,
836    ) -> Result<DatasetRecordBatchStream> {
837        // Note: existing TableStore::scan_stream is an associated fn that
838        // takes &Dataset, so we delegate via the dataset reference held by
839        // the snapshot.
840        TableStore::scan_stream(snapshot.dataset(), projection, filter, order_by, with_row_id).await
841    }
842}