Skip to main content

omnigraph/
storage_layer.rs

//! Storage trait surface — MR-793.
//!
//! `TableStorage` is the engine-internal trait that exposes the
//! staged-write primitives (`stage_append`, `stage_merge_insert`,
//! `stage_overwrite`, `stage_create_btree_index`,
//! `stage_create_inverted_index`) plus `commit_staged` as the canonical
//! way for new engine writers to advance Lance HEAD without coupling
//! "write bytes" with "advance HEAD" in one Lance API call.
//!
//! ## Inline-commit residuals live on a separate trait
//!
//! The inline-commit writes that Lance cannot yet express as
//! stage-then-commit are NOT on `TableStorage`. They sit on
//! [`InlineCommitResidual`], reachable only via
//! `Omnigraph::storage_inline_residual()`, so the default `db.storage()`
//! surface is staged-only and cannot couple "write bytes" with "advance
//! HEAD" — MR-793 acceptance §1 closes by construction. The residuals:
//!
//! * `delete_where` — Lance #6658 (`DeleteBuilder::execute_uncommitted`)
//!   did not backport to the 6.x line; it first ships in `v7.0.0-beta.10`.
//!   Migration to staged two-phase delete is tracked as MR-A, gated on the
//!   Lance v7.x bump.
//! * `create_vector_index` — segment-commit-path needs
//!   `build_index_metadata_from_segments`, still `pub(crate)` in Lance
//!   6.0.1 ([#6666](https://github.com/lance-format/lance/issues/6666),
//!   open). Scalar indices already stage.
//!
//! Each is named honestly at its call site; the forbidden-API guard test
//! catches direct lance::* misuse outside the storage layer.
//!
//! ## Sealed
//!
//! Both `TableStorage` and `InlineCommitResidual` are `: sealed::Sealed`.
//! Only types in this crate can implement them, so a downstream crate
//! cannot subvert the contract by providing its own impl.
//!
//! ## Opaque handles
//!
//! `SnapshotHandle` and `StagedHandle` wrap `lance::Dataset` and
//! `StagedWrite` respectively. Their inner Lance types are
//! `pub(crate)` — engine code outside `table_store` cannot reach
//! through. This aligns with the storage-boundary invariant:
//! `lance::Dataset` does not appear in trait signatures.
//!
//! ## Migration status
//!
//! Phases 1a / 2 / 4 / 5 / 6 landed in MR-793 PR #70 (trait scaffolding,
//! staged primitives, migration of `ensure_indices` / `branch_merge` /
//! `schema_apply` onto the staged surface). Phase 1b (call-site
//! conversion) and Phase 9 landed in MR-854, which also split the
//! inline-commit residuals onto `InlineCommitResidual` so `db.storage()`
//! is staged-only. Phase 7 (recovery reconciler) shipped as MR-847;
//! Phase 8 (index reconciler) is tracked as MR-848.
54
55use std::fmt::Debug;
56use std::sync::Arc;
57
58use arrow_array::RecordBatch;
59use arrow_schema::SchemaRef;
60use async_trait::async_trait;
61use lance::Dataset;
62use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream};
63use lance::dataset::{WhenMatched, WhenNotMatched};
64
65use crate::db::{Snapshot, SubTableEntry};
66use crate::error::Result;
67use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore};
68
69// ─── sealed module ──────────────────────────────────────────────────────────
70
71pub(crate) mod sealed {
72    /// Sealed marker — only types defined in `omnigraph-engine` can
73    /// implement `TableStorage`. Combined with the trait being the only
74    /// route to write APIs from engine code, this gives type-system
75    /// enforcement of the staged-write invariant.
76    pub trait Sealed {}
77
78    impl Sealed for crate::table_store::TableStore {}
79}
80
81// ─── opaque handles ────────────────────────────────────────────────────────
82
83/// Opaque handle to a snapshot of a single sub-table dataset at a
84/// specific version.
85///
86/// Engine code never sees `lance::Dataset` directly; it holds
87/// `SnapshotHandle` and passes it back to `TableStorage` methods.
88/// Inside this crate, `pub(crate)` accessors expose the inner
89/// `Arc<Dataset>` to the `TableStorage` impl.
90#[derive(Debug, Clone)]
91pub struct SnapshotHandle {
92    pub(crate) inner: Arc<Dataset>,
93}
94
95impl SnapshotHandle {
96    /// Construct from a Lance dataset. `pub(crate)` — only
97    /// `TableStore` should produce these.
98    pub(crate) fn new(ds: Dataset) -> Self {
99        Self {
100            inner: Arc::new(ds),
101        }
102    }
103
104    /// Borrow the underlying Lance dataset. `pub(crate)` so only the
105    /// `TableStorage` impl in this crate can reach through.
106    pub(crate) fn dataset(&self) -> &Dataset {
107        &self.inner
108    }
109
110    /// Take ownership of the inner `Arc<Dataset>`. Used by the
111    /// `TableStorage` impl when an op needs to mutate the dataset in
112    /// place (commit a staged write, append, overwrite, …).
113    ///
114    /// Performance note: callers consume the returned `Arc` via
115    /// `Arc::try_unwrap(...).unwrap_or_else(|arc| (*arc).clone())`. The
116    /// fast path (no clone) only fires when the snapshot is single-ref
117    /// — i.e. the caller dropped every other `SnapshotHandle` clone
118    /// before calling. Holding parallel clones (e.g. across an `await`
119    /// point or stashed in a struct) forces a deep `Dataset` clone on
120    /// every mutating op. Engine callers should pass `SnapshotHandle`
121    /// by value into the mutating method, not keep a side copy.
122    pub(crate) fn into_arc(self) -> Arc<Dataset> {
123        self.inner
124    }
125
126    /// Take ownership of the inner `Dataset` by unwrapping the `Arc`
127    /// (or cloning if the snapshot is shared). `pub(crate)` — used
128    /// only by the maintenance path (`optimize`, `cleanup`) which
129    /// must hand `&mut Dataset` to Lance compaction / cleanup APIs
130    /// that the `TableStorage` trait does not (and should not)
131    /// surface. Engine code that participates in the staged-write
132    /// invariant must stay on the trait methods.
133    ///
134    /// Single-ref invariant: same fast-path/clone behavior as
135    /// `into_arc` — see that method's doc. Drop sibling
136    /// `SnapshotHandle` clones before calling.
137    pub(crate) fn into_dataset(self) -> Dataset {
138        Arc::try_unwrap(self.inner).unwrap_or_else(|arc| (*arc).clone())
139    }
140
141    // ── public, lance-free accessors ──
142
143    /// Current Lance manifest version of the snapshot.
144    pub fn version(&self) -> u64 {
145        self.inner.version().version
146    }
147
148    /// Whether the underlying dataset uses stable row IDs.
149    pub fn uses_stable_row_ids(&self) -> bool {
150        self.inner.manifest.uses_stable_row_ids()
151    }
152}
153
154/// Opaque handle to a staged Lance transaction (data write or scalar
155/// index build) that has not yet advanced HEAD.
156///
157/// Produced by `TableStorage::stage_*`, consumed by
158/// `TableStorage::commit_staged`. Carries the underlying `StagedWrite`
159/// (transaction + read-your-writes deltas) behind `pub(crate)`.
160#[derive(Debug, Clone)]
161pub struct StagedHandle {
162    pub(crate) inner: StagedWrite,
163}
164
165impl StagedHandle {
166    pub(crate) fn new(staged: StagedWrite) -> Self {
167        Self { inner: staged }
168    }
169
170    /// Take ownership of the inner `StagedWrite`. Used by
171    /// `commit_staged`.
172    pub(crate) fn into_staged(self) -> StagedWrite {
173        self.inner
174    }
175}
176
177/// Helper: clone the inner `StagedWrite` out of each `StagedHandle` and
178/// collect into a `Vec<StagedWrite>` for handing to
179/// `TableStore::stage_append`'s `prior_stages` parameter. The result is
180/// owned (not borrowed) — callers that already had a `&[StagedHandle]`
181/// pay a clone cost per element. `StagedWrite::clone` is cheap because
182/// `Transaction` and `Vec<Fragment>` are shallow-clone friendly.
183pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
184    handles.iter().map(|h| h.inner.clone()).collect()
185}
186
/// Outcome of a per-table branch fork (`fork_branch_from_state`).
///
/// `RefAlreadyExists` means a Lance branch ref for the target already exists
/// on the dataset, so `create_branch` could not create it cleanly. By the
/// fork caller's contract — the caller re-checks the live manifest under the
/// held per-`(table, branch)` write queue and only forks when the manifest
/// does *not* place the table on the branch — such a ref is a
/// manifest-unreferenced fork (the residue of an interrupted prior fork, or a
/// delete+recreate), which the caller reclaims and re-forks. The fork
/// operation does not editorialize ("incomplete prior delete"); it returns
/// this typed signal and lets the db layer decide.
///
/// The type parameter `D` is the payload carried on success: the trait
/// surface returns `ForkOutcome<SnapshotHandle>`.
// `pub` (not `pub(crate)`) to match the visibility of the sealed
// `TableStorage::fork_branch_from_state` that returns it (and the already-`pub`
// `SnapshotHandle`); avoids a private-interfaces warning. The trait is sealed,
// so this widening does not let external code construct or branch on it.
//
// `Debug` is derived so the outcome can be logged or asserted on, matching
// the sibling handle types. The derive only applies when `D: Debug`.
#[derive(Debug)]
pub enum ForkOutcome<D> {
    /// The branch ref was created; carries the payload for the new fork.
    Created(D),
    /// A branch ref for the target already existed, so nothing was created.
    RefAlreadyExists,
}
206
207// ─── TableStorage trait ────────────────────────────────────────────────────
208
209/// Engine-internal trait covering every Lance dataset operation an
210/// `omnigraph` engine call site might perform.
211///
212/// `TableStore` is the only `impl`. The trait is sealed; the inline
213/// Lance APIs are not reachable through trait dispatch. New writers that
214/// might advance Lance HEAD MUST add a staged-shape method here.
215#[async_trait]
216pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
217    // ── Snapshot opens (no HEAD advance) ────────────────────────────────
218
219    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle>;
220
221    async fn open_snapshot_at_table(
222        &self,
223        snapshot: &Snapshot,
224        table_key: &str,
225    ) -> Result<SnapshotHandle>;
226
227    async fn open_dataset_head(
228        &self,
229        dataset_uri: &str,
230        branch: Option<&str>,
231    ) -> Result<SnapshotHandle>;
232
233    async fn open_dataset_head_for_write(
234        &self,
235        table_key: &str,
236        dataset_uri: &str,
237        branch: Option<&str>,
238    ) -> Result<SnapshotHandle>;
239
240    async fn open_dataset_at_state(
241        &self,
242        table_path: &str,
243        branch: Option<&str>,
244        version: u64,
245    ) -> Result<SnapshotHandle>;
246
247    async fn fork_branch_from_state(
248        &self,
249        dataset_uri: &str,
250        source_branch: Option<&str>,
251        table_key: &str,
252        source_version: u64,
253        target_branch: &str,
254    ) -> Result<ForkOutcome<SnapshotHandle>>;
255
256    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;
257
258    /// Idempotent variant of `delete_branch` used by the best-effort fork
259    /// reclaim under branch delete (`db/omnigraph.rs::cleanup_deleted_branch_tables`)
260    /// and by the orphan-fork reconciler in `optimize`. Tolerates an
261    /// already-absent branch (both Lance's `RefNotFound` and the local-store
262    /// `NotFound` quirk on a missing `tree/{branch}/` dir). A still-referenced
263    /// branch (`RefConflict`) still surfaces as `OmniError::Lance`.
264    async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;
265
266    /// List the named Lance branches present on the dataset at `dataset_uri`.
267    /// The `cleanup` orphan reconciler diffs this against the manifest
268    /// branch set to find orphaned per-table forks. `main`/default is not a
269    /// named branch and never appears here.
270    async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>>;
271
272    async fn reopen_for_mutation(
273        &self,
274        dataset_uri: &str,
275        branch: Option<&str>,
276        table_key: &str,
277        expected_version: u64,
278    ) -> Result<SnapshotHandle>;
279
280    fn ensure_expected_version(
281        &self,
282        snapshot: &SnapshotHandle,
283        table_key: &str,
284        expected_version: u64,
285    ) -> Result<()>;
286
287    // ── Reads (no HEAD advance) ────────────────────────────────────────
288
289    async fn scan(
290        &self,
291        snapshot: &SnapshotHandle,
292        projection: Option<&[&str]>,
293        filter: Option<&str>,
294        order_by: Option<Vec<ColumnOrdering>>,
295    ) -> Result<Vec<RecordBatch>>;
296
297    async fn scan_with_row_id(
298        &self,
299        snapshot: &SnapshotHandle,
300        projection: Option<&[&str]>,
301        filter: Option<&str>,
302        order_by: Option<Vec<ColumnOrdering>>,
303        with_row_id: bool,
304    ) -> Result<Vec<RecordBatch>>;
305
306    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>>;
307
308    async fn scan_batches_for_rewrite(&self, snapshot: &SnapshotHandle)
309    -> Result<Vec<RecordBatch>>;
310
311    async fn count_rows(&self, snapshot: &SnapshotHandle, filter: Option<String>) -> Result<usize>;
312
313    async fn count_rows_with_staged(
314        &self,
315        snapshot: &SnapshotHandle,
316        staged: &[StagedHandle],
317        filter: Option<String>,
318    ) -> Result<usize>;
319
320    async fn scan_with_staged(
321        &self,
322        snapshot: &SnapshotHandle,
323        staged: &[StagedHandle],
324        projection: Option<&[&str]>,
325        filter: Option<&str>,
326    ) -> Result<Vec<RecordBatch>>;
327
328    async fn scan_with_pending(
329        &self,
330        snapshot: &SnapshotHandle,
331        pending: &[RecordBatch],
332        pending_schema: Option<SchemaRef>,
333        projection: Option<&[&str]>,
334        filter: Option<&str>,
335        key_column: Option<&str>,
336    ) -> Result<Vec<RecordBatch>>;
337
338    async fn first_row_id_for_filter(
339        &self,
340        snapshot: &SnapshotHandle,
341        filter: &str,
342    ) -> Result<Option<u64>>;
343
344    async fn table_state(&self, dataset_uri: &str, snapshot: &SnapshotHandle)
345    -> Result<TableState>;
346
347    // ── Staged writes (no HEAD advance) ────────────────────────────────
348
349    async fn stage_append(
350        &self,
351        snapshot: &SnapshotHandle,
352        batch: RecordBatch,
353        prior_stages: &[StagedHandle],
354    ) -> Result<StagedHandle>;
355
356    /// Append `source`'s rows into `snapshot`'s table, streaming so the whole
357    /// row set is never materialized in memory (see `TableStore::stage_append_stream`).
358    async fn stage_append_stream(
359        &self,
360        snapshot: &SnapshotHandle,
361        source: &SnapshotHandle,
362        prior_stages: &[StagedHandle],
363    ) -> Result<StagedHandle>;
364
365    async fn stage_merge_insert(
366        &self,
367        snapshot: SnapshotHandle,
368        batch: RecordBatch,
369        key_columns: Vec<String>,
370        when_matched: WhenMatched,
371        when_not_matched: WhenNotMatched,
372    ) -> Result<StagedHandle>;
373
374    async fn commit_staged(
375        &self,
376        snapshot: SnapshotHandle,
377        staged: StagedHandle,
378    ) -> Result<SnapshotHandle>;
379
380    /// Stage an overwrite (Operation::Overwrite). MR-793 Phase 2.
381    async fn stage_overwrite(
382        &self,
383        snapshot: &SnapshotHandle,
384        batch: RecordBatch,
385    ) -> Result<StagedHandle>;
386
387    /// Stage a BTREE scalar index build. MR-793 Phase 2.
388    async fn stage_create_btree_index(
389        &self,
390        snapshot: &SnapshotHandle,
391        columns: &[&str],
392    ) -> Result<StagedHandle>;
393
394    /// Stage an INVERTED (FTS) scalar index build. MR-793 Phase 2.
395    async fn stage_create_inverted_index(
396        &self,
397        snapshot: &SnapshotHandle,
398        column: &str,
399    ) -> Result<StagedHandle>;
400
401    // ── Index presence (reads, no HEAD advance) ──────────────────────
402    //
403    // The inline-commit writes (`delete_where`, `create_vector_index`) are
404    // deliberately NOT on this trait. They live on
405    // the separate `InlineCommitResidual` trait, reachable only through
406    // `Omnigraph::storage_inline_residual()`. As a result the default
407    // `db.storage()` surface cannot couple "write bytes" with "advance HEAD"
408    // — closing MR-793 acceptance §1 by construction rather than by review.
409
410    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
411    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
412    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
413
414    // ── URI helpers ────────────────────────────────────────────────────
415    //
416    // These are pure string formatting; they live on the trait so engine
417    // code holding `Arc<dyn TableStorage>` can compute dataset URIs
418    // without importing the concrete struct.
419
420    fn root_uri(&self) -> &str;
421    fn dataset_uri(&self, table_path: &str) -> String;
422
423    // ── Streaming access (used by the export path) ────────────────────
424    //
425    // Engine code that needs a `DatasetRecordBatchStream` (rather than a
426    // collected `Vec<RecordBatch>`) goes through this trait method.
427    // Useful for the JSONL exporter that streams rows to a writer
428    // without materializing the whole result.
429
430    async fn scan_stream(
431        &self,
432        snapshot: &SnapshotHandle,
433        projection: Option<&[&str]>,
434        filter: Option<&str>,
435        order_by: Option<Vec<ColumnOrdering>>,
436        with_row_id: bool,
437    ) -> Result<DatasetRecordBatchStream>;
438}
439
440// ─── InlineCommitResidual trait ────────────────────────────────────────────
441
442/// Inline-commit residual surface: the writes Lance cannot yet express as a
443/// stage-then-commit pair, so they advance Lance HEAD as a side effect of
444/// writing. Kept OFF `TableStorage` and reachable only through
445/// `Omnigraph::storage_inline_residual()`, so the default `db.storage()` path
446/// is staged-only and a new writer cannot reintroduce the write+commit coupling
447/// by accident (MR-793 acceptance §1, by construction).
448///
449/// Residual reasons (each is named honestly at its call site):
450/// * `delete_where` — Lance has no public two-phase delete on the 6.x line
451///   (`DeleteBuilder::execute_uncommitted` first ships in v7.x; MR-A / Lance
452///   #6658). The D2 parse-time rule + recovery sidecars cover the gap meanwhile.
453/// * `create_vector_index` — vector-index segment-commit needs
454///   `build_index_metadata_from_segments`, still `pub(crate)` in Lance 6.0.1
455///   (Lance #6666). Scalar indices already stage.
456#[async_trait]
457pub(crate) trait InlineCommitResidual: sealed::Sealed + Send + Sync + Debug {
458    async fn delete_where(
459        &self,
460        dataset_uri: &str,
461        snapshot: SnapshotHandle,
462        filter: &str,
463    ) -> Result<(SnapshotHandle, DeleteState)>;
464
465    async fn create_vector_index(
466        &self,
467        snapshot: SnapshotHandle,
468        column: &str,
469    ) -> Result<SnapshotHandle>;
470}
471
472// ─── single impl: TableStore ──────────────────────────────────────────────
473
474#[async_trait]
475impl TableStorage for TableStore {
476    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle> {
477        self.open_at_entry(entry).await.map(SnapshotHandle::new)
478    }
479
480    async fn open_snapshot_at_table(
481        &self,
482        snapshot: &Snapshot,
483        table_key: &str,
484    ) -> Result<SnapshotHandle> {
485        self.open_snapshot_table(snapshot, table_key)
486            .await
487            .map(SnapshotHandle::new)
488    }
489
490    async fn open_dataset_head(
491        &self,
492        dataset_uri: &str,
493        branch: Option<&str>,
494    ) -> Result<SnapshotHandle> {
495        TableStore::open_dataset_head(self, dataset_uri, branch)
496            .await
497            .map(SnapshotHandle::new)
498    }
499
500    async fn open_dataset_head_for_write(
501        &self,
502        table_key: &str,
503        dataset_uri: &str,
504        branch: Option<&str>,
505    ) -> Result<SnapshotHandle> {
506        TableStore::open_dataset_head_for_write(self, table_key, dataset_uri, branch)
507            .await
508            .map(SnapshotHandle::new)
509    }
510
511    async fn open_dataset_at_state(
512        &self,
513        table_path: &str,
514        branch: Option<&str>,
515        version: u64,
516    ) -> Result<SnapshotHandle> {
517        TableStore::open_dataset_at_state(self, table_path, branch, version)
518            .await
519            .map(SnapshotHandle::new)
520    }
521
522    async fn fork_branch_from_state(
523        &self,
524        dataset_uri: &str,
525        source_branch: Option<&str>,
526        table_key: &str,
527        source_version: u64,
528        target_branch: &str,
529    ) -> Result<ForkOutcome<SnapshotHandle>> {
530        Ok(
531            match TableStore::fork_branch_from_state(
532                self,
533                dataset_uri,
534                source_branch,
535                table_key,
536                source_version,
537                target_branch,
538            )
539            .await?
540            {
541                ForkOutcome::Created(ds) => ForkOutcome::Created(SnapshotHandle::new(ds)),
542                ForkOutcome::RefAlreadyExists => ForkOutcome::RefAlreadyExists,
543            },
544        )
545    }
546
547    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
548        TableStore::delete_branch(self, dataset_uri, branch).await
549    }
550
551    async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
552        TableStore::force_delete_branch(self, dataset_uri, branch).await
553    }
554
555    async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>> {
556        TableStore::list_branches(self, dataset_uri).await
557    }
558
559    async fn reopen_for_mutation(
560        &self,
561        dataset_uri: &str,
562        branch: Option<&str>,
563        table_key: &str,
564        expected_version: u64,
565    ) -> Result<SnapshotHandle> {
566        TableStore::reopen_for_mutation(self, dataset_uri, branch, table_key, expected_version)
567            .await
568            .map(SnapshotHandle::new)
569    }
570
571    fn ensure_expected_version(
572        &self,
573        snapshot: &SnapshotHandle,
574        table_key: &str,
575        expected_version: u64,
576    ) -> Result<()> {
577        TableStore::ensure_expected_version(self, snapshot.dataset(), table_key, expected_version)
578    }
579
580    async fn scan(
581        &self,
582        snapshot: &SnapshotHandle,
583        projection: Option<&[&str]>,
584        filter: Option<&str>,
585        order_by: Option<Vec<ColumnOrdering>>,
586    ) -> Result<Vec<RecordBatch>> {
587        TableStore::scan(self, snapshot.dataset(), projection, filter, order_by).await
588    }
589
590    async fn scan_with_row_id(
591        &self,
592        snapshot: &SnapshotHandle,
593        projection: Option<&[&str]>,
594        filter: Option<&str>,
595        order_by: Option<Vec<ColumnOrdering>>,
596        with_row_id: bool,
597    ) -> Result<Vec<RecordBatch>> {
598        TableStore::scan_with(
599            self,
600            snapshot.dataset(),
601            projection,
602            filter,
603            order_by,
604            with_row_id,
605            |_| Ok(()),
606        )
607        .await
608    }
609
610    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>> {
611        TableStore::scan_batches(self, snapshot.dataset()).await
612    }
613
614    async fn scan_batches_for_rewrite(
615        &self,
616        snapshot: &SnapshotHandle,
617    ) -> Result<Vec<RecordBatch>> {
618        TableStore::scan_batches_for_rewrite(self, snapshot.dataset()).await
619    }
620
621    async fn count_rows(&self, snapshot: &SnapshotHandle, filter: Option<String>) -> Result<usize> {
622        TableStore::count_rows(self, snapshot.dataset(), filter).await
623    }
624
625    async fn count_rows_with_staged(
626        &self,
627        snapshot: &SnapshotHandle,
628        staged: &[StagedHandle],
629        filter: Option<String>,
630    ) -> Result<usize> {
631        let staged_writes = staged_handles_as_writes(staged);
632        TableStore::count_rows_with_staged(self, snapshot.dataset(), &staged_writes, filter).await
633    }
634
635    async fn scan_with_staged(
636        &self,
637        snapshot: &SnapshotHandle,
638        staged: &[StagedHandle],
639        projection: Option<&[&str]>,
640        filter: Option<&str>,
641    ) -> Result<Vec<RecordBatch>> {
642        let staged_writes = staged_handles_as_writes(staged);
643        TableStore::scan_with_staged(self, snapshot.dataset(), &staged_writes, projection, filter)
644            .await
645    }
646
647    async fn scan_with_pending(
648        &self,
649        snapshot: &SnapshotHandle,
650        pending: &[RecordBatch],
651        pending_schema: Option<SchemaRef>,
652        projection: Option<&[&str]>,
653        filter: Option<&str>,
654        key_column: Option<&str>,
655    ) -> Result<Vec<RecordBatch>> {
656        TableStore::scan_with_pending(
657            self,
658            snapshot.dataset(),
659            pending,
660            pending_schema,
661            projection,
662            filter,
663            key_column,
664        )
665        .await
666    }
667
668    async fn first_row_id_for_filter(
669        &self,
670        snapshot: &SnapshotHandle,
671        filter: &str,
672    ) -> Result<Option<u64>> {
673        TableStore::first_row_id_for_filter(self, snapshot.dataset(), filter).await
674    }
675
676    async fn table_state(
677        &self,
678        dataset_uri: &str,
679        snapshot: &SnapshotHandle,
680    ) -> Result<TableState> {
681        TableStore::table_state(self, dataset_uri, snapshot.dataset()).await
682    }
683
684    async fn stage_append(
685        &self,
686        snapshot: &SnapshotHandle,
687        batch: RecordBatch,
688        prior_stages: &[StagedHandle],
689    ) -> Result<StagedHandle> {
690        let staged_writes = staged_handles_as_writes(prior_stages);
691        TableStore::stage_append(self, snapshot.dataset(), batch, &staged_writes)
692            .await
693            .map(StagedHandle::new)
694    }
695
696    async fn stage_append_stream(
697        &self,
698        snapshot: &SnapshotHandle,
699        source: &SnapshotHandle,
700        prior_stages: &[StagedHandle],
701    ) -> Result<StagedHandle> {
702        let staged_writes = staged_handles_as_writes(prior_stages);
703        TableStore::stage_append_stream(self, snapshot.dataset(), source.dataset(), &staged_writes)
704            .await
705            .map(StagedHandle::new)
706    }
707
708    async fn stage_merge_insert(
709        &self,
710        snapshot: SnapshotHandle,
711        batch: RecordBatch,
712        key_columns: Vec<String>,
713        when_matched: WhenMatched,
714        when_not_matched: WhenNotMatched,
715    ) -> Result<StagedHandle> {
716        let ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
717        TableStore::stage_merge_insert(self, ds, batch, key_columns, when_matched, when_not_matched)
718            .await
719            .map(StagedHandle::new)
720    }
721
722    async fn commit_staged(
723        &self,
724        snapshot: SnapshotHandle,
725        staged: StagedHandle,
726    ) -> Result<SnapshotHandle> {
727        let ds_arc = snapshot.into_arc();
728        let transaction = staged.into_staged().transaction;
729        TableStore::commit_staged(self, ds_arc, transaction)
730            .await
731            .map(SnapshotHandle::new)
732    }
733
734    async fn stage_overwrite(
735        &self,
736        snapshot: &SnapshotHandle,
737        batch: RecordBatch,
738    ) -> Result<StagedHandle> {
739        TableStore::stage_overwrite(self, snapshot.dataset(), batch)
740            .await
741            .map(StagedHandle::new)
742    }
743
744    async fn stage_create_btree_index(
745        &self,
746        snapshot: &SnapshotHandle,
747        columns: &[&str],
748    ) -> Result<StagedHandle> {
749        TableStore::stage_create_btree_index(self, snapshot.dataset(), columns)
750            .await
751            .map(StagedHandle::new)
752    }
753
754    async fn stage_create_inverted_index(
755        &self,
756        snapshot: &SnapshotHandle,
757        column: &str,
758    ) -> Result<StagedHandle> {
759        TableStore::stage_create_inverted_index(self, snapshot.dataset(), column)
760            .await
761            .map(StagedHandle::new)
762    }
763
764    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
765        TableStore::has_btree_index(self, snapshot.dataset(), column).await
766    }
767
768    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
769        TableStore::has_fts_index(self, snapshot.dataset(), column).await
770    }
771
772    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
773        TableStore::has_vector_index(self, snapshot.dataset(), column).await
774    }
775
776    fn root_uri(&self) -> &str {
777        TableStore::root_uri(self)
778    }
779
780    fn dataset_uri(&self, table_path: &str) -> String {
781        TableStore::dataset_uri(self, table_path)
782    }
783
784    async fn scan_stream(
785        &self,
786        snapshot: &SnapshotHandle,
787        projection: Option<&[&str]>,
788        filter: Option<&str>,
789        order_by: Option<Vec<ColumnOrdering>>,
790        with_row_id: bool,
791    ) -> Result<DatasetRecordBatchStream> {
792        // Note: existing TableStore::scan_stream is an associated fn that
793        // takes &Dataset, so we delegate via the dataset reference held by
794        // the snapshot.
795        TableStore::scan_stream(
796            snapshot.dataset(),
797            projection,
798            filter,
799            order_by,
800            with_row_id,
801        )
802        .await
803    }
804}
805
806#[async_trait]
807impl InlineCommitResidual for TableStore {
808    async fn delete_where(
809        &self,
810        dataset_uri: &str,
811        snapshot: SnapshotHandle,
812        filter: &str,
813    ) -> Result<(SnapshotHandle, DeleteState)> {
814        let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
815        let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
816        Ok((SnapshotHandle::new(ds), state))
817    }
818
819    async fn create_vector_index(
820        &self,
821        snapshot: SnapshotHandle,
822        column: &str,
823    ) -> Result<SnapshotHandle> {
824        let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
825        TableStore::create_vector_index(self, &mut ds, column).await?;
826        Ok(SnapshotHandle::new(ds))
827    }
828}