Skip to main content

omnigraph/
storage_layer.rs

1//! Storage trait surface — MR-793.
2//!
3//! `TableStorage` is the engine-internal trait that exposes the
4//! staged-write primitives (`stage_append`, `stage_merge_insert`,
5//! `stage_overwrite`, `stage_create_btree_index`,
6//! `stage_create_inverted_index`) plus `commit_staged` as the canonical
7//! way for new engine writers to advance Lance HEAD without coupling
8//! "write bytes" with "advance HEAD" in one Lance API call.
9//!
10//! ## Inline-commit residuals live on a separate trait
11//!
12//! The inline-commit writes that Lance cannot yet express as
13//! stage-then-commit are NOT on `TableStorage`. They sit on
14//! [`InlineCommitResidual`], reachable only via
15//! `Omnigraph::storage_inline_residual()`, so the default `db.storage()`
16//! surface is staged-only and cannot couple "write bytes" with "advance
17//! HEAD" — MR-793 acceptance §1 closes by construction. The residuals:
18//!
19//! * `delete_where` — Lance #6658 (`DeleteBuilder::execute_uncommitted`)
20//!   did not backport to the 6.x line; it first ships in `v7.0.0-beta.10`.
21//!   Migration to staged two-phase delete is tracked as MR-A, gated on the
22//!   Lance v7.x bump.
23//! * `create_vector_index` — segment-commit-path needs
24//!   `build_index_metadata_from_segments`, still `pub(crate)` in Lance
25//!   6.0.1 ([#6666](https://github.com/lance-format/lance/issues/6666),
26//!   open). Scalar indices already stage.
27//!
28//! Each is named honestly at its call site; the forbidden-API guard test
29//! catches direct lance::* misuse outside the storage layer.
30//!
31//! ## Sealed
32//!
33//! Both `TableStorage` and `InlineCommitResidual` are `: sealed::Sealed`.
34//! Only types in this crate can implement them, so a downstream crate
35//! cannot subvert the contract by providing its own impl.
36//!
37//! ## Opaque handles
38//!
//! `SnapshotHandle` and `StagedHandle` wrap `Arc<lance::Dataset>` and
//! `StagedWrite` respectively. Their inner fields and accessors are
//! `pub(crate)`, so code outside this crate cannot reach through; inside the
//! crate, engine code is expected to use the trait methods rather than the
//! raw fields. This aligns with the storage-boundary invariant:
//! `lance::Dataset` does not appear in trait signatures.
44//!
45//! ## Migration status
46//!
47//! Phases 1a / 2 / 4 / 5 / 6 landed in MR-793 PR #70 (trait scaffolding,
48//! staged primitives, migration of `ensure_indices` / `branch_merge` /
49//! `schema_apply` onto the staged surface). Phase 1b (call-site
50//! conversion) and Phase 9 landed in MR-854, which also split the
51//! inline-commit residuals onto `InlineCommitResidual` so `db.storage()`
52//! is staged-only. Phase 7 (recovery reconciler) shipped as MR-847;
53//! Phase 8 (index reconciler) is tracked as MR-848.
54
55use std::fmt::Debug;
56use std::sync::Arc;
57
58use arrow_array::RecordBatch;
59use arrow_schema::SchemaRef;
60use async_trait::async_trait;
61use lance::Dataset;
62use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream};
63use lance::dataset::{WhenMatched, WhenNotMatched};
64
65use crate::db::{Snapshot, SubTableEntry};
66use crate::error::Result;
67use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore};
68
69// ─── sealed module ──────────────────────────────────────────────────────────
70
71pub(crate) mod sealed {
72    /// Sealed marker — only types defined in `omnigraph-engine` can
73    /// implement `TableStorage`. Combined with the trait being the only
74    /// route to write APIs from engine code, this gives type-system
75    /// enforcement of the staged-write invariant.
76    pub trait Sealed {}
77
78    impl Sealed for crate::table_store::TableStore {}
79}
80
81// ─── opaque handles ────────────────────────────────────────────────────────
82
83/// Opaque handle to a snapshot of a single sub-table dataset at a
84/// specific version.
85///
86/// Engine code never sees `lance::Dataset` directly; it holds
87/// `SnapshotHandle` and passes it back to `TableStorage` methods.
88/// Inside this crate, `pub(crate)` accessors expose the inner
89/// `Arc<Dataset>` to the `TableStorage` impl.
90#[derive(Debug, Clone)]
91pub struct SnapshotHandle {
92    pub(crate) inner: Arc<Dataset>,
93}
94
95impl SnapshotHandle {
96    /// Construct from a Lance dataset. `pub(crate)` — only
97    /// `TableStore` should produce these.
98    pub(crate) fn new(ds: Dataset) -> Self {
99        Self {
100            inner: Arc::new(ds),
101        }
102    }
103
104    /// Borrow the underlying Lance dataset. `pub(crate)` so only the
105    /// `TableStorage` impl in this crate can reach through.
106    pub(crate) fn dataset(&self) -> &Dataset {
107        &self.inner
108    }
109
110    /// Take ownership of the inner `Arc<Dataset>`. Used by the
111    /// `TableStorage` impl when an op needs to mutate the dataset in
112    /// place (commit a staged write, append, overwrite, …).
113    ///
114    /// Performance note: callers consume the returned `Arc` via
115    /// `Arc::try_unwrap(...).unwrap_or_else(|arc| (*arc).clone())`. The
116    /// fast path (no clone) only fires when the snapshot is single-ref
117    /// — i.e. the caller dropped every other `SnapshotHandle` clone
118    /// before calling. Holding parallel clones (e.g. across an `await`
119    /// point or stashed in a struct) forces a deep `Dataset` clone on
120    /// every mutating op. Engine callers should pass `SnapshotHandle`
121    /// by value into the mutating method, not keep a side copy.
122    pub(crate) fn into_arc(self) -> Arc<Dataset> {
123        self.inner
124    }
125
126    /// Take ownership of the inner `Dataset` by unwrapping the `Arc`
127    /// (or cloning if the snapshot is shared). `pub(crate)` — used
128    /// only by the maintenance path (`optimize`, `cleanup`) which
129    /// must hand `&mut Dataset` to Lance compaction / cleanup APIs
130    /// that the `TableStorage` trait does not (and should not)
131    /// surface. Engine code that participates in the staged-write
132    /// invariant must stay on the trait methods.
133    ///
134    /// Single-ref invariant: same fast-path/clone behavior as
135    /// `into_arc` — see that method's doc. Drop sibling
136    /// `SnapshotHandle` clones before calling.
137    pub(crate) fn into_dataset(self) -> Dataset {
138        Arc::try_unwrap(self.inner).unwrap_or_else(|arc| (*arc).clone())
139    }
140
141    // ── public, lance-free accessors ──
142
143    /// Current Lance manifest version of the snapshot.
144    pub fn version(&self) -> u64 {
145        self.inner.version().version
146    }
147
148    /// Whether the underlying dataset uses stable row IDs.
149    pub fn uses_stable_row_ids(&self) -> bool {
150        self.inner.manifest.uses_stable_row_ids()
151    }
152}
153
154/// Opaque handle to a staged Lance transaction (data write or scalar
155/// index build) that has not yet advanced HEAD.
156///
157/// Produced by `TableStorage::stage_*`, consumed by
158/// `TableStorage::commit_staged`. Carries the underlying `StagedWrite`
159/// (transaction + read-your-writes deltas) behind `pub(crate)`.
160#[derive(Debug, Clone)]
161pub struct StagedHandle {
162    pub(crate) inner: StagedWrite,
163}
164
165impl StagedHandle {
166    pub(crate) fn new(staged: StagedWrite) -> Self {
167        Self { inner: staged }
168    }
169
170    /// Take ownership of the inner `StagedWrite`. Used by
171    /// `commit_staged`.
172    pub(crate) fn into_staged(self) -> StagedWrite {
173        self.inner
174    }
175}
176
177/// Helper: clone the inner `StagedWrite` out of each `StagedHandle` and
178/// collect into a `Vec<StagedWrite>` for handing to
179/// `TableStore::stage_append`'s `prior_stages` parameter. The result is
180/// owned (not borrowed) — callers that already had a `&[StagedHandle]`
181/// pay a clone cost per element. `StagedWrite::clone` is cheap because
182/// `Transaction` and `Vec<Fragment>` are shallow-clone friendly.
183pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
184    handles.iter().map(|h| h.inner.clone()).collect()
185}
186
/// Outcome of a per-table branch fork (`fork_branch_from_state`).
///
/// `RefAlreadyExists` means a Lance branch ref for the target already exists
/// on the dataset, so `create_branch` could not create it cleanly. By the
/// fork caller's contract — the caller re-checks the live manifest under the
/// held per-`(table, branch)` write queue and only forks when the manifest
/// does *not* place the table on the branch — such a ref is a
/// manifest-unreferenced fork (the residue of an interrupted prior fork, or a
/// delete+recreate), which the caller reclaims and re-forks. The fork
/// operation does not editorialize ("incomplete prior delete"); it returns
/// this typed signal and lets the db layer decide.
// `pub` (not `pub(crate)`) to match the visibility of
// `TableStorage::fork_branch_from_state`, which returns it, and of the
// already-`pub` `SnapshotHandle`; this avoids a private-interfaces warning.
// Sealing `TableStorage` only prevents *implementing* the trait outside this
// crate. Wherever this module path is reachable, external code can still
// name, construct, and match on `ForkOutcome`.
#[derive(Debug)]
pub enum ForkOutcome<D> {
    /// The branch was forked. Carries the opened result: a Lance `Dataset`
    /// from `TableStore`, wrapped as a `SnapshotHandle` through the trait.
    Created(D),
    /// A Lance branch ref for the target already existed; see the
    /// type-level docs for how callers treat it.
    RefAlreadyExists,
}
206
207// ─── TableStorage trait ────────────────────────────────────────────────────
208
209/// Engine-internal trait covering every Lance dataset operation an
210/// `omnigraph` engine call site might perform.
211///
212/// `TableStore` is the only `impl`. The trait is sealed; the inline
213/// Lance APIs are not reachable through trait dispatch. New writers that
214/// might advance Lance HEAD MUST add a staged-shape method here.
215#[async_trait]
216pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
217    // ── Snapshot opens (no HEAD advance) ────────────────────────────────
218
219    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle>;
220
221    async fn open_snapshot_at_table(
222        &self,
223        snapshot: &Snapshot,
224        table_key: &str,
225    ) -> Result<SnapshotHandle>;
226
227    async fn open_dataset_head(
228        &self,
229        dataset_uri: &str,
230        branch: Option<&str>,
231    ) -> Result<SnapshotHandle>;
232
233    async fn open_dataset_head_for_write(
234        &self,
235        table_key: &str,
236        dataset_uri: &str,
237        branch: Option<&str>,
238    ) -> Result<SnapshotHandle>;
239
240    async fn open_dataset_at_state(
241        &self,
242        table_path: &str,
243        branch: Option<&str>,
244        version: u64,
245    ) -> Result<SnapshotHandle>;
246
247    async fn fork_branch_from_state(
248        &self,
249        dataset_uri: &str,
250        source_branch: Option<&str>,
251        table_key: &str,
252        source_version: u64,
253        target_branch: &str,
254    ) -> Result<ForkOutcome<SnapshotHandle>>;
255
256    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;
257
258    /// Idempotent variant of `delete_branch` used by the best-effort fork
259    /// reclaim under branch delete (`db/omnigraph.rs::cleanup_deleted_branch_tables`)
260    /// and by the orphan-fork reconciler in `optimize`. Tolerates an
261    /// already-absent branch (both Lance's `RefNotFound` and the local-store
262    /// `NotFound` quirk on a missing `tree/{branch}/` dir). A still-referenced
263    /// branch (`RefConflict`) still surfaces as `OmniError::Lance`.
264    async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;
265
266    /// List the named Lance branches present on the dataset at `dataset_uri`.
267    /// The `cleanup` orphan reconciler diffs this against the manifest
268    /// branch set to find orphaned per-table forks. `main`/default is not a
269    /// named branch and never appears here.
270    async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>>;
271
272    async fn reopen_for_mutation(
273        &self,
274        dataset_uri: &str,
275        branch: Option<&str>,
276        table_key: &str,
277        expected_version: u64,
278    ) -> Result<SnapshotHandle>;
279
280    fn ensure_expected_version(
281        &self,
282        snapshot: &SnapshotHandle,
283        table_key: &str,
284        expected_version: u64,
285    ) -> Result<()>;
286
287    // ── Reads (no HEAD advance) ────────────────────────────────────────
288
289    async fn scan(
290        &self,
291        snapshot: &SnapshotHandle,
292        projection: Option<&[&str]>,
293        filter: Option<&str>,
294        order_by: Option<Vec<ColumnOrdering>>,
295    ) -> Result<Vec<RecordBatch>>;
296
297    async fn scan_with_row_id(
298        &self,
299        snapshot: &SnapshotHandle,
300        projection: Option<&[&str]>,
301        filter: Option<&str>,
302        order_by: Option<Vec<ColumnOrdering>>,
303        with_row_id: bool,
304    ) -> Result<Vec<RecordBatch>>;
305
306    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>>;
307
308    async fn scan_batches_for_rewrite(&self, snapshot: &SnapshotHandle)
309    -> Result<Vec<RecordBatch>>;
310
311    async fn count_rows(&self, snapshot: &SnapshotHandle, filter: Option<String>) -> Result<usize>;
312
313    async fn count_rows_with_staged(
314        &self,
315        snapshot: &SnapshotHandle,
316        staged: &[StagedHandle],
317        filter: Option<String>,
318    ) -> Result<usize>;
319
320    async fn scan_with_staged(
321        &self,
322        snapshot: &SnapshotHandle,
323        staged: &[StagedHandle],
324        projection: Option<&[&str]>,
325        filter: Option<&str>,
326    ) -> Result<Vec<RecordBatch>>;
327
328    async fn scan_with_pending(
329        &self,
330        snapshot: &SnapshotHandle,
331        pending: &[RecordBatch],
332        pending_schema: Option<SchemaRef>,
333        projection: Option<&[&str]>,
334        filter: Option<&str>,
335        key_column: Option<&str>,
336    ) -> Result<Vec<RecordBatch>>;
337
338    async fn first_row_id_for_filter(
339        &self,
340        snapshot: &SnapshotHandle,
341        filter: &str,
342    ) -> Result<Option<u64>>;
343
344    async fn table_state(&self, dataset_uri: &str, snapshot: &SnapshotHandle)
345    -> Result<TableState>;
346
347    // ── Staged writes (no HEAD advance) ────────────────────────────────
348
349    async fn stage_append(
350        &self,
351        snapshot: &SnapshotHandle,
352        batch: RecordBatch,
353        prior_stages: &[StagedHandle],
354    ) -> Result<StagedHandle>;
355
356    async fn stage_merge_insert(
357        &self,
358        snapshot: SnapshotHandle,
359        batch: RecordBatch,
360        key_columns: Vec<String>,
361        when_matched: WhenMatched,
362        when_not_matched: WhenNotMatched,
363    ) -> Result<StagedHandle>;
364
365    async fn commit_staged(
366        &self,
367        snapshot: SnapshotHandle,
368        staged: StagedHandle,
369    ) -> Result<SnapshotHandle>;
370
371    /// Stage an overwrite (Operation::Overwrite). MR-793 Phase 2.
372    async fn stage_overwrite(
373        &self,
374        snapshot: &SnapshotHandle,
375        batch: RecordBatch,
376    ) -> Result<StagedHandle>;
377
378    /// Stage a BTREE scalar index build. MR-793 Phase 2.
379    async fn stage_create_btree_index(
380        &self,
381        snapshot: &SnapshotHandle,
382        columns: &[&str],
383    ) -> Result<StagedHandle>;
384
385    /// Stage an INVERTED (FTS) scalar index build. MR-793 Phase 2.
386    async fn stage_create_inverted_index(
387        &self,
388        snapshot: &SnapshotHandle,
389        column: &str,
390    ) -> Result<StagedHandle>;
391
392    // ── Index presence (reads, no HEAD advance) ──────────────────────
393    //
394    // The inline-commit writes (`delete_where`, `create_vector_index`) are
395    // deliberately NOT on this trait. They live on
396    // the separate `InlineCommitResidual` trait, reachable only through
397    // `Omnigraph::storage_inline_residual()`. As a result the default
398    // `db.storage()` surface cannot couple "write bytes" with "advance HEAD"
399    // — closing MR-793 acceptance §1 by construction rather than by review.
400
401    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
402    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
403    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
404
405    // ── URI helpers ────────────────────────────────────────────────────
406    //
407    // These are pure string formatting; they live on the trait so engine
408    // code holding `Arc<dyn TableStorage>` can compute dataset URIs
409    // without importing the concrete struct.
410
411    fn root_uri(&self) -> &str;
412    fn dataset_uri(&self, table_path: &str) -> String;
413
414    // ── Streaming access (used by the export path) ────────────────────
415    //
416    // Engine code that needs a `DatasetRecordBatchStream` (rather than a
417    // collected `Vec<RecordBatch>`) goes through this trait method.
418    // Useful for the JSONL exporter that streams rows to a writer
419    // without materializing the whole result.
420
421    async fn scan_stream(
422        &self,
423        snapshot: &SnapshotHandle,
424        projection: Option<&[&str]>,
425        filter: Option<&str>,
426        order_by: Option<Vec<ColumnOrdering>>,
427        with_row_id: bool,
428    ) -> Result<DatasetRecordBatchStream>;
429}
430
431// ─── InlineCommitResidual trait ────────────────────────────────────────────
432
433/// Inline-commit residual surface: the writes Lance cannot yet express as a
434/// stage-then-commit pair, so they advance Lance HEAD as a side effect of
435/// writing. Kept OFF `TableStorage` and reachable only through
436/// `Omnigraph::storage_inline_residual()`, so the default `db.storage()` path
437/// is staged-only and a new writer cannot reintroduce the write+commit coupling
438/// by accident (MR-793 acceptance §1, by construction).
439///
440/// Residual reasons (each is named honestly at its call site):
441/// * `delete_where` — Lance has no public two-phase delete on the 6.x line
442///   (`DeleteBuilder::execute_uncommitted` first ships in v7.x; MR-A / Lance
443///   #6658). The D2 parse-time rule + recovery sidecars cover the gap meanwhile.
444/// * `create_vector_index` — vector-index segment-commit needs
445///   `build_index_metadata_from_segments`, still `pub(crate)` in Lance 6.0.1
446///   (Lance #6666). Scalar indices already stage.
447#[async_trait]
448pub(crate) trait InlineCommitResidual: sealed::Sealed + Send + Sync + Debug {
449    async fn delete_where(
450        &self,
451        dataset_uri: &str,
452        snapshot: SnapshotHandle,
453        filter: &str,
454    ) -> Result<(SnapshotHandle, DeleteState)>;
455
456    async fn create_vector_index(
457        &self,
458        snapshot: SnapshotHandle,
459        column: &str,
460    ) -> Result<SnapshotHandle>;
461}
462
463// ─── single impl: TableStore ──────────────────────────────────────────────
464
465#[async_trait]
466impl TableStorage for TableStore {
467    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle> {
468        self.open_at_entry(entry).await.map(SnapshotHandle::new)
469    }
470
471    async fn open_snapshot_at_table(
472        &self,
473        snapshot: &Snapshot,
474        table_key: &str,
475    ) -> Result<SnapshotHandle> {
476        self.open_snapshot_table(snapshot, table_key)
477            .await
478            .map(SnapshotHandle::new)
479    }
480
481    async fn open_dataset_head(
482        &self,
483        dataset_uri: &str,
484        branch: Option<&str>,
485    ) -> Result<SnapshotHandle> {
486        TableStore::open_dataset_head(self, dataset_uri, branch)
487            .await
488            .map(SnapshotHandle::new)
489    }
490
491    async fn open_dataset_head_for_write(
492        &self,
493        table_key: &str,
494        dataset_uri: &str,
495        branch: Option<&str>,
496    ) -> Result<SnapshotHandle> {
497        TableStore::open_dataset_head_for_write(self, table_key, dataset_uri, branch)
498            .await
499            .map(SnapshotHandle::new)
500    }
501
502    async fn open_dataset_at_state(
503        &self,
504        table_path: &str,
505        branch: Option<&str>,
506        version: u64,
507    ) -> Result<SnapshotHandle> {
508        TableStore::open_dataset_at_state(self, table_path, branch, version)
509            .await
510            .map(SnapshotHandle::new)
511    }
512
513    async fn fork_branch_from_state(
514        &self,
515        dataset_uri: &str,
516        source_branch: Option<&str>,
517        table_key: &str,
518        source_version: u64,
519        target_branch: &str,
520    ) -> Result<ForkOutcome<SnapshotHandle>> {
521        Ok(
522            match TableStore::fork_branch_from_state(
523                self,
524                dataset_uri,
525                source_branch,
526                table_key,
527                source_version,
528                target_branch,
529            )
530            .await?
531            {
532                ForkOutcome::Created(ds) => ForkOutcome::Created(SnapshotHandle::new(ds)),
533                ForkOutcome::RefAlreadyExists => ForkOutcome::RefAlreadyExists,
534            },
535        )
536    }
537
538    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
539        TableStore::delete_branch(self, dataset_uri, branch).await
540    }
541
542    async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
543        TableStore::force_delete_branch(self, dataset_uri, branch).await
544    }
545
546    async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>> {
547        TableStore::list_branches(self, dataset_uri).await
548    }
549
550    async fn reopen_for_mutation(
551        &self,
552        dataset_uri: &str,
553        branch: Option<&str>,
554        table_key: &str,
555        expected_version: u64,
556    ) -> Result<SnapshotHandle> {
557        TableStore::reopen_for_mutation(self, dataset_uri, branch, table_key, expected_version)
558            .await
559            .map(SnapshotHandle::new)
560    }
561
562    fn ensure_expected_version(
563        &self,
564        snapshot: &SnapshotHandle,
565        table_key: &str,
566        expected_version: u64,
567    ) -> Result<()> {
568        TableStore::ensure_expected_version(self, snapshot.dataset(), table_key, expected_version)
569    }
570
571    async fn scan(
572        &self,
573        snapshot: &SnapshotHandle,
574        projection: Option<&[&str]>,
575        filter: Option<&str>,
576        order_by: Option<Vec<ColumnOrdering>>,
577    ) -> Result<Vec<RecordBatch>> {
578        TableStore::scan(self, snapshot.dataset(), projection, filter, order_by).await
579    }
580
581    async fn scan_with_row_id(
582        &self,
583        snapshot: &SnapshotHandle,
584        projection: Option<&[&str]>,
585        filter: Option<&str>,
586        order_by: Option<Vec<ColumnOrdering>>,
587        with_row_id: bool,
588    ) -> Result<Vec<RecordBatch>> {
589        TableStore::scan_with(
590            self,
591            snapshot.dataset(),
592            projection,
593            filter,
594            order_by,
595            with_row_id,
596            |_| Ok(()),
597        )
598        .await
599    }
600
601    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>> {
602        TableStore::scan_batches(self, snapshot.dataset()).await
603    }
604
605    async fn scan_batches_for_rewrite(
606        &self,
607        snapshot: &SnapshotHandle,
608    ) -> Result<Vec<RecordBatch>> {
609        TableStore::scan_batches_for_rewrite(self, snapshot.dataset()).await
610    }
611
612    async fn count_rows(&self, snapshot: &SnapshotHandle, filter: Option<String>) -> Result<usize> {
613        TableStore::count_rows(self, snapshot.dataset(), filter).await
614    }
615
616    async fn count_rows_with_staged(
617        &self,
618        snapshot: &SnapshotHandle,
619        staged: &[StagedHandle],
620        filter: Option<String>,
621    ) -> Result<usize> {
622        let staged_writes = staged_handles_as_writes(staged);
623        TableStore::count_rows_with_staged(self, snapshot.dataset(), &staged_writes, filter).await
624    }
625
626    async fn scan_with_staged(
627        &self,
628        snapshot: &SnapshotHandle,
629        staged: &[StagedHandle],
630        projection: Option<&[&str]>,
631        filter: Option<&str>,
632    ) -> Result<Vec<RecordBatch>> {
633        let staged_writes = staged_handles_as_writes(staged);
634        TableStore::scan_with_staged(self, snapshot.dataset(), &staged_writes, projection, filter)
635            .await
636    }
637
638    async fn scan_with_pending(
639        &self,
640        snapshot: &SnapshotHandle,
641        pending: &[RecordBatch],
642        pending_schema: Option<SchemaRef>,
643        projection: Option<&[&str]>,
644        filter: Option<&str>,
645        key_column: Option<&str>,
646    ) -> Result<Vec<RecordBatch>> {
647        TableStore::scan_with_pending(
648            self,
649            snapshot.dataset(),
650            pending,
651            pending_schema,
652            projection,
653            filter,
654            key_column,
655        )
656        .await
657    }
658
659    async fn first_row_id_for_filter(
660        &self,
661        snapshot: &SnapshotHandle,
662        filter: &str,
663    ) -> Result<Option<u64>> {
664        TableStore::first_row_id_for_filter(self, snapshot.dataset(), filter).await
665    }
666
667    async fn table_state(
668        &self,
669        dataset_uri: &str,
670        snapshot: &SnapshotHandle,
671    ) -> Result<TableState> {
672        TableStore::table_state(self, dataset_uri, snapshot.dataset()).await
673    }
674
675    async fn stage_append(
676        &self,
677        snapshot: &SnapshotHandle,
678        batch: RecordBatch,
679        prior_stages: &[StagedHandle],
680    ) -> Result<StagedHandle> {
681        let staged_writes = staged_handles_as_writes(prior_stages);
682        TableStore::stage_append(self, snapshot.dataset(), batch, &staged_writes)
683            .await
684            .map(StagedHandle::new)
685    }
686
687    async fn stage_merge_insert(
688        &self,
689        snapshot: SnapshotHandle,
690        batch: RecordBatch,
691        key_columns: Vec<String>,
692        when_matched: WhenMatched,
693        when_not_matched: WhenNotMatched,
694    ) -> Result<StagedHandle> {
695        let ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
696        TableStore::stage_merge_insert(self, ds, batch, key_columns, when_matched, when_not_matched)
697            .await
698            .map(StagedHandle::new)
699    }
700
701    async fn commit_staged(
702        &self,
703        snapshot: SnapshotHandle,
704        staged: StagedHandle,
705    ) -> Result<SnapshotHandle> {
706        let ds_arc = snapshot.into_arc();
707        let transaction = staged.into_staged().transaction;
708        TableStore::commit_staged(self, ds_arc, transaction)
709            .await
710            .map(SnapshotHandle::new)
711    }
712
713    async fn stage_overwrite(
714        &self,
715        snapshot: &SnapshotHandle,
716        batch: RecordBatch,
717    ) -> Result<StagedHandle> {
718        TableStore::stage_overwrite(self, snapshot.dataset(), batch)
719            .await
720            .map(StagedHandle::new)
721    }
722
723    async fn stage_create_btree_index(
724        &self,
725        snapshot: &SnapshotHandle,
726        columns: &[&str],
727    ) -> Result<StagedHandle> {
728        TableStore::stage_create_btree_index(self, snapshot.dataset(), columns)
729            .await
730            .map(StagedHandle::new)
731    }
732
733    async fn stage_create_inverted_index(
734        &self,
735        snapshot: &SnapshotHandle,
736        column: &str,
737    ) -> Result<StagedHandle> {
738        TableStore::stage_create_inverted_index(self, snapshot.dataset(), column)
739            .await
740            .map(StagedHandle::new)
741    }
742
743    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
744        TableStore::has_btree_index(self, snapshot.dataset(), column).await
745    }
746
747    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
748        TableStore::has_fts_index(self, snapshot.dataset(), column).await
749    }
750
751    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
752        TableStore::has_vector_index(self, snapshot.dataset(), column).await
753    }
754
755    fn root_uri(&self) -> &str {
756        TableStore::root_uri(self)
757    }
758
759    fn dataset_uri(&self, table_path: &str) -> String {
760        TableStore::dataset_uri(self, table_path)
761    }
762
763    async fn scan_stream(
764        &self,
765        snapshot: &SnapshotHandle,
766        projection: Option<&[&str]>,
767        filter: Option<&str>,
768        order_by: Option<Vec<ColumnOrdering>>,
769        with_row_id: bool,
770    ) -> Result<DatasetRecordBatchStream> {
771        // Note: existing TableStore::scan_stream is an associated fn that
772        // takes &Dataset, so we delegate via the dataset reference held by
773        // the snapshot.
774        TableStore::scan_stream(
775            snapshot.dataset(),
776            projection,
777            filter,
778            order_by,
779            with_row_id,
780        )
781        .await
782    }
783}
784
785#[async_trait]
786impl InlineCommitResidual for TableStore {
787    async fn delete_where(
788        &self,
789        dataset_uri: &str,
790        snapshot: SnapshotHandle,
791        filter: &str,
792    ) -> Result<(SnapshotHandle, DeleteState)> {
793        let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
794        let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
795        Ok((SnapshotHandle::new(ds), state))
796    }
797
798    async fn create_vector_index(
799        &self,
800        snapshot: SnapshotHandle,
801        column: &str,
802    ) -> Result<SnapshotHandle> {
803        let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
804        TableStore::create_vector_index(self, &mut ds, column).await?;
805        Ok(SnapshotHandle::new(ds))
806    }
807}